Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_publication.c
4 : : * publication C API manipulation
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/catalog/pg_publication.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/genam.h"
18 : : #include "access/heapam.h"
19 : : #include "access/htup_details.h"
20 : : #include "access/tableam.h"
21 : : #include "catalog/catalog.h"
22 : : #include "catalog/dependency.h"
23 : : #include "catalog/indexing.h"
24 : : #include "catalog/namespace.h"
25 : : #include "catalog/objectaddress.h"
26 : : #include "catalog/partition.h"
27 : : #include "catalog/pg_inherits.h"
28 : : #include "catalog/pg_namespace.h"
29 : : #include "catalog/pg_publication.h"
30 : : #include "catalog/pg_publication_namespace.h"
31 : : #include "catalog/pg_publication_rel.h"
32 : : #include "catalog/pg_type.h"
33 : : #include "commands/publicationcmds.h"
34 : : #include "funcapi.h"
35 : : #include "utils/array.h"
36 : : #include "utils/builtins.h"
37 : : #include "utils/catcache.h"
38 : : #include "utils/fmgroids.h"
39 : : #include "utils/lsyscache.h"
40 : : #include "utils/rel.h"
41 : : #include "utils/syscache.h"
42 : :
43 : : /* Records association between publication and published table */
44 : : typedef struct
45 : : {
46 : : Oid relid; /* OID of published table */
47 : : Oid pubid; /* OID of publication that publishes this
48 : : * table. */
49 : : } published_rel;
50 : :
51 : : /*
52 : : * Check if relation can be in given publication and throws appropriate
53 : : * error if not.
54 : : */
55 : : static void
87 akapila@postgresql.o 56 :GNC 807 : check_publication_add_relation(PublicationRelInfo *pri)
57 : : {
58 : 807 : Relation targetrel = pri->relation;
59 : : const char *relname;
60 : : const char *errormsg;
61 : :
62 [ + + ]: 807 : if (pri->except)
63 : : {
22 64 : 81 : relname = RelationGetQualifiedRelationName(targetrel);
59 65 : 81 : errormsg = gettext_noop("cannot specify relation \"%s\" in the publication EXCEPT clause");
66 : : }
67 : : else
68 : : {
22 69 : 726 : relname = RelationGetRelationName(targetrel);
87 70 : 726 : errormsg = gettext_noop("cannot add relation \"%s\" to publication");
71 : : }
72 : :
73 : : /* If in EXCEPT clause, must be root partitioned table */
74 [ + + + + ]: 807 : if (pri->except && targetrel->rd_rel->relispartition)
75 [ + - ]: 4 : ereport(ERROR,
76 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
77 : : errmsg(errormsg, relname),
78 : : errdetail("This operation is not supported for individual partitions.")));
79 : :
80 : : /* Must be a regular or partitioned table */
2272 peter@eisentraut.org 81 [ + + ]:CBC 803 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
1514 tomas.vondra@postgre 82 [ + + ]: 115 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
3418 peter_e@gmx.net 83 [ + - ]: 9 : ereport(ERROR,
84 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85 : : errmsg(errormsg, relname),
86 : : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
87 : :
88 : : /* Can't be system table */
89 [ + + ]: 794 : if (IsCatalogRelation(targetrel))
90 [ + - ]: 4 : ereport(ERROR,
91 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
92 : : errmsg(errormsg, relname),
93 : : errdetail("This operation is not supported for system tables.")));
94 : :
95 : : /* UNLOGGED and TEMP relations cannot be part of publication. */
1655 dgustafsson@postgres 96 [ + + ]: 790 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
3418 peter_e@gmx.net 97 [ + - ]: 4 : ereport(ERROR,
98 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
99 : : errmsg(errormsg, relname),
100 : : errdetail("This operation is not supported for temporary tables.")));
1655 dgustafsson@postgres 101 [ + + ]: 786 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
102 [ + - ]: 4 : ereport(ERROR,
103 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
104 : : errmsg(errormsg, relname),
105 : : errdetail("This operation is not supported for unlogged tables.")));
3418 peter_e@gmx.net 106 : 782 : }
107 : :
108 : : /*
109 : : * Check if schema can be in given publication and throw appropriate error if
110 : : * not.
111 : : */
112 : : static void
1676 akapila@postgresql.o 113 : 163 : check_publication_add_schema(Oid schemaid)
114 : : {
115 : : /* Can't be system namespace */
116 [ + + - + ]: 163 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
117 [ + - ]: 4 : ereport(ERROR,
118 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
119 : : errmsg("cannot add schema \"%s\" to publication",
120 : : get_namespace_name(schemaid)),
121 : : errdetail("This operation is not supported for system schemas.")));
122 : :
123 : : /* Can't be temporary namespace */
124 [ - + ]: 159 : if (isAnyTempNamespace(schemaid))
1676 akapila@postgresql.o 125 [ # # ]:UBC 0 : ereport(ERROR,
126 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
127 : : errmsg("cannot add schema \"%s\" to publication",
128 : : get_namespace_name(schemaid)),
129 : : errdetail("Temporary schemas cannot be replicated.")));
1676 akapila@postgresql.o 130 :CBC 159 : }
131 : :
132 : : /*
133 : : * Returns if relation represented by oid and Form_pg_class entry
134 : : * is publishable.
135 : : *
136 : : * Does same checks as check_publication_add_relation() above except for
137 : : * RELKIND_SEQUENCE, but does not need relation to be opened and also does
138 : : * not throw errors. Here, the additional check is to support ALL SEQUENCES
139 : : * publication.
140 : : *
141 : : * XXX This also excludes all tables with relid < FirstNormalObjectId,
142 : : * ie all tables created during initdb. This mainly affects the preinstalled
143 : : * information_schema. IsCatalogRelationOid() only excludes tables with
144 : : * relid < FirstUnpinnedObjectId, making that test rather redundant,
145 : : * but really we should get rid of the FirstNormalObjectId test not
146 : : * IsCatalogRelationOid. We can't do so today because we don't want
147 : : * information_schema tables to be considered publishable; but this test
148 : : * is really inadequate for that, since the information_schema could be
149 : : * dropped and reloaded and then it'll be considered publishable. The best
150 : : * long-term solution may be to add a "relispublishable" bool to pg_class,
151 : : * and depend on that instead of OID checks.
152 : : */
153 : : static bool
3418 peter_e@gmx.net 154 : 323123 : is_publishable_class(Oid relid, Form_pg_class reltuple)
155 : : {
2272 peter@eisentraut.org 156 : 331393 : return (reltuple->relkind == RELKIND_RELATION ||
233 akapila@postgresql.o 157 [ + + ]:GNC 8270 : reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
158 [ + + ]: 7264 : reltuple->relkind == RELKIND_SEQUENCE) &&
2579 tgl@sss.pgh.pa.us 159 [ + + ]:CBC 317230 : !IsCatalogRelationOid(relid) &&
3418 peter_e@gmx.net 160 [ + + + + : 646246 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
+ + ]
161 : : relid >= FirstNormalObjectId;
162 : : }
163 : :
164 : : /*
165 : : * Another variant of is_publishable_class(), taking a Relation.
166 : : */
167 : : bool
1401 akapila@postgresql.o 168 : 297616 : is_publishable_relation(Relation rel)
169 : : {
170 : 297616 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
171 : : }
172 : :
173 : : /*
174 : : * Similar to is_publishable_class() but checks whether the given OID
175 : : * is a publishable "table" or not.
176 : : */
177 : : static bool
58 msawada@postgresql.o 178 :GNC 546 : is_publishable_table(Oid tableoid)
179 : : {
180 : : HeapTuple tuple;
181 : : Form_pg_class relform;
182 : :
183 : 546 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(tableoid));
184 [ + + ]: 546 : if (!HeapTupleIsValid(tuple))
185 : 4 : return false;
186 : :
187 : 542 : relform = (Form_pg_class) GETSTRUCT(tuple);
188 : :
189 : : /*
190 : : * is_publishable_class() includes sequences, so we need to explicitly
191 : : * check the relkind to filter them out here.
192 : : */
193 [ + - + + ]: 1084 : if (relform->relkind != RELKIND_SEQUENCE &&
194 : 542 : is_publishable_class(tableoid, relform))
195 : : {
196 : 538 : ReleaseSysCache(tuple);
197 : 538 : return true;
198 : : }
199 : :
200 : 4 : ReleaseSysCache(tuple);
201 : 4 : return false;
202 : : }
203 : :
204 : : /*
205 : : * SQL-callable variant of the above
206 : : *
207 : : * This returns null when the relation does not exist. This is intended to be
208 : : * used for example in psql to avoid gratuitous errors when there are
209 : : * concurrent catalog changes.
210 : : */
211 : : Datum
1397 akapila@postgresql.o 212 :CBC 4288 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
213 : : {
214 : 4288 : Oid relid = PG_GETARG_OID(0);
215 : : HeapTuple tuple;
216 : : bool result;
217 : :
218 : 4288 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
219 [ - + ]: 4288 : if (!HeapTupleIsValid(tuple))
1397 akapila@postgresql.o 220 :UBC 0 : PG_RETURN_NULL();
1397 akapila@postgresql.o 221 :CBC 4288 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
222 : 4288 : ReleaseSysCache(tuple);
223 : 4288 : PG_RETURN_BOOL(result);
224 : : }
225 : :
226 : : /*
227 : : * Returns true if the ancestor is in the list of published relations.
228 : : * Otherwise, returns false.
229 : : */
230 : : static bool
1158 231 : 61 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
232 : : {
233 : : ListCell *lc;
234 : :
235 [ + - + + : 276 : foreach(lc, table_infos)
+ + ]
236 : : {
237 : 245 : Oid relid = ((published_rel *) lfirst(lc))->relid;
238 : :
239 [ + + ]: 245 : if (relid == ancestor)
240 : 30 : return true;
241 : : }
242 : :
243 : 31 : return false;
244 : : }
245 : :
246 : : /*
247 : : * Filter out the partitions whose parent tables are also present in the list.
248 : : */
249 : : static void
250 : 167 : filter_partitions(List *table_infos)
251 : : {
252 : : ListCell *lc;
253 : :
254 [ + + + + : 395 : foreach(lc, table_infos)
+ + ]
255 : : {
1676 256 : 228 : bool skip = false;
257 : 228 : List *ancestors = NIL;
258 : : ListCell *lc2;
1158 259 : 228 : published_rel *table_info = (published_rel *) lfirst(lc);
260 : :
261 [ + + ]: 228 : if (get_rel_relispartition(table_info->relid))
262 : 61 : ancestors = get_partition_ancestors(table_info->relid);
263 : :
1676 264 [ + + + + : 259 : foreach(lc2, ancestors)
+ + ]
265 : : {
266 : 61 : Oid ancestor = lfirst_oid(lc2);
267 : :
1158 268 [ + + ]: 61 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
269 : : {
1676 270 : 30 : skip = true;
271 : 30 : break;
272 : : }
273 : : }
274 : :
1158 275 [ + + ]: 228 : if (skip)
276 : 30 : table_infos = foreach_delete_current(table_infos, lc);
277 : : }
1676 278 : 167 : }
279 : :
280 : : /*
281 : : * Returns true if any schema is associated with the publication, false if no
282 : : * schema is associated with the publication.
283 : : */
284 : : bool
1634 285 : 215 : is_schema_publication(Oid pubid)
286 : : {
287 : : Relation pubschsrel;
288 : : ScanKeyData scankey;
289 : : SysScanDesc scan;
290 : : HeapTuple tup;
291 : 215 : bool result = false;
292 : :
293 : 215 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
294 : 215 : ScanKeyInit(&scankey,
295 : : Anum_pg_publication_namespace_pnpubid,
296 : : BTEqualStrategyNumber, F_OIDEQ,
297 : : ObjectIdGetDatum(pubid));
298 : :
299 : 215 : scan = systable_beginscan(pubschsrel,
300 : : PublicationNamespacePnnspidPnpubidIndexId,
301 : : true, NULL, 1, &scankey);
302 : 215 : tup = systable_getnext(scan);
303 : 215 : result = HeapTupleIsValid(tup);
304 : :
305 : 215 : systable_endscan(scan);
306 : 215 : table_close(pubschsrel, AccessShareLock);
307 : :
308 : 215 : return result;
309 : : }
310 : :
311 : : /*
312 : : * Returns true if the publication has explicitly included relation (i.e.,
313 : : * not marked as EXCEPT).
314 : : */
315 : : bool
71 akapila@postgresql.o 316 :GNC 49 : is_table_publication(Oid pubid)
317 : : {
318 : : Relation pubrelsrel;
319 : : ScanKeyData scankey;
320 : : SysScanDesc scan;
321 : : HeapTuple tup;
322 : 49 : bool result = false;
323 : :
324 : 49 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
325 : 49 : ScanKeyInit(&scankey,
326 : : Anum_pg_publication_rel_prpubid,
327 : : BTEqualStrategyNumber, F_OIDEQ,
328 : : ObjectIdGetDatum(pubid));
329 : :
330 : 49 : scan = systable_beginscan(pubrelsrel,
331 : : PublicationRelPrpubidIndexId,
332 : : true, NULL, 1, &scankey);
333 : 49 : tup = systable_getnext(scan);
334 [ + + ]: 49 : if (HeapTupleIsValid(tup))
335 : : {
336 : : Form_pg_publication_rel pubrel;
337 : :
338 : 25 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
339 : :
340 : : /*
341 : : * For any publication, pg_publication_rel contains either only EXCEPT
342 : : * entries or only explicitly included tables. Therefore, examining
343 : : * the first tuple is sufficient to determine table inclusion.
344 : : */
345 : 25 : result = !pubrel->prexcept;
346 : : }
347 : :
348 : 49 : systable_endscan(scan);
349 : 49 : table_close(pubrelsrel, AccessShareLock);
350 : :
351 : 49 : return result;
352 : : }
353 : :
354 : : /*
355 : : * Returns true if the relation has column list associated with the
356 : : * publication, false otherwise.
357 : : *
358 : : * If a column list is found, the corresponding bitmap is returned through the
359 : : * cols parameter, if provided. The bitmap is constructed within the given
360 : : * memory context (mcxt).
361 : : */
362 : : bool
569 akapila@postgresql.o 363 :CBC 911 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
364 : : Bitmapset **cols)
365 : : {
366 : : HeapTuple cftuple;
367 : 911 : bool found = false;
368 : :
369 [ + + ]: 911 : if (pub->alltables)
370 : 220 : return false;
371 : :
372 : 691 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
373 : : ObjectIdGetDatum(relid),
374 : : ObjectIdGetDatum(pub->oid));
375 [ + + ]: 691 : if (HeapTupleIsValid(cftuple))
376 : : {
377 : : Datum cfdatum;
378 : : bool isnull;
379 : :
380 : : /* Lookup the column list attribute. */
381 : 633 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
382 : : Anum_pg_publication_rel_prattrs, &isnull);
383 : :
384 : : /* Was a column list found? */
385 [ + + ]: 633 : if (!isnull)
386 : : {
387 : : /* Build the column list bitmap in the given memory context. */
388 [ + + ]: 191 : if (cols)
389 : 188 : *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
390 : :
391 : 191 : found = true;
392 : : }
393 : :
394 : 633 : ReleaseSysCache(cftuple);
395 : : }
396 : :
397 : 691 : return found;
398 : : }
399 : :
400 : : /*
401 : : * Gets the relations based on the publication partition option for a specified
402 : : * relation.
403 : : */
404 : : List *
1711 405 : 2497 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
406 : : Oid relid)
407 : : {
408 [ + + + + ]: 2497 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
409 : : pub_partopt != PUBLICATION_PART_ROOT)
410 : 701 : {
411 : 701 : List *all_parts = find_all_inheritors(relid, NoLock,
412 : : NULL);
413 : :
414 [ + + ]: 701 : if (pub_partopt == PUBLICATION_PART_ALL)
415 : 682 : result = list_concat(result, all_parts);
416 [ + - ]: 19 : else if (pub_partopt == PUBLICATION_PART_LEAF)
417 : : {
418 : : ListCell *lc;
419 : :
420 [ + - + + : 69 : foreach(lc, all_parts)
+ + ]
421 : : {
422 : 50 : Oid partOid = lfirst_oid(lc);
423 : :
424 [ + + ]: 50 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
425 : 30 : result = lappend_oid(result, partOid);
426 : : }
427 : : }
428 : : else
1711 akapila@postgresql.o 429 :UBC 0 : Assert(false);
430 : : }
431 : : else
1711 akapila@postgresql.o 432 :CBC 1796 : result = lappend_oid(result, relid);
433 : :
434 : 2497 : return result;
435 : : }
436 : :
437 : : /*
438 : : * Returns the relid of the topmost ancestor that is published via this
439 : : * publication if any and set its ancestor level to ancestor_level,
440 : : * otherwise returns InvalidOid.
441 : : *
442 : : * The ancestor_level value allows us to compare the results for multiple
443 : : * publications, and decide which value is higher up.
444 : : *
445 : : * Note that the list of ancestors should be ordered such that the topmost
446 : : * ancestor is at the end of the list.
447 : : */
448 : : Oid
1536 tomas.vondra@postgre 449 : 426 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
450 : : {
451 : : ListCell *lc;
1558 akapila@postgresql.o 452 : 426 : Oid topmost_relid = InvalidOid;
1536 tomas.vondra@postgre 453 : 426 : int level = 0;
454 : :
455 : : /*
456 : : * Find the "topmost" ancestor that is in this publication.
457 : : */
1558 akapila@postgresql.o 458 [ + - + + : 862 : foreach(lc, ancestors)
+ + ]
459 : : {
460 : 436 : Oid ancestor = lfirst_oid(lc);
87 akapila@postgresql.o 461 :GNC 436 : List *apubids = GetRelationIncludedPublications(ancestor);
1558 akapila@postgresql.o 462 :CBC 436 : List *aschemaPubids = NIL;
463 : :
1536 tomas.vondra@postgre 464 : 436 : level++;
465 : :
1558 akapila@postgresql.o 466 [ + + ]: 436 : if (list_member_oid(apubids, puboid))
467 : : {
468 : 223 : topmost_relid = ancestor;
469 : :
1536 tomas.vondra@postgre 470 [ + + ]: 223 : if (ancestor_level)
471 : 43 : *ancestor_level = level;
472 : : }
473 : : else
474 : : {
1514 475 : 213 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
1558 akapila@postgresql.o 476 [ + + ]: 213 : if (list_member_oid(aschemaPubids, puboid))
477 : : {
478 : 13 : topmost_relid = ancestor;
479 : :
1536 tomas.vondra@postgre 480 [ + + ]: 13 : if (ancestor_level)
481 : 5 : *ancestor_level = level;
482 : : }
483 : : }
484 : :
1558 akapila@postgresql.o 485 : 436 : list_free(apubids);
486 : 436 : list_free(aschemaPubids);
487 : : }
488 : :
489 : 426 : return topmost_relid;
490 : : }
491 : :
492 : : /*
493 : : * attnumstoint2vector
494 : : * Convert a Bitmapset of AttrNumbers into an int2vector.
495 : : *
496 : : * AttrNumber numbers are 0-based, i.e., not offset by
497 : : * FirstLowInvalidHeapAttributeNumber.
498 : : */
499 : : static int2vector *
653 drowley@postgresql.o 500 : 212 : attnumstoint2vector(Bitmapset *attrs)
501 : : {
502 : : int2vector *result;
503 : 212 : int n = bms_num_members(attrs);
504 : 212 : int i = -1;
505 : 212 : int j = 0;
506 : :
507 : 212 : result = buildint2vector(NULL, n);
508 : :
509 [ + + ]: 577 : while ((i = bms_next_member(attrs, i)) >= 0)
510 : : {
511 [ - + ]: 365 : Assert(i <= PG_INT16_MAX);
512 : :
513 : 365 : result->values[j++] = (int16) i;
514 : : }
515 : :
516 : 212 : return result;
517 : : }
518 : :
519 : : /*
520 : : * Insert new publication / relation mapping.
521 : : */
522 : : ObjectAddress
1558 akapila@postgresql.o 523 : 829 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
524 : : bool if_not_exists, AlterPublicationStmt *alter_stmt)
525 : : {
526 : : Relation rel;
527 : : HeapTuple tup;
528 : : Datum values[Natts_pg_publication_rel];
529 : : bool nulls[Natts_pg_publication_rel];
530 : 829 : Relation targetrel = pri->relation;
531 : 829 : Oid relid = RelationGetRelid(targetrel);
532 : : Oid pubreloid;
533 : : Bitmapset *attnums;
3418 peter_e@gmx.net 534 : 829 : Publication *pub = GetPublication(pubid);
535 : : ObjectAddress myself,
536 : : referenced;
1711 akapila@postgresql.o 537 : 829 : List *relids = NIL;
538 : : int i;
539 : : bool inval_except_table;
540 : :
2686 andres@anarazel.de 541 : 829 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
542 : :
543 : : /*
544 : : * Check for duplicates. Note that this does not really prevent
545 : : * duplicates, it's here just to provide nicer error message in common
546 : : * case. The real protection is the unique key on the catalog.
547 : : */
3418 peter_e@gmx.net 548 [ + + ]: 829 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
549 : : ObjectIdGetDatum(pubid)))
550 : : {
2686 andres@anarazel.de 551 : 22 : table_close(rel, RowExclusiveLock);
552 : :
3418 peter_e@gmx.net 553 [ + + ]: 22 : if (if_not_exists)
554 : 18 : return InvalidObjectAddress;
555 : :
556 [ + - ]: 4 : ereport(ERROR,
557 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
558 : : errmsg("relation \"%s\" is already member of publication \"%s\"",
559 : : RelationGetRelationName(targetrel), pub->name)));
560 : : }
561 : :
87 akapila@postgresql.o 562 :GNC 807 : check_publication_add_relation(pri);
563 : :
564 : : /* Validate and translate column names into a Bitmapset of attnums. */
653 drowley@postgresql.o 565 :CBC 782 : attnums = pub_collist_validate(pri->relation, pri->columns);
566 : :
567 : : /* Form a tuple. */
3418 peter_e@gmx.net 568 : 766 : memset(values, 0, sizeof(values));
569 : 766 : memset(nulls, false, sizeof(nulls));
570 : :
1612 alvherre@alvh.no-ip. 571 : 766 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
572 : : Anum_pg_publication_rel_oid);
573 : 766 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
3418 peter_e@gmx.net 574 : 766 : values[Anum_pg_publication_rel_prpubid - 1] =
575 : 766 : ObjectIdGetDatum(pubid);
576 : 766 : values[Anum_pg_publication_rel_prrelid - 1] =
577 : 766 : ObjectIdGetDatum(relid);
87 akapila@postgresql.o 578 :GNC 766 : values[Anum_pg_publication_rel_prexcept - 1] =
579 : 766 : BoolGetDatum(pri->except);
580 : :
581 : : /* Add qualifications, if available */
1558 akapila@postgresql.o 582 [ + + ]:CBC 766 : if (pri->whereClause != NULL)
583 : 219 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
584 : : else
585 : 547 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
586 : :
587 : : /* Add column list, if available */
1526 tomas.vondra@postgre 588 [ + + ]: 766 : if (pri->columns)
653 drowley@postgresql.o 589 : 212 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
590 : : else
1526 tomas.vondra@postgre 591 : 554 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
592 : :
3418 peter_e@gmx.net 593 : 766 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
594 : :
595 : : /* Insert tuple into catalog. */
2748 andres@anarazel.de 596 : 766 : CatalogTupleInsert(rel, tup);
3418 peter_e@gmx.net 597 : 766 : heap_freetuple(tup);
598 : :
599 : : /* Register dependencies as needed */
1612 alvherre@alvh.no-ip. 600 : 766 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
601 : :
602 : : /* Add dependency on the publication */
3418 peter_e@gmx.net 603 : 766 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
604 : 766 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
605 : :
606 : : /* Add dependency on the relation */
607 : 766 : ObjectAddressSet(referenced, RelationRelationId, relid);
608 : 766 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
609 : :
610 : : /* Add dependency on the objects mentioned in the qualifications */
1558 akapila@postgresql.o 611 [ + + ]: 766 : if (pri->whereClause)
612 : 219 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
613 : : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
614 : : false);
615 : :
616 : : /* Add dependency on the columns, if any are listed */
653 drowley@postgresql.o 617 : 766 : i = -1;
618 [ + + ]: 1131 : while ((i = bms_next_member(attnums, i)) >= 0)
619 : : {
620 : 365 : ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
1526 tomas.vondra@postgre 621 : 365 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
622 : : }
623 : :
624 : : /* Close the table. */
2686 andres@anarazel.de 625 : 766 : table_close(rel, RowExclusiveLock);
626 : :
627 : : /*
628 : : * Determine whether EXCEPT tables require explicit relcache invalidation.
629 : : *
630 : : * For CREATE PUBLICATION with EXCEPT tables, invalidation is skipped
631 : : * here, as CreatePublication() function invalidates all relations as part
632 : : * of defining a FOR ALL TABLES publication.
633 : : *
634 : : * For ALTER PUBLICATION, invalidation is needed only when adding an
635 : : * EXCEPT table to a publication already marked as ALL TABLES. For
636 : : * publications that were originally empty or defined as ALL SEQUENCES and
637 : : * are being converted to ALL TABLES, invalidation is skipped here, as
638 : : * AlterPublicationAllFlags() function invalidates all relations while
639 : : * marking the publication as ALL TABLES publication.
640 : : */
71 akapila@postgresql.o 641 [ + + + + ]:GNC 775 : inval_except_table = (alter_stmt != NULL) && pub->alltables &&
642 [ + - + - ]: 9 : (alter_stmt->for_all_tables && pri->except);
643 : :
644 [ + + + + ]: 766 : if (!pri->except || inval_except_table)
645 : : {
646 : : /*
647 : : * Invalidate relcache so that publication info is rebuilt.
648 : : *
649 : : * For the partitioned tables, we must invalidate all partitions
650 : : * contained in the respective partition hierarchies, not just the one
651 : : * explicitly mentioned in the publication. This is required because
652 : : * we implicitly publish the child tables when the parent table is
653 : : * published.
654 : : */
87 655 : 698 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
656 : : relid);
657 : :
658 : 698 : InvalidatePublicationRels(relids);
659 : : }
660 : :
3418 peter_e@gmx.net 661 :CBC 766 : return myself;
662 : : }
663 : :
664 : : /*
665 : : * pub_collist_validate
666 : : * Process and validate the 'columns' list and ensure the columns are all
667 : : * valid to use for a publication. Checks for and raises an ERROR for
668 : : * any unknown columns, system columns, duplicate columns, or virtual
669 : : * generated columns.
670 : : *
671 : : * Looks up each column's attnum and returns a 0-based Bitmapset of the
672 : : * corresponding attnums.
673 : : */
674 : : Bitmapset *
653 drowley@postgresql.o 675 : 1072 : pub_collist_validate(Relation targetrel, List *columns)
676 : : {
1526 tomas.vondra@postgre 677 : 1072 : Bitmapset *set = NULL;
678 : : ListCell *lc;
477 peter@eisentraut.org 679 : 1072 : TupleDesc tupdesc = RelationGetDescr(targetrel);
680 : :
1526 tomas.vondra@postgre 681 [ + + + + : 1617 : foreach(lc, columns)
+ + ]
682 : : {
683 : 569 : char *colname = strVal(lfirst(lc));
684 : 569 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
685 : :
686 [ + + ]: 569 : if (attnum == InvalidAttrNumber)
687 [ + - ]: 4 : ereport(ERROR,
688 : : errcode(ERRCODE_UNDEFINED_COLUMN),
689 : : errmsg("column \"%s\" of relation \"%s\" does not exist",
690 : : colname, RelationGetRelationName(targetrel)));
691 : :
692 [ + + ]: 565 : if (!AttrNumberIsForUserDefinedAttr(attnum))
693 [ + - ]: 8 : ereport(ERROR,
694 : : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
695 : : errmsg("cannot use system column \"%s\" in publication column list",
696 : : colname));
697 : :
477 peter@eisentraut.org 698 [ + + ]: 557 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
699 [ + - ]: 4 : ereport(ERROR,
700 : : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
701 : : errmsg("cannot use virtual generated column \"%s\" in publication column list",
702 : : colname));
703 : :
1526 tomas.vondra@postgre 704 [ + + ]: 553 : if (bms_is_member(attnum, set))
705 [ + - ]: 8 : ereport(ERROR,
706 : : errcode(ERRCODE_DUPLICATE_OBJECT),
707 : : errmsg("duplicate column \"%s\" in publication column list",
708 : : colname));
709 : :
710 : 545 : set = bms_add_member(set, attnum);
711 : : }
712 : :
653 drowley@postgresql.o 713 : 1048 : return set;
714 : : }
715 : :
716 : : /*
717 : : * Transform a column list (represented by an array Datum) to a bitmapset.
718 : : *
719 : : * If columns isn't NULL, add the column numbers to that set.
720 : : *
721 : : * If mcxt isn't NULL, build the bitmapset in that context.
722 : : */
723 : : Bitmapset *
1526 tomas.vondra@postgre 724 : 278 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
725 : : {
653 drowley@postgresql.o 726 : 278 : Bitmapset *result = columns;
727 : : ArrayType *arr;
728 : : int nelems;
729 : : int16 *elems;
1479 tgl@sss.pgh.pa.us 730 : 278 : MemoryContext oldcxt = NULL;
731 : :
1526 tomas.vondra@postgre 732 : 278 : arr = DatumGetArrayTypeP(pubcols);
733 : 278 : nelems = ARR_DIMS(arr)[0];
734 [ - + ]: 278 : elems = (int16 *) ARR_DATA_PTR(arr);
735 : :
736 : : /* If a memory context was specified, switch to it. */
737 [ + + ]: 278 : if (mcxt)
738 : 39 : oldcxt = MemoryContextSwitchTo(mcxt);
739 : :
740 [ + + ]: 764 : for (int i = 0; i < nelems; i++)
741 : 486 : result = bms_add_member(result, elems[i]);
742 : :
743 [ + + ]: 278 : if (mcxt)
744 : 39 : MemoryContextSwitchTo(oldcxt);
745 : :
746 : 278 : return result;
747 : : }
748 : :
749 : : /*
750 : : * Returns a bitmap representing the columns of the specified table.
751 : : *
752 : : * Generated columns are included if include_gencols_type is
753 : : * PUBLISH_GENCOLS_STORED.
754 : : */
755 : : Bitmapset *
492 akapila@postgresql.o 756 : 9 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
757 : : {
569 758 : 9 : Bitmapset *result = NULL;
759 : 9 : TupleDesc desc = RelationGetDescr(relation);
760 : :
761 [ + + ]: 30 : for (int i = 0; i < desc->natts; i++)
762 : : {
763 : 21 : Form_pg_attribute att = TupleDescAttr(desc, i);
764 : :
492 765 [ + + ]: 21 : if (att->attisdropped)
569 766 : 1 : continue;
767 : :
492 768 [ + + ]: 20 : if (att->attgenerated)
769 : : {
770 : : /* We only support replication of STORED generated cols. */
771 [ + + ]: 2 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
772 : 1 : continue;
773 : :
774 : : /* User hasn't requested to replicate STORED generated cols. */
775 [ + - ]: 1 : if (include_gencols_type != PUBLISH_GENCOLS_STORED)
776 : 1 : continue;
777 : : }
778 : :
569 779 : 18 : result = bms_add_member(result, att->attnum);
780 : : }
781 : :
782 : 9 : return result;
783 : : }
784 : :
785 : : /*
786 : : * Insert new publication / schema mapping.
787 : : */
788 : : ObjectAddress
1514 tomas.vondra@postgre 789 : 175 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
790 : : {
791 : : Relation rel;
792 : : HeapTuple tup;
793 : : Datum values[Natts_pg_publication_namespace];
794 : : bool nulls[Natts_pg_publication_namespace];
795 : : Oid psschid;
1676 akapila@postgresql.o 796 : 175 : Publication *pub = GetPublication(pubid);
797 : 175 : List *schemaRels = NIL;
798 : : ObjectAddress myself,
799 : : referenced;
800 : :
801 : 175 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
802 : :
803 : : /*
804 : : * Check for duplicates. Note that this does not really prevent
805 : : * duplicates, it's here just to provide nicer error message in common
806 : : * case. The real protection is the unique key on the catalog.
807 : : */
1514 tomas.vondra@postgre 808 [ + + ]: 175 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
809 : : ObjectIdGetDatum(schemaid),
810 : : ObjectIdGetDatum(pubid)))
811 : : {
1676 akapila@postgresql.o 812 : 12 : table_close(rel, RowExclusiveLock);
813 : :
814 [ + + ]: 12 : if (if_not_exists)
815 : 8 : return InvalidObjectAddress;
816 : :
817 [ + - ]: 4 : ereport(ERROR,
818 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
819 : : errmsg("schema \"%s\" is already member of publication \"%s\"",
820 : : get_namespace_name(schemaid), pub->name)));
821 : : }
822 : :
823 : 163 : check_publication_add_schema(schemaid);
824 : :
825 : : /* Form a tuple */
826 : 159 : memset(values, 0, sizeof(values));
827 : 159 : memset(nulls, false, sizeof(nulls));
828 : :
829 : 159 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
830 : : Anum_pg_publication_namespace_oid);
831 : 159 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
832 : 159 : values[Anum_pg_publication_namespace_pnpubid - 1] =
833 : 159 : ObjectIdGetDatum(pubid);
834 : 159 : values[Anum_pg_publication_namespace_pnnspid - 1] =
835 : 159 : ObjectIdGetDatum(schemaid);
836 : :
837 : 159 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
838 : :
839 : : /* Insert tuple into catalog */
840 : 159 : CatalogTupleInsert(rel, tup);
841 : 159 : heap_freetuple(tup);
842 : :
843 : 159 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
844 : :
845 : : /* Add dependency on the publication */
846 : 159 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
847 : 159 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
848 : :
849 : : /* Add dependency on the schema */
850 : 159 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
851 : 159 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
852 : :
853 : : /* Close the table */
854 : 159 : table_close(rel, RowExclusiveLock);
855 : :
856 : : /*
857 : : * Invalidate relcache so that publication info is rebuilt. See
858 : : * publication_add_relation for why we need to consider all the
859 : : * partitions.
860 : : */
1514 tomas.vondra@postgre 861 : 159 : schemaRels = GetSchemaPublicationRelations(schemaid,
862 : : PUBLICATION_PART_ALL);
1676 akapila@postgresql.o 863 : 159 : InvalidatePublicationRels(schemaRels);
864 : :
865 : 159 : return myself;
866 : : }
867 : :
868 : : /*
869 : : * Internal function to get the list of publication oids for a relation.
870 : : *
871 : : * If except_flag is true, returns the list of publication that specified the
872 : : * relation in the EXCEPT clause; otherwise, returns the list of publications
873 : : * in which relation is included.
874 : : */
875 : : static List *
87 akapila@postgresql.o 876 :GNC 16673 : get_relation_publications(Oid relid, bool except_flag)
877 : : {
3300 bruce@momjian.us 878 :CBC 16673 : List *result = NIL;
879 : : CatCList *pubrellist;
880 : :
881 : : /* Find all publications associated with the relation. */
3418 peter_e@gmx.net 882 : 16673 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
883 : : ObjectIdGetDatum(relid));
87 akapila@postgresql.o 884 [ + + ]:GNC 18285 : for (int i = 0; i < pubrellist->n_members; i++)
885 : : {
3418 peter_e@gmx.net 886 :CBC 1612 : HeapTuple tup = &pubrellist->members[i]->tuple;
87 akapila@postgresql.o 887 :GNC 1612 : Form_pg_publication_rel pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
888 : 1612 : Oid pubid = pubrel->prpubid;
889 : :
890 [ + + ]: 1612 : if (pubrel->prexcept == except_flag)
891 : 1140 : result = lappend_oid(result, pubid);
892 : : }
893 : :
3418 peter_e@gmx.net 894 :CBC 16673 : ReleaseSysCacheList(pubrellist);
895 : :
896 : 16673 : return result;
897 : : }
898 : :
899 : : /*
900 : : * Gets list of publication oids for a relation.
901 : : */
902 : : List *
87 akapila@postgresql.o 903 :GNC 8971 : GetRelationIncludedPublications(Oid relid)
904 : : {
905 : 8971 : return get_relation_publications(relid, false);
906 : : }
907 : :
908 : : /*
909 : : * Gets list of publication oids which has relation in the EXCEPT clause.
910 : : */
911 : : List *
912 : 7702 : GetRelationExcludedPublications(Oid relid)
913 : : {
914 : 7702 : return get_relation_publications(relid, true);
915 : : }
916 : :
917 : : /*
918 : : * Internal function to get the list of relation oids for a publication.
919 : : *
920 : : * If except_flag is true, returns the list of relations specified in the
921 : : * EXCEPT clause of the publication; otherwise, returns the list of relations
922 : : * included in the publication.
923 : : */
924 : : static List *
925 : 680 : get_publication_relations(Oid pubid, PublicationPartOpt pub_partopt,
926 : : bool except_flag)
927 : : {
928 : : List *result;
929 : : Relation pubrelsrel;
930 : : ScanKeyData scankey;
931 : : SysScanDesc scan;
932 : : HeapTuple tup;
933 : :
934 : : /* Find all relations associated with the publication. */
2686 andres@anarazel.de 935 :CBC 680 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
936 : :
3418 peter_e@gmx.net 937 : 680 : ScanKeyInit(&scankey,
938 : : Anum_pg_publication_rel_prpubid,
939 : : BTEqualStrategyNumber, F_OIDEQ,
940 : : ObjectIdGetDatum(pubid));
941 : :
1599 alvherre@alvh.no-ip. 942 : 680 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
943 : : true, NULL, 1, &scankey);
944 : :
3418 peter_e@gmx.net 945 : 680 : result = NIL;
946 [ + + ]: 2000 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
947 : : {
948 : : Form_pg_publication_rel pubrel;
949 : :
950 : 640 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
951 : :
87 akapila@postgresql.o 952 [ - + ]:GNC 640 : if (except_flag == pubrel->prexcept)
953 : 640 : result = GetPubPartitionOptionRelations(result, pub_partopt,
954 : : pubrel->prrelid);
955 : : }
956 : :
3418 peter_e@gmx.net 957 :CBC 680 : systable_endscan(scan);
2686 andres@anarazel.de 958 : 680 : table_close(pubrelsrel, AccessShareLock);
959 : :
960 : : /* Now sort and de-duplicate the result list */
1634 akapila@postgresql.o 961 : 680 : list_sort(result, list_oid_cmp);
962 : 680 : list_deduplicate_oid(result);
963 : :
3418 peter_e@gmx.net 964 : 680 : return result;
965 : : }
966 : :
967 : : /*
968 : : * Gets list of relation oids that are associated with a publication.
969 : : *
970 : : * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
971 : : * should use GetAllPublicationRelations().
972 : : */
973 : : List *
87 akapila@postgresql.o 974 :GNC 610 : GetIncludedPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
975 : : {
976 [ - + ]: 610 : Assert(!GetPublication(pubid)->alltables);
977 : :
978 : 610 : return get_publication_relations(pubid, pub_partopt, false);
979 : : }
980 : :
981 : : /*
982 : : * Gets list of table oids that were specified in the EXCEPT clause for a
983 : : * publication.
984 : : *
985 : : * This should only be used FOR ALL TABLES publications.
986 : : */
987 : : List *
988 : 70 : GetExcludedPublicationTables(Oid pubid, PublicationPartOpt pub_partopt)
989 : : {
990 [ - + ]: 70 : Assert(GetPublication(pubid)->alltables);
991 : :
992 : 70 : return get_publication_relations(pubid, pub_partopt, true);
993 : : }
994 : :
995 : : /*
996 : : * Gets list of publication oids for publications marked as FOR ALL TABLES.
997 : : */
998 : : List *
3418 peter_e@gmx.net 999 :CBC 6005 : GetAllTablesPublications(void)
1000 : : {
1001 : : List *result;
1002 : : Relation rel;
1003 : : ScanKeyData scankey;
1004 : : SysScanDesc scan;
1005 : : HeapTuple tup;
1006 : :
1007 : : /* Find all publications that are marked as for all tables. */
2686 andres@anarazel.de 1008 : 6005 : rel = table_open(PublicationRelationId, AccessShareLock);
1009 : :
3418 peter_e@gmx.net 1010 : 6005 : ScanKeyInit(&scankey,
1011 : : Anum_pg_publication_puballtables,
1012 : : BTEqualStrategyNumber, F_BOOLEQ,
1013 : : BoolGetDatum(true));
1014 : :
1015 : 6005 : scan = systable_beginscan(rel, InvalidOid, false,
1016 : : NULL, 1, &scankey);
1017 : :
1018 : 6005 : result = NIL;
1019 [ + + ]: 6135 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
1020 : : {
2565 tgl@sss.pgh.pa.us 1021 : 130 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
1022 : :
2748 andres@anarazel.de 1023 : 130 : result = lappend_oid(result, oid);
1024 : : }
1025 : :
3418 peter_e@gmx.net 1026 : 6005 : systable_endscan(scan);
2686 andres@anarazel.de 1027 : 6005 : table_close(rel, AccessShareLock);
1028 : :
3418 peter_e@gmx.net 1029 : 6005 : return result;
1030 : : }
1031 : :
1032 : : /*
1033 : : * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
1034 : : * publication.
1035 : : *
1036 : : * If the publication publishes partition changes via their respective root
1037 : : * partitioned tables, we must exclude partitions in favor of including the
1038 : : * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
1039 : : * publication.
1040 : : *
1041 : : * For a FOR ALL TABLES publication, the returned list excludes tables mentioned
1042 : : * in the EXCEPT clause.
1043 : : */
1044 : : List *
87 akapila@postgresql.o 1045 :GNC 55 : GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot)
1046 : : {
1047 : : Relation classRel;
1048 : : ScanKeyData key[1];
1049 : : TableScanDesc scan;
1050 : : HeapTuple tuple;
3418 peter_e@gmx.net 1051 :CBC 55 : List *result = NIL;
87 akapila@postgresql.o 1052 :GNC 55 : List *exceptlist = NIL;
1053 : :
233 1054 [ + + - + ]: 55 : Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
1055 : :
1056 : : /* EXCEPT filtering applies only to relations, not sequences */
87 1057 [ + + ]: 55 : if (relkind == RELKIND_RELATION)
1058 : 49 : exceptlist = GetExcludedPublicationTables(pubid, pubviaroot ?
1059 : 49 : PUBLICATION_PART_ROOT :
1060 : : PUBLICATION_PART_LEAF);
1061 : :
2686 andres@anarazel.de 1062 :CBC 55 : classRel = table_open(RelationRelationId, AccessShareLock);
1063 : :
3418 peter_e@gmx.net 1064 : 55 : ScanKeyInit(&key[0],
1065 : : Anum_pg_class_relkind,
1066 : : BTEqualStrategyNumber, F_CHAREQ,
1067 : : CharGetDatum(relkind));
1068 : :
2637 andres@anarazel.de 1069 : 55 : scan = table_beginscan_catalog(classRel, 1, key);
1070 : :
3418 peter_e@gmx.net 1071 [ + + ]: 3769 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1072 : : {
3300 bruce@momjian.us 1073 : 3714 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
2748 andres@anarazel.de 1074 : 3714 : Oid relid = relForm->oid;
1075 : :
2243 peter@eisentraut.org 1076 [ + + ]: 3714 : if (is_publishable_class(relid, relForm) &&
87 akapila@postgresql.o 1077 [ + + + + ]:GNC 135 : !(relForm->relispartition && pubviaroot) &&
1078 [ + + ]: 112 : !list_member_oid(exceptlist, relid))
3418 peter_e@gmx.net 1079 :CBC 104 : result = lappend_oid(result, relid);
1080 : : }
1081 : :
2637 andres@anarazel.de 1082 : 55 : table_endscan(scan);
1083 : :
2243 peter@eisentraut.org 1084 [ + + ]: 55 : if (pubviaroot)
1085 : : {
1086 : 4 : ScanKeyInit(&key[0],
1087 : : Anum_pg_class_relkind,
1088 : : BTEqualStrategyNumber, F_CHAREQ,
1089 : : CharGetDatum(RELKIND_PARTITIONED_TABLE));
1090 : :
1091 : 4 : scan = table_beginscan_catalog(classRel, 1, key);
1092 : :
1093 [ + + ]: 21 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1094 : : {
1095 : 17 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1096 : 17 : Oid relid = relForm->oid;
1097 : :
1098 [ + - ]: 17 : if (is_publishable_class(relid, relForm) &&
87 akapila@postgresql.o 1099 [ + + ]:GNC 17 : !relForm->relispartition &&
1100 [ + + ]: 13 : !list_member_oid(exceptlist, relid))
2243 peter@eisentraut.org 1101 :CBC 12 : result = lappend_oid(result, relid);
1102 : : }
1103 : :
1104 : 4 : table_endscan(scan);
1105 : : }
1106 : :
2240 1107 : 55 : table_close(classRel, AccessShareLock);
3418 peter_e@gmx.net 1108 : 55 : return result;
1109 : : }
1110 : :
1111 : : /*
1112 : : * Gets the list of schema oids for a publication.
1113 : : *
1114 : : * This should only be used FOR TABLES IN SCHEMA publications.
1115 : : */
1116 : : List *
1514 tomas.vondra@postgre 1117 : 576 : GetPublicationSchemas(Oid pubid)
1118 : : {
1676 akapila@postgresql.o 1119 : 576 : List *result = NIL;
1120 : : Relation pubschsrel;
1121 : : ScanKeyData scankey;
1122 : : SysScanDesc scan;
1123 : : HeapTuple tup;
1124 : :
1125 : : /* Find all schemas associated with the publication */
1126 : 576 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
1127 : :
1514 tomas.vondra@postgre 1128 : 576 : ScanKeyInit(&scankey,
1129 : : Anum_pg_publication_namespace_pnpubid,
1130 : : BTEqualStrategyNumber, F_OIDEQ,
1131 : : ObjectIdGetDatum(pubid));
1132 : :
1676 akapila@postgresql.o 1133 : 576 : scan = systable_beginscan(pubschsrel,
1134 : : PublicationNamespacePnnspidPnpubidIndexId,
1135 : : true, NULL, 1, &scankey);
1136 [ + + ]: 616 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
1137 : : {
1138 : : Form_pg_publication_namespace pubsch;
1139 : :
1140 : 40 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1141 : :
1142 : 40 : result = lappend_oid(result, pubsch->pnnspid);
1143 : : }
1144 : :
1145 : 576 : systable_endscan(scan);
1146 : 576 : table_close(pubschsrel, AccessShareLock);
1147 : :
1148 : 576 : return result;
1149 : : }
1150 : :
1151 : : /*
1152 : : * Gets the list of publication oids associated with a specified schema.
1153 : : */
1154 : : List *
1514 tomas.vondra@postgre 1155 : 8639 : GetSchemaPublications(Oid schemaid)
1156 : : {
1676 akapila@postgresql.o 1157 : 8639 : List *result = NIL;
1158 : : CatCList *pubschlist;
1159 : : int i;
1160 : :
1161 : : /* Find all publications associated with the schema */
1162 : 8639 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
1163 : : ObjectIdGetDatum(schemaid));
1164 [ + + ]: 8718 : for (i = 0; i < pubschlist->n_members; i++)
1165 : : {
1166 : 79 : HeapTuple tup = &pubschlist->members[i]->tuple;
1167 : 79 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
1168 : :
1169 : 79 : result = lappend_oid(result, pubid);
1170 : : }
1171 : :
1172 : 8639 : ReleaseSysCacheList(pubschlist);
1173 : :
1174 : 8639 : return result;
1175 : : }
1176 : :
1177 : : /*
1178 : : * Get the list of publishable relation oids for a specified schema.
1179 : : */
1180 : : List *
1514 tomas.vondra@postgre 1181 : 310 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
1182 : : {
1183 : : Relation classRel;
1184 : : ScanKeyData key[1];
1185 : : TableScanDesc scan;
1186 : : HeapTuple tuple;
1676 akapila@postgresql.o 1187 : 310 : List *result = NIL;
1188 : :
1189 [ - + ]: 310 : Assert(OidIsValid(schemaid));
1190 : :
1191 : 310 : classRel = table_open(RelationRelationId, AccessShareLock);
1192 : :
1193 : 310 : ScanKeyInit(&key[0],
1194 : : Anum_pg_class_relnamespace,
1195 : : BTEqualStrategyNumber, F_OIDEQ,
1196 : : ObjectIdGetDatum(schemaid));
1197 : :
1198 : : /* get all the relations present in the specified schema */
1199 : 310 : scan = table_beginscan_catalog(classRel, 1, key);
1200 [ + + ]: 17256 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1201 : : {
1202 : 16946 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1203 : 16946 : Oid relid = relForm->oid;
1204 : : char relkind;
1205 : :
1206 [ + + ]: 16946 : if (!is_publishable_class(relid, relForm))
1207 : 5839 : continue;
1208 : :
1209 : 11107 : relkind = get_rel_relkind(relid);
1514 tomas.vondra@postgre 1210 [ + + ]: 11107 : if (relkind == RELKIND_RELATION)
1211 : 9554 : result = lappend_oid(result, relid);
1212 [ + + ]: 1553 : else if (relkind == RELKIND_PARTITIONED_TABLE)
1213 : : {
1676 akapila@postgresql.o 1214 : 495 : List *partitionrels = NIL;
1215 : :
1216 : : /*
1217 : : * It is quite possible that some of the partitions are in a
1218 : : * different schema than the parent table, so we need to get such
1219 : : * partitions separately.
1220 : : */
1221 : 495 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
1222 : : pub_partopt,
1223 : : relForm->oid);
1224 : 495 : result = list_concat_unique_oid(result, partitionrels);
1225 : : }
1226 : : }
1227 : :
1228 : 310 : table_endscan(scan);
1229 : 310 : table_close(classRel, AccessShareLock);
1230 : 310 : return result;
1231 : : }
1232 : :
1233 : : /*
1234 : : * Gets the list of all relations published by FOR TABLES IN SCHEMA
1235 : : * publication.
1236 : : */
1237 : : List *
1514 tomas.vondra@postgre 1238 : 283 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
1239 : : {
1676 akapila@postgresql.o 1240 : 283 : List *result = NIL;
1514 tomas.vondra@postgre 1241 : 283 : List *pubschemalist = GetPublicationSchemas(pubid);
1242 : : ListCell *cell;
1243 : :
1676 akapila@postgresql.o 1244 [ + + + + : 303 : foreach(cell, pubschemalist)
+ + ]
1245 : : {
1246 : 20 : Oid schemaid = lfirst_oid(cell);
1247 : 20 : List *schemaRels = NIL;
1248 : :
1514 tomas.vondra@postgre 1249 : 20 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
1676 akapila@postgresql.o 1250 : 20 : result = list_concat(result, schemaRels);
1251 : : }
1252 : :
1253 : 283 : return result;
1254 : : }
1255 : :
1256 : : /*
1257 : : * Get publication using oid
1258 : : *
1259 : : * The Publication struct and its data are palloc'ed here.
1260 : : */
1261 : : Publication *
3418 peter_e@gmx.net 1262 : 4840 : GetPublication(Oid pubid)
1263 : : {
1264 : : HeapTuple tup;
1265 : : Publication *pub;
1266 : : Form_pg_publication pubform;
1267 : :
1268 : 4840 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1269 [ - + ]: 4840 : if (!HeapTupleIsValid(tup))
3418 peter_e@gmx.net 1270 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1271 : :
3418 peter_e@gmx.net 1272 :CBC 4840 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1273 : :
171 michael@paquier.xyz 1274 :GNC 4840 : pub = palloc_object(Publication);
3418 peter_e@gmx.net 1275 :CBC 4840 : pub->oid = pubid;
1276 : 4840 : pub->name = pstrdup(NameStr(pubform->pubname));
1277 : 4840 : pub->alltables = pubform->puballtables;
233 akapila@postgresql.o 1278 :GNC 4840 : pub->allsequences = pubform->puballsequences;
3418 peter_e@gmx.net 1279 :CBC 4840 : pub->pubactions.pubinsert = pubform->pubinsert;
1280 : 4840 : pub->pubactions.pubupdate = pubform->pubupdate;
1281 : 4840 : pub->pubactions.pubdelete = pubform->pubdelete;
2975 1282 : 4840 : pub->pubactions.pubtruncate = pubform->pubtruncate;
2243 peter@eisentraut.org 1283 : 4840 : pub->pubviaroot = pubform->pubviaroot;
487 akapila@postgresql.o 1284 : 4840 : pub->pubgencols_type = pubform->pubgencols;
1285 : :
3418 peter_e@gmx.net 1286 : 4840 : ReleaseSysCache(tup);
1287 : :
1288 : 4840 : return pub;
1289 : : }
1290 : :
1291 : : /*
1292 : : * Get Publication using name.
1293 : : */
1294 : : Publication *
1295 : 1759 : GetPublicationByName(const char *pubname, bool missing_ok)
1296 : : {
1297 : : Oid oid;
1298 : :
2350 alvherre@alvh.no-ip. 1299 : 1759 : oid = get_publication_oid(pubname, missing_ok);
1300 : :
1301 [ + + ]: 1759 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1302 : : }
1303 : :
1304 : : /*
1305 : : * A helper function for pg_get_publication_tables() to check whether the
1306 : : * table with the given relid is published in the specified publication.
1307 : : *
1308 : : * This function evaluates the effective published OID based on the
1309 : : * publish_via_partition_root setting, rather than just checking catalog entries
1310 : : * (e.g., pg_publication_rel). For instance, when publish_via_partition_root is
1311 : : * false, it returns false for a parent partitioned table and returns true
1312 : : * for its leaf partitions, even if the parent is the one explicitly added
1313 : : * to the publication.
1314 : : *
1315 : : * For performance reasons, this function avoids the overhead of constructing
1316 : : * the complete list of published tables during the evaluation. It can execute
1317 : : * quickly even when the publication contains a large number of relations.
1318 : : *
1319 : : * Note: this leaks memory for the ancestors list into the current memory
1320 : : * context.
1321 : : */
1322 : : static bool
58 msawada@postgresql.o 1323 :GNC 982 : is_table_publishable_in_publication(Oid relid, Publication *pub)
1324 : : {
1325 : : bool relispartition;
1326 : 982 : List *ancestors = NIL;
1327 : :
1328 : : /*
1329 : : * For non-pubviaroot publications, a partitioned table is never the
1330 : : * effective published OID; only its leaf partitions can be.
1331 : : */
1332 [ + + + + ]: 982 : if (!pub->pubviaroot && get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE)
1333 : 86 : return false;
1334 : :
1335 : 896 : relispartition = get_rel_relispartition(relid);
1336 : :
1337 [ + + ]: 896 : if (relispartition)
1338 : 170 : ancestors = get_partition_ancestors(relid);
1339 : :
1340 [ + + ]: 896 : if (pub->alltables)
1341 : : {
1342 : : /*
1343 : : * ALL TABLES with pubviaroot includes only regular tables or top-most
1344 : : * partitioned tables -- never child partitions.
1345 : : */
1346 [ + + + + ]: 200 : if (pub->pubviaroot && relispartition)
1347 : 12 : return false;
1348 : :
1349 : : /*
1350 : : * For ALL TABLES publications, the table is published unless it
1351 : : * appears in the EXCEPT clause. Only the top-most can appear in the
1352 : : * EXCEPT clause, so exclusion must be evaluated at the top-most
1353 : : * ancestor if it has. These publications store only EXCEPT'ed tables
1354 : : * in pg_publication_rel, so checking existence is sufficient.
1355 : : *
1356 : : * Note that this existence check below would incorrectly return true
1357 : : * (published) for partitions when pubviaroot is enabled; however,
1358 : : * that case is already caught and returned false by the above check.
1359 : : */
1360 [ + + ]: 188 : return !SearchSysCacheExists2(PUBLICATIONRELMAP,
1361 : : ObjectIdGetDatum(ancestors
1362 : : ? llast_oid(ancestors) : relid),
1363 : : ObjectIdGetDatum(pub->oid));
1364 : : }
1365 : :
1366 : : /*
1367 : : * Non-ALL-TABLE publication cases.
1368 : : *
1369 : : * A table is published if it (or a containing schema) was explicitly
1370 : : * added, or if it is a partition whose ancestor was added.
1371 : : */
1372 : :
1373 : : /*
1374 : : * If an ancestor is published, the partition's status depends on
1375 : : * publish_via_partition_root value.
1376 : : *
1377 : : * If it's true, the ancestor's relation OID is the effective published
1378 : : * OID, so the partition itself should be excluded (return false).
1379 : : *
1380 : : * If it's false, the partition is covered by its ancestor's presence in
1381 : : * the publication, it should be included (return true).
1382 : : */
1383 [ + + + + ]: 836 : if (relispartition &&
1384 : 140 : OidIsValid(GetTopMostAncestorInPublication(pub->oid, ancestors, NULL)))
1385 : 44 : return !pub->pubviaroot;
1386 : :
1387 : : /*
1388 : : * Check whether the table is explicitly published via pg_publication_rel
1389 : : * or pg_publication_namespace.
1390 : : */
1391 : 652 : return (SearchSysCacheExists2(PUBLICATIONRELMAP,
1392 : : ObjectIdGetDatum(relid),
1393 [ + + + + ]: 1022 : ObjectIdGetDatum(pub->oid)) ||
1394 : 370 : SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1395 : : ObjectIdGetDatum(get_rel_namespace(relid)),
1396 : : ObjectIdGetDatum(pub->oid)));
1397 : : }
1398 : :
1399 : : /*
1400 : : * Helper function to get information of the tables in the given
1401 : : * publication(s).
1402 : : *
1403 : : * If filter_by_relid is true, only the row(s) for target_relid is returned;
1404 : : * if target_relid does not exist or is not part of the publications, zero
1405 : : * rows are returned. If filter_by_relid is false, rows for all tables
1406 : : * within the specified publications are returned and target_relid is
1407 : : * ignored.
1408 : : *
1409 : : * Returns pubid, relid, column list, and row filter for each table.
1410 : : */
1411 : : static Datum
1412 : 1592 : pg_get_publication_tables(FunctionCallInfo fcinfo, ArrayType *pubnames,
1413 : : Oid target_relid, bool filter_by_relid,
1414 : : bool pub_missing_ok)
1415 : : {
1416 : : #define NUM_PUBLICATION_TABLES_ELEM 4
1417 : : FuncCallContext *funcctx;
1158 akapila@postgresql.o 1418 :CBC 1592 : List *table_infos = NIL;
1419 : :
1420 : : /* stuff done only on the first call of the function */
3418 peter_e@gmx.net 1421 [ + + ]: 1592 : if (SRF_IS_FIRSTCALL())
1422 : : {
1423 : : TupleDesc tupdesc;
1424 : : MemoryContext oldcontext;
1425 : : Datum *elems;
1426 : : int nelems,
1427 : : i;
1158 akapila@postgresql.o 1428 : 750 : bool viaroot = false;
1429 : :
1430 : : /* create a function context for cross-call persistence */
3418 peter_e@gmx.net 1431 : 750 : funcctx = SRF_FIRSTCALL_INIT();
1432 : :
1433 : : /*
1434 : : * Preliminary check if the specified table can be published in the
1435 : : * first place. If not, we can return early without checking the given
1436 : : * publications and the table.
1437 : : */
58 msawada@postgresql.o 1438 [ + + + + ]:GNC 750 : if (filter_by_relid && !is_publishable_table(target_relid))
1439 : 8 : SRF_RETURN_DONE(funcctx);
1440 : :
1441 : : /* switch to memory context appropriate for multiple function calls */
3418 peter_e@gmx.net 1442 :CBC 742 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1443 : :
1444 : : /*
1445 : : * Deconstruct the parameter into elements where each element is a
1446 : : * publication name.
1447 : : */
58 msawada@postgresql.o 1448 :GNC 742 : deconstruct_array_builtin(pubnames, TEXTOID, &elems, NULL, &nelems);
1449 : :
1450 : : /* Get Oids of tables from each publication. */
1158 akapila@postgresql.o 1451 [ + + ]:CBC 1982 : for (i = 0; i < nelems; i++)
1452 : : {
1453 : : Publication *pub_elem;
1454 : 1240 : List *pub_elem_tables = NIL;
1455 : : ListCell *lc;
1456 : :
58 msawada@postgresql.o 1457 :GNC 1240 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]),
1458 : : pub_missing_ok);
1459 : :
1460 [ + + ]: 1240 : if (pub_elem == NULL)
1461 : 6 : continue;
1462 : :
1463 [ + + ]: 1234 : if (filter_by_relid)
1464 : : {
1465 : : /* Check if the given table is published for the publication */
1466 [ + + ]: 982 : if (is_table_publishable_in_publication(target_relid, pub_elem))
1467 : : {
1468 : 496 : pub_elem_tables = list_make1_oid(target_relid);
1469 : : }
1470 : : }
1471 : : else
1472 : : {
1473 : : /*
1474 : : * Publications support partitioned tables. If
1475 : : * publish_via_partition_root is false, all changes are
1476 : : * replicated using leaf partition identity and schema, so we
1477 : : * only need those. Otherwise, get the partitioned table
1478 : : * itself.
1479 : : */
1480 [ + + ]: 252 : if (pub_elem->alltables)
1481 : 49 : pub_elem_tables = GetAllPublicationRelations(pub_elem->oid,
1482 : : RELKIND_RELATION,
1483 : 49 : pub_elem->pubviaroot);
1484 : : else
1485 : : {
1486 : : List *relids,
1487 : : *schemarelids;
1488 : :
1489 : 203 : relids = GetIncludedPublicationRelations(pub_elem->oid,
1490 : 203 : pub_elem->pubviaroot ?
1491 : 203 : PUBLICATION_PART_ROOT :
1492 : : PUBLICATION_PART_LEAF);
1493 : 203 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1494 : 203 : pub_elem->pubviaroot ?
1495 : 203 : PUBLICATION_PART_ROOT :
1496 : : PUBLICATION_PART_LEAF);
1497 : 203 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1498 : : }
1499 : : }
1500 : :
1501 : : /*
1502 : : * Record the published table and the corresponding publication so
1503 : : * that we can get row filters and column lists later.
1504 : : *
1505 : : * When a table is published by multiple publications, to obtain
1506 : : * all row filters and column lists, the structure related to this
1507 : : * table will be recorded multiple times.
1508 : : */
1158 akapila@postgresql.o 1509 [ + + + + :CBC 2106 : foreach(lc, pub_elem_tables)
+ + ]
1510 : : {
171 michael@paquier.xyz 1511 :GNC 872 : published_rel *table_info = palloc_object(published_rel);
1512 : :
1158 akapila@postgresql.o 1513 :CBC 872 : table_info->relid = lfirst_oid(lc);
1514 : 872 : table_info->pubid = pub_elem->oid;
1515 : 872 : table_infos = lappend(table_infos, table_info);
1516 : : }
1517 : :
1518 : : /* At least one publication is using publish_via_partition_root. */
1519 [ + + ]: 1234 : if (pub_elem->pubviaroot)
1520 : 247 : viaroot = true;
1521 : : }
1522 : :
1523 : : /*
1524 : : * If the publication publishes partition changes via their respective
1525 : : * root partitioned tables, we must exclude partitions in favor of
1526 : : * including the root partitioned tables. Otherwise, the function
1527 : : * could return both the child and parent tables which could cause
1528 : : * data of the child table to be double-published on the subscriber
1529 : : * side.
1530 : : */
1531 [ + + ]: 742 : if (viaroot)
1532 : 167 : filter_partitions(table_infos);
1533 : :
1534 : : /* Construct a tuple descriptor for the result rows. */
1437 michael@paquier.xyz 1535 : 742 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1158 akapila@postgresql.o 1536 : 742 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1537 : : OIDOID, -1, 0);
1538 : 742 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1539 : : OIDOID, -1, 0);
1540 : 742 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1541 : : INT2VECTOROID, -1, 0);
1542 : 742 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1543 : : PG_NODE_TREEOID, -1, 0);
1544 : :
75 drowley@postgresql.o 1545 :GNC 742 : TupleDescFinalize(tupdesc);
1472 akapila@postgresql.o 1546 :CBC 742 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
548 peter@eisentraut.org 1547 : 742 : funcctx->user_fctx = table_infos;
1548 : :
3418 peter_e@gmx.net 1549 : 742 : MemoryContextSwitchTo(oldcontext);
1550 : : }
1551 : :
1552 : : /* stuff done on every call of the function */
1553 : 1584 : funcctx = SRF_PERCALL_SETUP();
1158 akapila@postgresql.o 1554 : 1584 : table_infos = (List *) funcctx->user_fctx;
1555 : :
1556 [ + + ]: 1584 : if (funcctx->call_cntr < list_length(table_infos))
1557 : : {
1472 1558 : 842 : HeapTuple pubtuple = NULL;
1559 : : HeapTuple rettuple;
1560 : : Publication *pub;
1158 1561 : 842 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1562 : 842 : Oid relid = table_info->relid;
1345 1563 : 842 : Oid schemaid = get_rel_namespace(relid);
1414 peter@eisentraut.org 1564 : 842 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1565 : 842 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1566 : :
1567 : : /*
1568 : : * Form tuple with appropriate data.
1569 : : */
1570 : :
1158 akapila@postgresql.o 1571 : 842 : pub = GetPublication(table_info->pubid);
1572 : :
1573 : 842 : values[0] = ObjectIdGetDatum(pub->oid);
1574 : 842 : values[1] = ObjectIdGetDatum(relid);
1575 : :
1576 : : /*
1577 : : * We don't consider row filters or column lists for FOR ALL TABLES or
1578 : : * FOR TABLES IN SCHEMA publications.
1579 : : */
1580 [ + + ]: 842 : if (!pub->alltables &&
1345 1581 [ + + ]: 573 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1582 : : ObjectIdGetDatum(schemaid),
1583 : : ObjectIdGetDatum(pub->oid)))
1584 : 537 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1585 : : ObjectIdGetDatum(relid),
1586 : : ObjectIdGetDatum(pub->oid));
1587 : :
1472 1588 [ + + ]: 842 : if (HeapTupleIsValid(pubtuple))
1589 : : {
1590 : : /* Lookup the column list attribute. */
1158 1591 : 498 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1592 : : Anum_pg_publication_rel_prattrs,
1593 : : &(nulls[2]));
1594 : :
1595 : : /* Null indicates no filter. */
1596 : 498 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1597 : : Anum_pg_publication_rel_prqual,
1598 : : &(nulls[3]));
1599 : : }
1600 : : else
1601 : : {
1472 1602 : 344 : nulls[2] = true;
1158 1603 : 344 : nulls[3] = true;
1604 : : }
1605 : :
1606 : : /* Show all columns when the column list is not specified. */
1607 [ + + ]: 842 : if (nulls[2])
1608 : : {
1233 1609 : 746 : Relation rel = table_open(relid, AccessShareLock);
1610 : 746 : int nattnums = 0;
1611 : : int16 *attnums;
1612 : 746 : TupleDesc desc = RelationGetDescr(rel);
1613 : : int i;
1614 : :
171 michael@paquier.xyz 1615 :GNC 746 : attnums = palloc_array(int16, desc->natts);
1616 : :
1233 akapila@postgresql.o 1617 [ + + ]:CBC 1974 : for (i = 0; i < desc->natts; i++)
1618 : : {
1619 : 1228 : Form_pg_attribute att = TupleDescAttr(desc, i);
1620 : :
492 1621 [ + + ]: 1228 : if (att->attisdropped)
1233 1622 : 4 : continue;
1623 : :
492 1624 [ + + ]: 1224 : if (att->attgenerated)
1625 : : {
1626 : : /* We only support replication of STORED generated cols. */
1627 [ + + ]: 22 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
1628 : 10 : continue;
1629 : :
1630 : : /*
1631 : : * User hasn't requested to replicate STORED generated
1632 : : * cols.
1633 : : */
1634 [ + + ]: 12 : if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
1635 : 9 : continue;
1636 : : }
1637 : :
1233 1638 : 1205 : attnums[nattnums++] = att->attnum;
1639 : : }
1640 : :
1641 [ + + ]: 746 : if (nattnums > 0)
1642 : : {
1158 1643 : 742 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1644 : 742 : nulls[2] = false;
1645 : : }
1646 : :
1233 1647 : 746 : table_close(rel, AccessShareLock);
1648 : : }
1649 : :
1472 1650 : 842 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1651 : :
1652 : 842 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1653 : : }
1654 : :
3418 peter_e@gmx.net 1655 : 742 : SRF_RETURN_DONE(funcctx);
1656 : : }
1657 : :
1658 : : Datum
58 msawada@postgresql.o 1659 :GNC 550 : pg_get_publication_tables_a(PG_FUNCTION_ARGS)
1660 : : {
1661 : : /*
1662 : : * Get information for all tables in the given publications.
1663 : : * filter_by_relid is false so all tables are returned; pub_missing_ok is
1664 : : * false for backward compatibility.
1665 : : */
1666 : 550 : return pg_get_publication_tables(fcinfo, PG_GETARG_ARRAYTYPE_P(0),
1667 : : InvalidOid, false, false);
1668 : : }
1669 : :
1670 : : Datum
1671 : 1042 : pg_get_publication_tables_b(PG_FUNCTION_ARGS)
1672 : : {
1673 : : /*
1674 : : * Get information for the specified table in the given publications. The
1675 : : * SQL-level function is declared STRICT, so target_relid is guaranteed to
1676 : : * be non-NULL here.
1677 : : */
1678 : 1042 : return pg_get_publication_tables(fcinfo, PG_GETARG_ARRAYTYPE_P(0),
1679 : : PG_GETARG_OID(1), true, true);
1680 : : }
1681 : :
1682 : : /*
1683 : : * Returns Oids of sequences in a publication.
1684 : : */
1685 : : Datum
233 akapila@postgresql.o 1686 : 234 : pg_get_publication_sequences(PG_FUNCTION_ARGS)
1687 : : {
1688 : : FuncCallContext *funcctx;
1689 : 234 : List *sequences = NIL;
1690 : :
1691 : : /* stuff done only on the first call of the function */
1692 [ + + ]: 234 : if (SRF_IS_FIRSTCALL())
1693 : : {
1694 : 217 : char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1695 : : Publication *publication;
1696 : : MemoryContext oldcontext;
1697 : :
1698 : : /* create a function context for cross-call persistence */
1699 : 217 : funcctx = SRF_FIRSTCALL_INIT();
1700 : :
1701 : : /* switch to memory context appropriate for multiple function calls */
1702 : 217 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1703 : :
1704 : 217 : publication = GetPublicationByName(pubname, false);
1705 : :
1706 [ + + ]: 217 : if (publication->allsequences)
87 1707 : 6 : sequences = GetAllPublicationRelations(publication->oid,
1708 : : RELKIND_SEQUENCE,
1709 : : false);
1710 : :
190 peter@eisentraut.org 1711 : 217 : funcctx->user_fctx = sequences;
1712 : :
233 akapila@postgresql.o 1713 : 217 : MemoryContextSwitchTo(oldcontext);
1714 : : }
1715 : :
1716 : : /* stuff done on every call of the function */
1717 : 234 : funcctx = SRF_PERCALL_SETUP();
1718 : 234 : sequences = (List *) funcctx->user_fctx;
1719 : :
1720 [ + + ]: 234 : if (funcctx->call_cntr < list_length(sequences))
1721 : : {
1722 : 17 : Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
1723 : :
1724 : 17 : SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
1725 : : }
1726 : :
1727 : 217 : SRF_RETURN_DONE(funcctx);
1728 : : }
|