Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_publication.c
4 : : * publication C API manipulation
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/catalog/pg_publication.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/genam.h"
18 : : #include "access/heapam.h"
19 : : #include "access/htup_details.h"
20 : : #include "access/tableam.h"
21 : : #include "catalog/catalog.h"
22 : : #include "catalog/dependency.h"
23 : : #include "catalog/indexing.h"
24 : : #include "catalog/namespace.h"
25 : : #include "catalog/objectaddress.h"
26 : : #include "catalog/partition.h"
27 : : #include "catalog/pg_inherits.h"
28 : : #include "catalog/pg_namespace.h"
29 : : #include "catalog/pg_publication.h"
30 : : #include "catalog/pg_publication_namespace.h"
31 : : #include "catalog/pg_publication_rel.h"
32 : : #include "catalog/pg_type.h"
33 : : #include "commands/publicationcmds.h"
34 : : #include "funcapi.h"
35 : : #include "utils/array.h"
36 : : #include "utils/builtins.h"
37 : : #include "utils/catcache.h"
38 : : #include "utils/fmgroids.h"
39 : : #include "utils/lsyscache.h"
40 : : #include "utils/rel.h"
41 : : #include "utils/syscache.h"
42 : :
43 : : /* Records association between publication and published table */
44 : : typedef struct
45 : : {
46 : : Oid relid; /* OID of published table */
47 : : Oid pubid; /* OID of publication that publishes this
48 : : * table. */
49 : : } published_rel;
50 : :
51 : : /*
52 : : * Check if relation can be in given publication and throws appropriate
53 : : * error if not.
54 : : */
55 : : static void
62 akapila@postgresql.o 56 :GNC 807 : check_publication_add_relation(PublicationRelInfo *pri)
57 : : {
58 : 807 : Relation targetrel = pri->relation;
59 : : const char *errormsg;
60 : :
61 [ + + ]: 807 : if (pri->except)
34 62 : 81 : errormsg = gettext_noop("cannot specify relation \"%s\" in the publication EXCEPT clause");
63 : : else
62 64 : 726 : errormsg = gettext_noop("cannot add relation \"%s\" to publication");
65 : :
66 : : /* If in EXCEPT clause, must be root partitioned table */
67 [ + + + + ]: 807 : if (pri->except && targetrel->rd_rel->relispartition)
68 [ + - ]: 4 : ereport(ERROR,
69 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
70 : : errmsg(errormsg, RelationGetRelationName(targetrel)),
71 : : errdetail("This operation is not supported for individual partitions.")));
72 : :
73 : : /* Must be a regular or partitioned table */
2247 peter@eisentraut.org 74 [ + + ]:CBC 803 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
1489 tomas.vondra@postgre 75 [ + + ]: 115 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
3393 peter_e@gmx.net 76 [ + - ]: 9 : ereport(ERROR,
77 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
78 : : errmsg(errormsg, RelationGetRelationName(targetrel)),
79 : : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
80 : :
81 : : /* Can't be system table */
82 [ + + ]: 794 : if (IsCatalogRelation(targetrel))
83 [ + - ]: 4 : ereport(ERROR,
84 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85 : : errmsg(errormsg, RelationGetRelationName(targetrel)),
86 : : errdetail("This operation is not supported for system tables.")));
87 : :
88 : : /* UNLOGGED and TEMP relations cannot be part of publication. */
1630 dgustafsson@postgres 89 [ + + ]: 790 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
3393 peter_e@gmx.net 90 [ + - ]: 4 : ereport(ERROR,
91 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
92 : : errmsg(errormsg, RelationGetRelationName(targetrel)),
93 : : errdetail("This operation is not supported for temporary tables.")));
1630 dgustafsson@postgres 94 [ + + ]: 786 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
95 [ + - ]: 4 : ereport(ERROR,
96 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
97 : : errmsg(errormsg, RelationGetRelationName(targetrel)),
98 : : errdetail("This operation is not supported for unlogged tables.")));
3393 peter_e@gmx.net 99 : 782 : }
100 : :
101 : : /*
102 : : * Check if schema can be in given publication and throw appropriate error if
103 : : * not.
104 : : */
105 : : static void
1651 akapila@postgresql.o 106 : 163 : check_publication_add_schema(Oid schemaid)
107 : : {
108 : : /* Can't be system namespace */
109 [ + + - + ]: 163 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
110 [ + - ]: 4 : ereport(ERROR,
111 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
112 : : errmsg("cannot add schema \"%s\" to publication",
113 : : get_namespace_name(schemaid)),
114 : : errdetail("This operation is not supported for system schemas.")));
115 : :
116 : : /* Can't be temporary namespace */
117 [ - + ]: 159 : if (isAnyTempNamespace(schemaid))
1651 akapila@postgresql.o 118 [ # # ]:UBC 0 : ereport(ERROR,
119 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
120 : : errmsg("cannot add schema \"%s\" to publication",
121 : : get_namespace_name(schemaid)),
122 : : errdetail("Temporary schemas cannot be replicated.")));
1651 akapila@postgresql.o 123 :CBC 159 : }
124 : :
125 : : /*
126 : : * Returns if relation represented by oid and Form_pg_class entry
127 : : * is publishable.
128 : : *
129 : : * Does same checks as check_publication_add_relation() above except for
130 : : * RELKIND_SEQUENCE, but does not need relation to be opened and also does
131 : : * not throw errors. Here, the additional check is to support ALL SEQUENCES
132 : : * publication.
133 : : *
134 : : * XXX This also excludes all tables with relid < FirstNormalObjectId,
135 : : * ie all tables created during initdb. This mainly affects the preinstalled
136 : : * information_schema. IsCatalogRelationOid() only excludes tables with
137 : : * relid < FirstUnpinnedObjectId, making that test rather redundant,
138 : : * but really we should get rid of the FirstNormalObjectId test not
139 : : * IsCatalogRelationOid. We can't do so today because we don't want
140 : : * information_schema tables to be considered publishable; but this test
141 : : * is really inadequate for that, since the information_schema could be
142 : : * dropped and reloaded and then it'll be considered publishable. The best
143 : : * long-term solution may be to add a "relispublishable" bool to pg_class,
144 : : * and depend on that instead of OID checks.
145 : : */
146 : : static bool
3393 peter_e@gmx.net 147 : 323126 : is_publishable_class(Oid relid, Form_pg_class reltuple)
148 : : {
2247 peter@eisentraut.org 149 : 331396 : return (reltuple->relkind == RELKIND_RELATION ||
208 akapila@postgresql.o 150 [ + + ]:GNC 8270 : reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
151 [ + + ]: 7264 : reltuple->relkind == RELKIND_SEQUENCE) &&
2554 tgl@sss.pgh.pa.us 152 [ + + ]:CBC 317233 : !IsCatalogRelationOid(relid) &&
3393 peter_e@gmx.net 153 [ + + + + : 646252 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
+ + ]
154 : : relid >= FirstNormalObjectId;
155 : : }
156 : :
157 : : /*
158 : : * Another variant of is_publishable_class(), taking a Relation.
159 : : */
160 : : bool
1376 akapila@postgresql.o 161 : 297619 : is_publishable_relation(Relation rel)
162 : : {
163 : 297619 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
164 : : }
165 : :
166 : : /*
167 : : * Similar to is_publishable_class() but checks whether the given OID
168 : : * is a publishable "table" or not.
169 : : */
170 : : static bool
33 msawada@postgresql.o 171 :GNC 546 : is_publishable_table(Oid tableoid)
172 : : {
173 : : HeapTuple tuple;
174 : : Form_pg_class relform;
175 : :
176 : 546 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(tableoid));
177 [ + + ]: 546 : if (!HeapTupleIsValid(tuple))
178 : 4 : return false;
179 : :
180 : 542 : relform = (Form_pg_class) GETSTRUCT(tuple);
181 : :
182 : : /*
183 : : * is_publishable_class() includes sequences, so we need to explicitly
184 : : * check the relkind to filter them out here.
185 : : */
186 [ + - + + ]: 1084 : if (relform->relkind != RELKIND_SEQUENCE &&
187 : 542 : is_publishable_class(tableoid, relform))
188 : : {
189 : 538 : ReleaseSysCache(tuple);
190 : 538 : return true;
191 : : }
192 : :
193 : 4 : ReleaseSysCache(tuple);
194 : 4 : return false;
195 : : }
196 : :
197 : : /*
198 : : * SQL-callable variant of the above
199 : : *
200 : : * This returns null when the relation does not exist. This is intended to be
201 : : * used for example in psql to avoid gratuitous errors when there are
202 : : * concurrent catalog changes.
203 : : */
204 : : Datum
1372 akapila@postgresql.o 205 :CBC 4288 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
206 : : {
207 : 4288 : Oid relid = PG_GETARG_OID(0);
208 : : HeapTuple tuple;
209 : : bool result;
210 : :
211 : 4288 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
212 [ - + ]: 4288 : if (!HeapTupleIsValid(tuple))
1372 akapila@postgresql.o 213 :UBC 0 : PG_RETURN_NULL();
1372 akapila@postgresql.o 214 :CBC 4288 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
215 : 4288 : ReleaseSysCache(tuple);
216 : 4288 : PG_RETURN_BOOL(result);
217 : : }
218 : :
219 : : /*
220 : : * Returns true if the ancestor is in the list of published relations.
221 : : * Otherwise, returns false.
222 : : */
223 : : static bool
1133 224 : 61 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
225 : : {
226 : : ListCell *lc;
227 : :
228 [ + - + + : 274 : foreach(lc, table_infos)
+ + ]
229 : : {
230 : 243 : Oid relid = ((published_rel *) lfirst(lc))->relid;
231 : :
232 [ + + ]: 243 : if (relid == ancestor)
233 : 30 : return true;
234 : : }
235 : :
236 : 31 : return false;
237 : : }
238 : :
239 : : /*
240 : : * Filter out the partitions whose parent tables are also present in the list.
241 : : */
242 : : static void
243 : 167 : filter_partitions(List *table_infos)
244 : : {
245 : : ListCell *lc;
246 : :
247 [ + + + + : 395 : foreach(lc, table_infos)
+ + ]
248 : : {
1651 249 : 228 : bool skip = false;
250 : 228 : List *ancestors = NIL;
251 : : ListCell *lc2;
1133 252 : 228 : published_rel *table_info = (published_rel *) lfirst(lc);
253 : :
254 [ + + ]: 228 : if (get_rel_relispartition(table_info->relid))
255 : 61 : ancestors = get_partition_ancestors(table_info->relid);
256 : :
1651 257 [ + + + + : 259 : foreach(lc2, ancestors)
+ + ]
258 : : {
259 : 61 : Oid ancestor = lfirst_oid(lc2);
260 : :
1133 261 [ + + ]: 61 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
262 : : {
1651 263 : 30 : skip = true;
264 : 30 : break;
265 : : }
266 : : }
267 : :
1133 268 [ + + ]: 228 : if (skip)
269 : 30 : table_infos = foreach_delete_current(table_infos, lc);
270 : : }
1651 271 : 167 : }
272 : :
273 : : /*
274 : : * Returns true if any schema is associated with the publication, false if no
275 : : * schema is associated with the publication.
276 : : */
277 : : bool
1609 278 : 215 : is_schema_publication(Oid pubid)
279 : : {
280 : : Relation pubschsrel;
281 : : ScanKeyData scankey;
282 : : SysScanDesc scan;
283 : : HeapTuple tup;
284 : 215 : bool result = false;
285 : :
286 : 215 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
287 : 215 : ScanKeyInit(&scankey,
288 : : Anum_pg_publication_namespace_pnpubid,
289 : : BTEqualStrategyNumber, F_OIDEQ,
290 : : ObjectIdGetDatum(pubid));
291 : :
292 : 215 : scan = systable_beginscan(pubschsrel,
293 : : PublicationNamespacePnnspidPnpubidIndexId,
294 : : true, NULL, 1, &scankey);
295 : 215 : tup = systable_getnext(scan);
296 : 215 : result = HeapTupleIsValid(tup);
297 : :
298 : 215 : systable_endscan(scan);
299 : 215 : table_close(pubschsrel, AccessShareLock);
300 : :
301 : 215 : return result;
302 : : }
303 : :
304 : : /*
305 : : * Returns true if the publication has explicitly included relation (i.e.,
306 : : * not marked as EXCEPT).
307 : : */
308 : : bool
46 akapila@postgresql.o 309 :GNC 49 : is_table_publication(Oid pubid)
310 : : {
311 : : Relation pubrelsrel;
312 : : ScanKeyData scankey;
313 : : SysScanDesc scan;
314 : : HeapTuple tup;
315 : 49 : bool result = false;
316 : :
317 : 49 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
318 : 49 : ScanKeyInit(&scankey,
319 : : Anum_pg_publication_rel_prpubid,
320 : : BTEqualStrategyNumber, F_OIDEQ,
321 : : ObjectIdGetDatum(pubid));
322 : :
323 : 49 : scan = systable_beginscan(pubrelsrel,
324 : : PublicationRelPrpubidIndexId,
325 : : true, NULL, 1, &scankey);
326 : 49 : tup = systable_getnext(scan);
327 [ + + ]: 49 : if (HeapTupleIsValid(tup))
328 : : {
329 : : Form_pg_publication_rel pubrel;
330 : :
331 : 25 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
332 : :
333 : : /*
334 : : * For any publication, pg_publication_rel contains either only EXCEPT
335 : : * entries or only explicitly included tables. Therefore, examining
336 : : * the first tuple is sufficient to determine table inclusion.
337 : : */
338 : 25 : result = !pubrel->prexcept;
339 : : }
340 : :
341 : 49 : systable_endscan(scan);
342 : 49 : table_close(pubrelsrel, AccessShareLock);
343 : :
344 : 49 : return result;
345 : : }
346 : :
347 : : /*
348 : : * Returns true if the relation has column list associated with the
349 : : * publication, false otherwise.
350 : : *
351 : : * If a column list is found, the corresponding bitmap is returned through the
352 : : * cols parameter, if provided. The bitmap is constructed within the given
353 : : * memory context (mcxt).
354 : : */
355 : : bool
544 akapila@postgresql.o 356 :CBC 908 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
357 : : Bitmapset **cols)
358 : : {
359 : : HeapTuple cftuple;
360 : 908 : bool found = false;
361 : :
362 [ + + ]: 908 : if (pub->alltables)
363 : 218 : return false;
364 : :
365 : 690 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
366 : : ObjectIdGetDatum(relid),
367 : : ObjectIdGetDatum(pub->oid));
368 [ + + ]: 690 : if (HeapTupleIsValid(cftuple))
369 : : {
370 : : Datum cfdatum;
371 : : bool isnull;
372 : :
373 : : /* Lookup the column list attribute. */
374 : 632 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
375 : : Anum_pg_publication_rel_prattrs, &isnull);
376 : :
377 : : /* Was a column list found? */
378 [ + + ]: 632 : if (!isnull)
379 : : {
380 : : /* Build the column list bitmap in the given memory context. */
381 [ + + ]: 191 : if (cols)
382 : 188 : *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
383 : :
384 : 191 : found = true;
385 : : }
386 : :
387 : 632 : ReleaseSysCache(cftuple);
388 : : }
389 : :
390 : 690 : return found;
391 : : }
392 : :
393 : : /*
394 : : * Gets the relations based on the publication partition option for a specified
395 : : * relation.
396 : : */
397 : : List *
1686 398 : 2497 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
399 : : Oid relid)
400 : : {
401 [ + + + + ]: 2497 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
402 : : pub_partopt != PUBLICATION_PART_ROOT)
403 : 701 : {
404 : 701 : List *all_parts = find_all_inheritors(relid, NoLock,
405 : : NULL);
406 : :
407 [ + + ]: 701 : if (pub_partopt == PUBLICATION_PART_ALL)
408 : 682 : result = list_concat(result, all_parts);
409 [ + - ]: 19 : else if (pub_partopt == PUBLICATION_PART_LEAF)
410 : : {
411 : : ListCell *lc;
412 : :
413 [ + - + + : 69 : foreach(lc, all_parts)
+ + ]
414 : : {
415 : 50 : Oid partOid = lfirst_oid(lc);
416 : :
417 [ + + ]: 50 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
418 : 30 : result = lappend_oid(result, partOid);
419 : : }
420 : : }
421 : : else
1686 akapila@postgresql.o 422 :UBC 0 : Assert(false);
423 : : }
424 : : else
1686 akapila@postgresql.o 425 :CBC 1796 : result = lappend_oid(result, relid);
426 : :
427 : 2497 : return result;
428 : : }
429 : :
430 : : /*
431 : : * Returns the relid of the topmost ancestor that is published via this
432 : : * publication if any and set its ancestor level to ancestor_level,
433 : : * otherwise returns InvalidOid.
434 : : *
435 : : * The ancestor_level value allows us to compare the results for multiple
436 : : * publications, and decide which value is higher up.
437 : : *
438 : : * Note that the list of ancestors should be ordered such that the topmost
439 : : * ancestor is at the end of the list.
440 : : */
441 : : Oid
1511 tomas.vondra@postgre 442 : 426 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
443 : : {
444 : : ListCell *lc;
1533 akapila@postgresql.o 445 : 426 : Oid topmost_relid = InvalidOid;
1511 tomas.vondra@postgre 446 : 426 : int level = 0;
447 : :
448 : : /*
449 : : * Find the "topmost" ancestor that is in this publication.
450 : : */
1533 akapila@postgresql.o 451 [ + - + + : 862 : foreach(lc, ancestors)
+ + ]
452 : : {
453 : 436 : Oid ancestor = lfirst_oid(lc);
62 akapila@postgresql.o 454 :GNC 436 : List *apubids = GetRelationIncludedPublications(ancestor);
1533 akapila@postgresql.o 455 :CBC 436 : List *aschemaPubids = NIL;
456 : :
1511 tomas.vondra@postgre 457 : 436 : level++;
458 : :
1533 akapila@postgresql.o 459 [ + + ]: 436 : if (list_member_oid(apubids, puboid))
460 : : {
461 : 223 : topmost_relid = ancestor;
462 : :
1511 tomas.vondra@postgre 463 [ + + ]: 223 : if (ancestor_level)
464 : 43 : *ancestor_level = level;
465 : : }
466 : : else
467 : : {
1489 468 : 213 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
1533 akapila@postgresql.o 469 [ + + ]: 213 : if (list_member_oid(aschemaPubids, puboid))
470 : : {
471 : 13 : topmost_relid = ancestor;
472 : :
1511 tomas.vondra@postgre 473 [ + + ]: 13 : if (ancestor_level)
474 : 5 : *ancestor_level = level;
475 : : }
476 : : }
477 : :
1533 akapila@postgresql.o 478 : 436 : list_free(apubids);
479 : 436 : list_free(aschemaPubids);
480 : : }
481 : :
482 : 426 : return topmost_relid;
483 : : }
484 : :
485 : : /*
486 : : * attnumstoint2vector
487 : : * Convert a Bitmapset of AttrNumbers into an int2vector.
488 : : *
489 : : * AttrNumber numbers are 0-based, i.e., not offset by
490 : : * FirstLowInvalidHeapAttributeNumber.
491 : : */
492 : : static int2vector *
628 drowley@postgresql.o 493 : 212 : attnumstoint2vector(Bitmapset *attrs)
494 : : {
495 : : int2vector *result;
496 : 212 : int n = bms_num_members(attrs);
497 : 212 : int i = -1;
498 : 212 : int j = 0;
499 : :
500 : 212 : result = buildint2vector(NULL, n);
501 : :
502 [ + + ]: 577 : while ((i = bms_next_member(attrs, i)) >= 0)
503 : : {
504 [ - + ]: 365 : Assert(i <= PG_INT16_MAX);
505 : :
506 : 365 : result->values[j++] = (int16) i;
507 : : }
508 : :
509 : 212 : return result;
510 : : }
511 : :
512 : : /*
513 : : * Insert new publication / relation mapping.
514 : : */
515 : : ObjectAddress
1533 akapila@postgresql.o 516 : 829 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
517 : : bool if_not_exists, AlterPublicationStmt *alter_stmt)
518 : : {
519 : : Relation rel;
520 : : HeapTuple tup;
521 : : Datum values[Natts_pg_publication_rel];
522 : : bool nulls[Natts_pg_publication_rel];
523 : 829 : Relation targetrel = pri->relation;
524 : 829 : Oid relid = RelationGetRelid(targetrel);
525 : : Oid pubreloid;
526 : : Bitmapset *attnums;
3393 peter_e@gmx.net 527 : 829 : Publication *pub = GetPublication(pubid);
528 : : ObjectAddress myself,
529 : : referenced;
1686 akapila@postgresql.o 530 : 829 : List *relids = NIL;
531 : : int i;
532 : : bool inval_except_table;
533 : :
2661 andres@anarazel.de 534 : 829 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
535 : :
536 : : /*
537 : : * Check for duplicates. Note that this does not really prevent
538 : : * duplicates, it's here just to provide nicer error message in common
539 : : * case. The real protection is the unique key on the catalog.
540 : : */
3393 peter_e@gmx.net 541 [ + + ]: 829 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
542 : : ObjectIdGetDatum(pubid)))
543 : : {
2661 andres@anarazel.de 544 : 22 : table_close(rel, RowExclusiveLock);
545 : :
3393 peter_e@gmx.net 546 [ + + ]: 22 : if (if_not_exists)
547 : 18 : return InvalidObjectAddress;
548 : :
549 [ + - ]: 4 : ereport(ERROR,
550 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
551 : : errmsg("relation \"%s\" is already member of publication \"%s\"",
552 : : RelationGetRelationName(targetrel), pub->name)));
553 : : }
554 : :
62 akapila@postgresql.o 555 :GNC 807 : check_publication_add_relation(pri);
556 : :
557 : : /* Validate and translate column names into a Bitmapset of attnums. */
628 drowley@postgresql.o 558 :CBC 782 : attnums = pub_collist_validate(pri->relation, pri->columns);
559 : :
560 : : /* Form a tuple. */
3393 peter_e@gmx.net 561 : 766 : memset(values, 0, sizeof(values));
562 : 766 : memset(nulls, false, sizeof(nulls));
563 : :
1587 alvherre@alvh.no-ip. 564 : 766 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
565 : : Anum_pg_publication_rel_oid);
566 : 766 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
3393 peter_e@gmx.net 567 : 766 : values[Anum_pg_publication_rel_prpubid - 1] =
568 : 766 : ObjectIdGetDatum(pubid);
569 : 766 : values[Anum_pg_publication_rel_prrelid - 1] =
570 : 766 : ObjectIdGetDatum(relid);
62 akapila@postgresql.o 571 :GNC 766 : values[Anum_pg_publication_rel_prexcept - 1] =
572 : 766 : BoolGetDatum(pri->except);
573 : :
574 : : /* Add qualifications, if available */
1533 akapila@postgresql.o 575 [ + + ]:CBC 766 : if (pri->whereClause != NULL)
576 : 219 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
577 : : else
578 : 547 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
579 : :
580 : : /* Add column list, if available */
1501 tomas.vondra@postgre 581 [ + + ]: 766 : if (pri->columns)
628 drowley@postgresql.o 582 : 212 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
583 : : else
1501 tomas.vondra@postgre 584 : 554 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
585 : :
3393 peter_e@gmx.net 586 : 766 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
587 : :
588 : : /* Insert tuple into catalog. */
2723 andres@anarazel.de 589 : 766 : CatalogTupleInsert(rel, tup);
3393 peter_e@gmx.net 590 : 766 : heap_freetuple(tup);
591 : :
592 : : /* Register dependencies as needed */
1587 alvherre@alvh.no-ip. 593 : 766 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
594 : :
595 : : /* Add dependency on the publication */
3393 peter_e@gmx.net 596 : 766 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
597 : 766 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
598 : :
599 : : /* Add dependency on the relation */
600 : 766 : ObjectAddressSet(referenced, RelationRelationId, relid);
601 : 766 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
602 : :
603 : : /* Add dependency on the objects mentioned in the qualifications */
1533 akapila@postgresql.o 604 [ + + ]: 766 : if (pri->whereClause)
605 : 219 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
606 : : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
607 : : false);
608 : :
609 : : /* Add dependency on the columns, if any are listed */
628 drowley@postgresql.o 610 : 766 : i = -1;
611 [ + + ]: 1131 : while ((i = bms_next_member(attnums, i)) >= 0)
612 : : {
613 : 365 : ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
1501 tomas.vondra@postgre 614 : 365 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
615 : : }
616 : :
617 : : /* Close the table. */
2661 andres@anarazel.de 618 : 766 : table_close(rel, RowExclusiveLock);
619 : :
620 : : /*
621 : : * Determine whether EXCEPT tables require explicit relcache invalidation.
622 : : *
623 : : * For CREATE PUBLICATION with EXCEPT tables, invalidation is skipped
624 : : * here, as CreatePublication() function invalidates all relations as part
625 : : * of defining a FOR ALL TABLES publication.
626 : : *
627 : : * For ALTER PUBLICATION, invalidation is needed only when adding an
628 : : * EXCEPT table to a publication already marked as ALL TABLES. For
629 : : * publications that were originally empty or defined as ALL SEQUENCES and
630 : : * are being converted to ALL TABLES, invalidation is skipped here, as
631 : : * AlterPublicationAllFlags() function invalidates all relations while
632 : : * marking the publication as ALL TABLES publication.
633 : : */
46 akapila@postgresql.o 634 [ + + + + ]:GNC 775 : inval_except_table = (alter_stmt != NULL) && pub->alltables &&
635 [ + - + - ]: 9 : (alter_stmt->for_all_tables && pri->except);
636 : :
637 [ + + + + ]: 766 : if (!pri->except || inval_except_table)
638 : : {
639 : : /*
640 : : * Invalidate relcache so that publication info is rebuilt.
641 : : *
642 : : * For the partitioned tables, we must invalidate all partitions
643 : : * contained in the respective partition hierarchies, not just the one
644 : : * explicitly mentioned in the publication. This is required because
645 : : * we implicitly publish the child tables when the parent table is
646 : : * published.
647 : : */
62 648 : 698 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
649 : : relid);
650 : :
651 : 698 : InvalidatePublicationRels(relids);
652 : : }
653 : :
3393 peter_e@gmx.net 654 :CBC 766 : return myself;
655 : : }
656 : :
657 : : /*
658 : : * pub_collist_validate
659 : : * Process and validate the 'columns' list and ensure the columns are all
660 : : * valid to use for a publication. Checks for and raises an ERROR for
661 : : * any unknown columns, system columns, duplicate columns, or virtual
662 : : * generated columns.
663 : : *
664 : : * Looks up each column's attnum and returns a 0-based Bitmapset of the
665 : : * corresponding attnums.
666 : : */
667 : : Bitmapset *
628 drowley@postgresql.o 668 : 1072 : pub_collist_validate(Relation targetrel, List *columns)
669 : : {
1501 tomas.vondra@postgre 670 : 1072 : Bitmapset *set = NULL;
671 : : ListCell *lc;
452 peter@eisentraut.org 672 : 1072 : TupleDesc tupdesc = RelationGetDescr(targetrel);
673 : :
1501 tomas.vondra@postgre 674 [ + + + + : 1617 : foreach(lc, columns)
+ + ]
675 : : {
676 : 569 : char *colname = strVal(lfirst(lc));
677 : 569 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
678 : :
679 [ + + ]: 569 : if (attnum == InvalidAttrNumber)
680 [ + - ]: 4 : ereport(ERROR,
681 : : errcode(ERRCODE_UNDEFINED_COLUMN),
682 : : errmsg("column \"%s\" of relation \"%s\" does not exist",
683 : : colname, RelationGetRelationName(targetrel)));
684 : :
685 [ + + ]: 565 : if (!AttrNumberIsForUserDefinedAttr(attnum))
686 [ + - ]: 8 : ereport(ERROR,
687 : : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
688 : : errmsg("cannot use system column \"%s\" in publication column list",
689 : : colname));
690 : :
452 peter@eisentraut.org 691 [ + + ]: 557 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
692 [ + - ]: 4 : ereport(ERROR,
693 : : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
694 : : errmsg("cannot use virtual generated column \"%s\" in publication column list",
695 : : colname));
696 : :
1501 tomas.vondra@postgre 697 [ + + ]: 553 : if (bms_is_member(attnum, set))
698 [ + - ]: 8 : ereport(ERROR,
699 : : errcode(ERRCODE_DUPLICATE_OBJECT),
700 : : errmsg("duplicate column \"%s\" in publication column list",
701 : : colname));
702 : :
703 : 545 : set = bms_add_member(set, attnum);
704 : : }
705 : :
628 drowley@postgresql.o 706 : 1048 : return set;
707 : : }
708 : :
709 : : /*
710 : : * Transform a column list (represented by an array Datum) to a bitmapset.
711 : : *
712 : : * If columns isn't NULL, add the column numbers to that set.
713 : : *
714 : : * If mcxt isn't NULL, build the bitmapset in that context.
715 : : */
716 : : Bitmapset *
1501 tomas.vondra@postgre 717 : 278 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
718 : : {
628 drowley@postgresql.o 719 : 278 : Bitmapset *result = columns;
720 : : ArrayType *arr;
721 : : int nelems;
722 : : int16 *elems;
1454 tgl@sss.pgh.pa.us 723 : 278 : MemoryContext oldcxt = NULL;
724 : :
1501 tomas.vondra@postgre 725 : 278 : arr = DatumGetArrayTypeP(pubcols);
726 : 278 : nelems = ARR_DIMS(arr)[0];
727 [ - + ]: 278 : elems = (int16 *) ARR_DATA_PTR(arr);
728 : :
729 : : /* If a memory context was specified, switch to it. */
730 [ + + ]: 278 : if (mcxt)
731 : 39 : oldcxt = MemoryContextSwitchTo(mcxt);
732 : :
733 [ + + ]: 764 : for (int i = 0; i < nelems; i++)
734 : 486 : result = bms_add_member(result, elems[i]);
735 : :
736 [ + + ]: 278 : if (mcxt)
737 : 39 : MemoryContextSwitchTo(oldcxt);
738 : :
739 : 278 : return result;
740 : : }
741 : :
742 : : /*
743 : : * Returns a bitmap representing the columns of the specified table.
744 : : *
745 : : * Generated columns are included if include_gencols_type is
746 : : * PUBLISH_GENCOLS_STORED.
747 : : */
748 : : Bitmapset *
467 akapila@postgresql.o 749 : 9 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
750 : : {
544 751 : 9 : Bitmapset *result = NULL;
752 : 9 : TupleDesc desc = RelationGetDescr(relation);
753 : :
754 [ + + ]: 30 : for (int i = 0; i < desc->natts; i++)
755 : : {
756 : 21 : Form_pg_attribute att = TupleDescAttr(desc, i);
757 : :
467 758 [ + + ]: 21 : if (att->attisdropped)
544 759 : 1 : continue;
760 : :
467 761 [ + + ]: 20 : if (att->attgenerated)
762 : : {
763 : : /* We only support replication of STORED generated cols. */
764 [ + + ]: 2 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
765 : 1 : continue;
766 : :
767 : : /* User hasn't requested to replicate STORED generated cols. */
768 [ + - ]: 1 : if (include_gencols_type != PUBLISH_GENCOLS_STORED)
769 : 1 : continue;
770 : : }
771 : :
544 772 : 18 : result = bms_add_member(result, att->attnum);
773 : : }
774 : :
775 : 9 : return result;
776 : : }
777 : :
778 : : /*
779 : : * Insert new publication / schema mapping.
780 : : */
781 : : ObjectAddress
1489 tomas.vondra@postgre 782 : 175 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
783 : : {
784 : : Relation rel;
785 : : HeapTuple tup;
786 : : Datum values[Natts_pg_publication_namespace];
787 : : bool nulls[Natts_pg_publication_namespace];
788 : : Oid psschid;
1651 akapila@postgresql.o 789 : 175 : Publication *pub = GetPublication(pubid);
790 : 175 : List *schemaRels = NIL;
791 : : ObjectAddress myself,
792 : : referenced;
793 : :
794 : 175 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
795 : :
796 : : /*
797 : : * Check for duplicates. Note that this does not really prevent
798 : : * duplicates, it's here just to provide nicer error message in common
799 : : * case. The real protection is the unique key on the catalog.
800 : : */
1489 tomas.vondra@postgre 801 [ + + ]: 175 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
802 : : ObjectIdGetDatum(schemaid),
803 : : ObjectIdGetDatum(pubid)))
804 : : {
1651 akapila@postgresql.o 805 : 12 : table_close(rel, RowExclusiveLock);
806 : :
807 [ + + ]: 12 : if (if_not_exists)
808 : 8 : return InvalidObjectAddress;
809 : :
810 [ + - ]: 4 : ereport(ERROR,
811 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
812 : : errmsg("schema \"%s\" is already member of publication \"%s\"",
813 : : get_namespace_name(schemaid), pub->name)));
814 : : }
815 : :
816 : 163 : check_publication_add_schema(schemaid);
817 : :
818 : : /* Form a tuple */
819 : 159 : memset(values, 0, sizeof(values));
820 : 159 : memset(nulls, false, sizeof(nulls));
821 : :
822 : 159 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
823 : : Anum_pg_publication_namespace_oid);
824 : 159 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
825 : 159 : values[Anum_pg_publication_namespace_pnpubid - 1] =
826 : 159 : ObjectIdGetDatum(pubid);
827 : 159 : values[Anum_pg_publication_namespace_pnnspid - 1] =
828 : 159 : ObjectIdGetDatum(schemaid);
829 : :
830 : 159 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
831 : :
832 : : /* Insert tuple into catalog */
833 : 159 : CatalogTupleInsert(rel, tup);
834 : 159 : heap_freetuple(tup);
835 : :
836 : 159 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
837 : :
838 : : /* Add dependency on the publication */
839 : 159 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
840 : 159 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
841 : :
842 : : /* Add dependency on the schema */
843 : 159 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
844 : 159 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
845 : :
846 : : /* Close the table */
847 : 159 : table_close(rel, RowExclusiveLock);
848 : :
849 : : /*
850 : : * Invalidate relcache so that publication info is rebuilt. See
851 : : * publication_add_relation for why we need to consider all the
852 : : * partitions.
853 : : */
1489 tomas.vondra@postgre 854 : 159 : schemaRels = GetSchemaPublicationRelations(schemaid,
855 : : PUBLICATION_PART_ALL);
1651 akapila@postgresql.o 856 : 159 : InvalidatePublicationRels(schemaRels);
857 : :
858 : 159 : return myself;
859 : : }
860 : :
861 : : /*
862 : : * Internal function to get the list of publication oids for a relation.
863 : : *
864 : : * If except_flag is true, returns the list of publication that specified the
865 : : * relation in the EXCEPT clause; otherwise, returns the list of publications
866 : : * in which relation is included.
867 : : */
868 : : static List *
62 akapila@postgresql.o 869 :GNC 16673 : get_relation_publications(Oid relid, bool except_flag)
870 : : {
3275 bruce@momjian.us 871 :CBC 16673 : List *result = NIL;
872 : : CatCList *pubrellist;
873 : :
874 : : /* Find all publications associated with the relation. */
3393 peter_e@gmx.net 875 : 16673 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
876 : : ObjectIdGetDatum(relid));
62 akapila@postgresql.o 877 [ + + ]:GNC 18293 : for (int i = 0; i < pubrellist->n_members; i++)
878 : : {
3393 peter_e@gmx.net 879 :CBC 1620 : HeapTuple tup = &pubrellist->members[i]->tuple;
62 akapila@postgresql.o 880 :GNC 1620 : Form_pg_publication_rel pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
881 : 1620 : Oid pubid = pubrel->prpubid;
882 : :
883 [ + + ]: 1620 : if (pubrel->prexcept == except_flag)
884 : 1150 : result = lappend_oid(result, pubid);
885 : : }
886 : :
3393 peter_e@gmx.net 887 :CBC 16673 : ReleaseSysCacheList(pubrellist);
888 : :
889 : 16673 : return result;
890 : : }
891 : :
892 : : /*
893 : : * Gets list of publication oids for a relation.
894 : : */
895 : : List *
62 akapila@postgresql.o 896 :GNC 8979 : GetRelationIncludedPublications(Oid relid)
897 : : {
898 : 8979 : return get_relation_publications(relid, false);
899 : : }
900 : :
901 : : /*
902 : : * Gets list of publication oids which has relation in the EXCEPT clause.
903 : : */
904 : : List *
905 : 7694 : GetRelationExcludedPublications(Oid relid)
906 : : {
907 : 7694 : return get_relation_publications(relid, true);
908 : : }
909 : :
910 : : /*
911 : : * Internal function to get the list of relation oids for a publication.
912 : : *
913 : : * If except_flag is true, returns the list of relations specified in the
914 : : * EXCEPT clause of the publication; otherwise, returns the list of relations
915 : : * included in the publication.
916 : : */
917 : : static List *
918 : 680 : get_publication_relations(Oid pubid, PublicationPartOpt pub_partopt,
919 : : bool except_flag)
920 : : {
921 : : List *result;
922 : : Relation pubrelsrel;
923 : : ScanKeyData scankey;
924 : : SysScanDesc scan;
925 : : HeapTuple tup;
926 : :
927 : : /* Find all relations associated with the publication. */
2661 andres@anarazel.de 928 :CBC 680 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
929 : :
3393 peter_e@gmx.net 930 : 680 : ScanKeyInit(&scankey,
931 : : Anum_pg_publication_rel_prpubid,
932 : : BTEqualStrategyNumber, F_OIDEQ,
933 : : ObjectIdGetDatum(pubid));
934 : :
1574 alvherre@alvh.no-ip. 935 : 680 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
936 : : true, NULL, 1, &scankey);
937 : :
3393 peter_e@gmx.net 938 : 680 : result = NIL;
939 [ + + ]: 2000 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
940 : : {
941 : : Form_pg_publication_rel pubrel;
942 : :
943 : 640 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
944 : :
62 akapila@postgresql.o 945 [ - + ]:GNC 640 : if (except_flag == pubrel->prexcept)
946 : 640 : result = GetPubPartitionOptionRelations(result, pub_partopt,
947 : : pubrel->prrelid);
948 : : }
949 : :
3393 peter_e@gmx.net 950 :CBC 680 : systable_endscan(scan);
2661 andres@anarazel.de 951 : 680 : table_close(pubrelsrel, AccessShareLock);
952 : :
953 : : /* Now sort and de-duplicate the result list */
1609 akapila@postgresql.o 954 : 680 : list_sort(result, list_oid_cmp);
955 : 680 : list_deduplicate_oid(result);
956 : :
3393 peter_e@gmx.net 957 : 680 : return result;
958 : : }
959 : :
960 : : /*
961 : : * Gets list of relation oids that are associated with a publication.
962 : : *
963 : : * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
964 : : * should use GetAllPublicationRelations().
965 : : */
966 : : List *
62 akapila@postgresql.o 967 :GNC 610 : GetIncludedPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
968 : : {
969 [ - + ]: 610 : Assert(!GetPublication(pubid)->alltables);
970 : :
971 : 610 : return get_publication_relations(pubid, pub_partopt, false);
972 : : }
973 : :
974 : : /*
975 : : * Gets list of table oids that were specified in the EXCEPT clause for a
976 : : * publication.
977 : : *
978 : : * This should only be used FOR ALL TABLES publications.
979 : : */
980 : : List *
981 : 70 : GetExcludedPublicationTables(Oid pubid, PublicationPartOpt pub_partopt)
982 : : {
983 [ - + ]: 70 : Assert(GetPublication(pubid)->alltables);
984 : :
985 : 70 : return get_publication_relations(pubid, pub_partopt, true);
986 : : }
987 : :
988 : : /*
989 : : * Gets list of publication oids for publications marked as FOR ALL TABLES.
990 : : */
991 : : List *
3393 peter_e@gmx.net 992 :CBC 6003 : GetAllTablesPublications(void)
993 : : {
994 : : List *result;
995 : : Relation rel;
996 : : ScanKeyData scankey;
997 : : SysScanDesc scan;
998 : : HeapTuple tup;
999 : :
1000 : : /* Find all publications that are marked as for all tables. */
2661 andres@anarazel.de 1001 : 6003 : rel = table_open(PublicationRelationId, AccessShareLock);
1002 : :
3393 peter_e@gmx.net 1003 : 6003 : ScanKeyInit(&scankey,
1004 : : Anum_pg_publication_puballtables,
1005 : : BTEqualStrategyNumber, F_BOOLEQ,
1006 : : BoolGetDatum(true));
1007 : :
1008 : 6003 : scan = systable_beginscan(rel, InvalidOid, false,
1009 : : NULL, 1, &scankey);
1010 : :
1011 : 6003 : result = NIL;
1012 [ + + ]: 6133 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
1013 : : {
2540 tgl@sss.pgh.pa.us 1014 : 130 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
1015 : :
2723 andres@anarazel.de 1016 : 130 : result = lappend_oid(result, oid);
1017 : : }
1018 : :
3393 peter_e@gmx.net 1019 : 6003 : systable_endscan(scan);
2661 andres@anarazel.de 1020 : 6003 : table_close(rel, AccessShareLock);
1021 : :
3393 peter_e@gmx.net 1022 : 6003 : return result;
1023 : : }
1024 : :
1025 : : /*
1026 : : * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
1027 : : * publication.
1028 : : *
1029 : : * If the publication publishes partition changes via their respective root
1030 : : * partitioned tables, we must exclude partitions in favor of including the
1031 : : * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
1032 : : * publication.
1033 : : *
1034 : : * For a FOR ALL TABLES publication, the returned list excludes tables mentioned
1035 : : * in the EXCEPT clause.
1036 : : */
1037 : : List *
62 akapila@postgresql.o 1038 :GNC 55 : GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot)
1039 : : {
1040 : : Relation classRel;
1041 : : ScanKeyData key[1];
1042 : : TableScanDesc scan;
1043 : : HeapTuple tuple;
3393 peter_e@gmx.net 1044 :CBC 55 : List *result = NIL;
62 akapila@postgresql.o 1045 :GNC 55 : List *exceptlist = NIL;
1046 : :
208 1047 [ + + - + ]: 55 : Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
1048 : :
1049 : : /* EXCEPT filtering applies only to relations, not sequences */
62 1050 [ + + ]: 55 : if (relkind == RELKIND_RELATION)
1051 : 49 : exceptlist = GetExcludedPublicationTables(pubid, pubviaroot ?
1052 : 49 : PUBLICATION_PART_ROOT :
1053 : : PUBLICATION_PART_LEAF);
1054 : :
2661 andres@anarazel.de 1055 :CBC 55 : classRel = table_open(RelationRelationId, AccessShareLock);
1056 : :
3393 peter_e@gmx.net 1057 : 55 : ScanKeyInit(&key[0],
1058 : : Anum_pg_class_relkind,
1059 : : BTEqualStrategyNumber, F_CHAREQ,
1060 : : CharGetDatum(relkind));
1061 : :
2612 andres@anarazel.de 1062 : 55 : scan = table_beginscan_catalog(classRel, 1, key);
1063 : :
3393 peter_e@gmx.net 1064 [ + + ]: 3769 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1065 : : {
3275 bruce@momjian.us 1066 : 3714 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
2723 andres@anarazel.de 1067 : 3714 : Oid relid = relForm->oid;
1068 : :
2218 peter@eisentraut.org 1069 [ + + ]: 3714 : if (is_publishable_class(relid, relForm) &&
62 akapila@postgresql.o 1070 [ + + + + ]:GNC 135 : !(relForm->relispartition && pubviaroot) &&
1071 [ + + ]: 112 : !list_member_oid(exceptlist, relid))
3393 peter_e@gmx.net 1072 :CBC 104 : result = lappend_oid(result, relid);
1073 : : }
1074 : :
2612 andres@anarazel.de 1075 : 55 : table_endscan(scan);
1076 : :
2218 peter@eisentraut.org 1077 [ + + ]: 55 : if (pubviaroot)
1078 : : {
1079 : 4 : ScanKeyInit(&key[0],
1080 : : Anum_pg_class_relkind,
1081 : : BTEqualStrategyNumber, F_CHAREQ,
1082 : : CharGetDatum(RELKIND_PARTITIONED_TABLE));
1083 : :
1084 : 4 : scan = table_beginscan_catalog(classRel, 1, key);
1085 : :
1086 [ + + ]: 21 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1087 : : {
1088 : 17 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1089 : 17 : Oid relid = relForm->oid;
1090 : :
1091 [ + - ]: 17 : if (is_publishable_class(relid, relForm) &&
62 akapila@postgresql.o 1092 [ + + ]:GNC 17 : !relForm->relispartition &&
1093 [ + + ]: 13 : !list_member_oid(exceptlist, relid))
2218 peter@eisentraut.org 1094 :CBC 12 : result = lappend_oid(result, relid);
1095 : : }
1096 : :
1097 : 4 : table_endscan(scan);
1098 : : }
1099 : :
2215 1100 : 55 : table_close(classRel, AccessShareLock);
3393 peter_e@gmx.net 1101 : 55 : return result;
1102 : : }
1103 : :
1104 : : /*
1105 : : * Gets the list of schema oids for a publication.
1106 : : *
1107 : : * This should only be used FOR TABLES IN SCHEMA publications.
1108 : : */
1109 : : List *
1489 tomas.vondra@postgre 1110 : 576 : GetPublicationSchemas(Oid pubid)
1111 : : {
1651 akapila@postgresql.o 1112 : 576 : List *result = NIL;
1113 : : Relation pubschsrel;
1114 : : ScanKeyData scankey;
1115 : : SysScanDesc scan;
1116 : : HeapTuple tup;
1117 : :
1118 : : /* Find all schemas associated with the publication */
1119 : 576 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
1120 : :
1489 tomas.vondra@postgre 1121 : 576 : ScanKeyInit(&scankey,
1122 : : Anum_pg_publication_namespace_pnpubid,
1123 : : BTEqualStrategyNumber, F_OIDEQ,
1124 : : ObjectIdGetDatum(pubid));
1125 : :
1651 akapila@postgresql.o 1126 : 576 : scan = systable_beginscan(pubschsrel,
1127 : : PublicationNamespacePnnspidPnpubidIndexId,
1128 : : true, NULL, 1, &scankey);
1129 [ + + ]: 616 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
1130 : : {
1131 : : Form_pg_publication_namespace pubsch;
1132 : :
1133 : 40 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1134 : :
1135 : 40 : result = lappend_oid(result, pubsch->pnnspid);
1136 : : }
1137 : :
1138 : 576 : systable_endscan(scan);
1139 : 576 : table_close(pubschsrel, AccessShareLock);
1140 : :
1141 : 576 : return result;
1142 : : }
1143 : :
1144 : : /*
1145 : : * Gets the list of publication oids associated with a specified schema.
1146 : : */
1147 : : List *
1489 tomas.vondra@postgre 1148 : 8647 : GetSchemaPublications(Oid schemaid)
1149 : : {
1651 akapila@postgresql.o 1150 : 8647 : List *result = NIL;
1151 : : CatCList *pubschlist;
1152 : : int i;
1153 : :
1154 : : /* Find all publications associated with the schema */
1155 : 8647 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
1156 : : ObjectIdGetDatum(schemaid));
1157 [ + + ]: 8726 : for (i = 0; i < pubschlist->n_members; i++)
1158 : : {
1159 : 79 : HeapTuple tup = &pubschlist->members[i]->tuple;
1160 : 79 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
1161 : :
1162 : 79 : result = lappend_oid(result, pubid);
1163 : : }
1164 : :
1165 : 8647 : ReleaseSysCacheList(pubschlist);
1166 : :
1167 : 8647 : return result;
1168 : : }
1169 : :
1170 : : /*
1171 : : * Get the list of publishable relation oids for a specified schema.
1172 : : */
1173 : : List *
1489 tomas.vondra@postgre 1174 : 310 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
1175 : : {
1176 : : Relation classRel;
1177 : : ScanKeyData key[1];
1178 : : TableScanDesc scan;
1179 : : HeapTuple tuple;
1651 akapila@postgresql.o 1180 : 310 : List *result = NIL;
1181 : :
1182 [ - + ]: 310 : Assert(OidIsValid(schemaid));
1183 : :
1184 : 310 : classRel = table_open(RelationRelationId, AccessShareLock);
1185 : :
1186 : 310 : ScanKeyInit(&key[0],
1187 : : Anum_pg_class_relnamespace,
1188 : : BTEqualStrategyNumber, F_OIDEQ,
1189 : : ObjectIdGetDatum(schemaid));
1190 : :
1191 : : /* get all the relations present in the specified schema */
1192 : 310 : scan = table_beginscan_catalog(classRel, 1, key);
1193 [ + + ]: 17256 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1194 : : {
1195 : 16946 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1196 : 16946 : Oid relid = relForm->oid;
1197 : : char relkind;
1198 : :
1199 [ + + ]: 16946 : if (!is_publishable_class(relid, relForm))
1200 : 5839 : continue;
1201 : :
1202 : 11107 : relkind = get_rel_relkind(relid);
1489 tomas.vondra@postgre 1203 [ + + ]: 11107 : if (relkind == RELKIND_RELATION)
1204 : 9554 : result = lappend_oid(result, relid);
1205 [ + + ]: 1553 : else if (relkind == RELKIND_PARTITIONED_TABLE)
1206 : : {
1651 akapila@postgresql.o 1207 : 495 : List *partitionrels = NIL;
1208 : :
1209 : : /*
1210 : : * It is quite possible that some of the partitions are in a
1211 : : * different schema than the parent table, so we need to get such
1212 : : * partitions separately.
1213 : : */
1214 : 495 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
1215 : : pub_partopt,
1216 : : relForm->oid);
1217 : 495 : result = list_concat_unique_oid(result, partitionrels);
1218 : : }
1219 : : }
1220 : :
1221 : 310 : table_endscan(scan);
1222 : 310 : table_close(classRel, AccessShareLock);
1223 : 310 : return result;
1224 : : }
1225 : :
1226 : : /*
1227 : : * Gets the list of all relations published by FOR TABLES IN SCHEMA
1228 : : * publication.
1229 : : */
1230 : : List *
1489 tomas.vondra@postgre 1231 : 283 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
1232 : : {
1651 akapila@postgresql.o 1233 : 283 : List *result = NIL;
1489 tomas.vondra@postgre 1234 : 283 : List *pubschemalist = GetPublicationSchemas(pubid);
1235 : : ListCell *cell;
1236 : :
1651 akapila@postgresql.o 1237 [ + + + + : 303 : foreach(cell, pubschemalist)
+ + ]
1238 : : {
1239 : 20 : Oid schemaid = lfirst_oid(cell);
1240 : 20 : List *schemaRels = NIL;
1241 : :
1489 tomas.vondra@postgre 1242 : 20 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
1651 akapila@postgresql.o 1243 : 20 : result = list_concat(result, schemaRels);
1244 : : }
1245 : :
1246 : 283 : return result;
1247 : : }
1248 : :
1249 : : /*
1250 : : * Get publication using oid
1251 : : *
1252 : : * The Publication struct and its data are palloc'ed here.
1253 : : */
1254 : : Publication *
3393 peter_e@gmx.net 1255 : 4843 : GetPublication(Oid pubid)
1256 : : {
1257 : : HeapTuple tup;
1258 : : Publication *pub;
1259 : : Form_pg_publication pubform;
1260 : :
1261 : 4843 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1262 [ - + ]: 4843 : if (!HeapTupleIsValid(tup))
3393 peter_e@gmx.net 1263 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1264 : :
3393 peter_e@gmx.net 1265 :CBC 4843 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1266 : :
146 michael@paquier.xyz 1267 :GNC 4843 : pub = palloc_object(Publication);
3393 peter_e@gmx.net 1268 :CBC 4843 : pub->oid = pubid;
1269 : 4843 : pub->name = pstrdup(NameStr(pubform->pubname));
1270 : 4843 : pub->alltables = pubform->puballtables;
208 akapila@postgresql.o 1271 :GNC 4843 : pub->allsequences = pubform->puballsequences;
3393 peter_e@gmx.net 1272 :CBC 4843 : pub->pubactions.pubinsert = pubform->pubinsert;
1273 : 4843 : pub->pubactions.pubupdate = pubform->pubupdate;
1274 : 4843 : pub->pubactions.pubdelete = pubform->pubdelete;
2950 1275 : 4843 : pub->pubactions.pubtruncate = pubform->pubtruncate;
2218 peter@eisentraut.org 1276 : 4843 : pub->pubviaroot = pubform->pubviaroot;
462 akapila@postgresql.o 1277 : 4843 : pub->pubgencols_type = pubform->pubgencols;
1278 : :
3393 peter_e@gmx.net 1279 : 4843 : ReleaseSysCache(tup);
1280 : :
1281 : 4843 : return pub;
1282 : : }
1283 : :
1284 : : /*
1285 : : * Get Publication using name.
1286 : : */
1287 : : Publication *
1288 : 1763 : GetPublicationByName(const char *pubname, bool missing_ok)
1289 : : {
1290 : : Oid oid;
1291 : :
2325 alvherre@alvh.no-ip. 1292 : 1763 : oid = get_publication_oid(pubname, missing_ok);
1293 : :
1294 [ + + ]: 1763 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1295 : : }
1296 : :
1297 : : /*
1298 : : * A helper function for pg_get_publication_tables() to check whether the
1299 : : * table with the given relid is published in the specified publication.
1300 : : *
1301 : : * This function evaluates the effective published OID based on the
1302 : : * publish_via_partition_root setting, rather than just checking catalog entries
1303 : : * (e.g., pg_publication_rel). For instance, when publish_via_partition_root is
1304 : : * false, it returns false for a parent partitioned table and returns true
1305 : : * for its leaf partitions, even if the parent is the one explicitly added
1306 : : * to the publication.
1307 : : *
1308 : : * For performance reasons, this function avoids the overhead of constructing
1309 : : * the complete list of published tables during the evaluation. It can execute
1310 : : * quickly even when the publication contains a large number of relations.
1311 : : *
1312 : : * Note: this leaks memory for the ancestors list into the current memory
1313 : : * context.
1314 : : */
1315 : : static bool
33 msawada@postgresql.o 1316 :GNC 982 : is_table_publishable_in_publication(Oid relid, Publication *pub)
1317 : : {
1318 : : bool relispartition;
1319 : 982 : List *ancestors = NIL;
1320 : :
1321 : : /*
1322 : : * For non-pubviaroot publications, a partitioned table is never the
1323 : : * effective published OID; only its leaf partitions can be.
1324 : : */
1325 [ + + + + ]: 982 : if (!pub->pubviaroot && get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE)
1326 : 86 : return false;
1327 : :
1328 : 896 : relispartition = get_rel_relispartition(relid);
1329 : :
1330 [ + + ]: 896 : if (relispartition)
1331 : 170 : ancestors = get_partition_ancestors(relid);
1332 : :
1333 [ + + ]: 896 : if (pub->alltables)
1334 : : {
1335 : : /*
1336 : : * ALL TABLES with pubviaroot includes only regular tables or top-most
1337 : : * partitioned tables -- never child partitions.
1338 : : */
1339 [ + + + + ]: 200 : if (pub->pubviaroot && relispartition)
1340 : 12 : return false;
1341 : :
1342 : : /*
1343 : : * For ALL TABLES publications, the table is published unless it
1344 : : * appears in the EXCEPT clause. Only the top-most can appear in the
1345 : : * EXCEPT clause, so exclusion must be evaluated at the top-most
1346 : : * ancestor if it has. These publications store only EXCEPT'ed tables
1347 : : * in pg_publication_rel, so checking existence is sufficient.
1348 : : *
1349 : : * Note that this existence check below would incorrectly return true
1350 : : * (published) for partitions when pubviaroot is enabled; however,
1351 : : * that case is already caught and returned false by the above check.
1352 : : */
1353 [ + + ]: 188 : return !SearchSysCacheExists2(PUBLICATIONRELMAP,
1354 : : ObjectIdGetDatum(ancestors
1355 : : ? llast_oid(ancestors) : relid),
1356 : : ObjectIdGetDatum(pub->oid));
1357 : : }
1358 : :
1359 : : /*
1360 : : * Non-ALL-TABLE publication cases.
1361 : : *
1362 : : * A table is published if it (or a containing schema) was explicitly
1363 : : * added, or if it is a partition whose ancestor was added.
1364 : : */
1365 : :
1366 : : /*
1367 : : * If an ancestor is published, the partition's status depends on
1368 : : * publish_via_partition_root value.
1369 : : *
1370 : : * If it's true, the ancestor's relation OID is the effective published
1371 : : * OID, so the partition itself should be excluded (return false).
1372 : : *
1373 : : * If it's false, the partition is covered by its ancestor's presence in
1374 : : * the publication, it should be included (return true).
1375 : : */
1376 [ + + + + ]: 836 : if (relispartition &&
1377 : 140 : OidIsValid(GetTopMostAncestorInPublication(pub->oid, ancestors, NULL)))
1378 : 44 : return !pub->pubviaroot;
1379 : :
1380 : : /*
1381 : : * Check whether the table is explicitly published via pg_publication_rel
1382 : : * or pg_publication_namespace.
1383 : : */
1384 : 652 : return (SearchSysCacheExists2(PUBLICATIONRELMAP,
1385 : : ObjectIdGetDatum(relid),
1386 [ + + + + ]: 1022 : ObjectIdGetDatum(pub->oid)) ||
1387 : 370 : SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1388 : : ObjectIdGetDatum(get_rel_namespace(relid)),
1389 : : ObjectIdGetDatum(pub->oid)));
1390 : : }
1391 : :
1392 : : /*
1393 : : * Helper function to get information of the tables in the given
1394 : : * publication(s).
1395 : : *
1396 : : * If filter_by_relid is true, only the row(s) for target_relid is returned;
1397 : : * if target_relid does not exist or is not part of the publications, zero
1398 : : * rows are returned. If filter_by_relid is false, rows for all tables
1399 : : * within the specified publications are returned and target_relid is
1400 : : * ignored.
1401 : : *
1402 : : * Returns pubid, relid, column list, and row filter for each table.
1403 : : */
1404 : : static Datum
1405 : 1592 : pg_get_publication_tables(FunctionCallInfo fcinfo, ArrayType *pubnames,
1406 : : Oid target_relid, bool filter_by_relid,
1407 : : bool pub_missing_ok)
1408 : : {
1409 : : #define NUM_PUBLICATION_TABLES_ELEM 4
1410 : : FuncCallContext *funcctx;
1133 akapila@postgresql.o 1411 :CBC 1592 : List *table_infos = NIL;
1412 : :
1413 : : /* stuff done only on the first call of the function */
3393 peter_e@gmx.net 1414 [ + + ]: 1592 : if (SRF_IS_FIRSTCALL())
1415 : : {
1416 : : TupleDesc tupdesc;
1417 : : MemoryContext oldcontext;
1418 : : Datum *elems;
1419 : : int nelems,
1420 : : i;
1133 akapila@postgresql.o 1421 : 750 : bool viaroot = false;
1422 : :
1423 : : /* create a function context for cross-call persistence */
3393 peter_e@gmx.net 1424 : 750 : funcctx = SRF_FIRSTCALL_INIT();
1425 : :
1426 : : /*
1427 : : * Preliminary check if the specified table can be published in the
1428 : : * first place. If not, we can return early without checking the given
1429 : : * publications and the table.
1430 : : */
33 msawada@postgresql.o 1431 [ + + + + ]:GNC 750 : if (filter_by_relid && !is_publishable_table(target_relid))
1432 : 8 : SRF_RETURN_DONE(funcctx);
1433 : :
1434 : : /* switch to memory context appropriate for multiple function calls */
3393 peter_e@gmx.net 1435 :CBC 742 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1436 : :
1437 : : /*
1438 : : * Deconstruct the parameter into elements where each element is a
1439 : : * publication name.
1440 : : */
33 msawada@postgresql.o 1441 :GNC 742 : deconstruct_array_builtin(pubnames, TEXTOID, &elems, NULL, &nelems);
1442 : :
1443 : : /* Get Oids of tables from each publication. */
1133 akapila@postgresql.o 1444 [ + + ]:CBC 1982 : for (i = 0; i < nelems; i++)
1445 : : {
1446 : : Publication *pub_elem;
1447 : 1240 : List *pub_elem_tables = NIL;
1448 : : ListCell *lc;
1449 : :
33 msawada@postgresql.o 1450 :GNC 1240 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]),
1451 : : pub_missing_ok);
1452 : :
1453 [ + + ]: 1240 : if (pub_elem == NULL)
1454 : 6 : continue;
1455 : :
1456 [ + + ]: 1234 : if (filter_by_relid)
1457 : : {
1458 : : /* Check if the given table is published for the publication */
1459 [ + + ]: 982 : if (is_table_publishable_in_publication(target_relid, pub_elem))
1460 : : {
1461 : 496 : pub_elem_tables = list_make1_oid(target_relid);
1462 : : }
1463 : : }
1464 : : else
1465 : : {
1466 : : /*
1467 : : * Publications support partitioned tables. If
1468 : : * publish_via_partition_root is false, all changes are
1469 : : * replicated using leaf partition identity and schema, so we
1470 : : * only need those. Otherwise, get the partitioned table
1471 : : * itself.
1472 : : */
1473 [ + + ]: 252 : if (pub_elem->alltables)
1474 : 49 : pub_elem_tables = GetAllPublicationRelations(pub_elem->oid,
1475 : : RELKIND_RELATION,
1476 : 49 : pub_elem->pubviaroot);
1477 : : else
1478 : : {
1479 : : List *relids,
1480 : : *schemarelids;
1481 : :
1482 : 203 : relids = GetIncludedPublicationRelations(pub_elem->oid,
1483 : 203 : pub_elem->pubviaroot ?
1484 : 203 : PUBLICATION_PART_ROOT :
1485 : : PUBLICATION_PART_LEAF);
1486 : 203 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1487 : 203 : pub_elem->pubviaroot ?
1488 : 203 : PUBLICATION_PART_ROOT :
1489 : : PUBLICATION_PART_LEAF);
1490 : 203 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1491 : : }
1492 : : }
1493 : :
1494 : : /*
1495 : : * Record the published table and the corresponding publication so
1496 : : * that we can get row filters and column lists later.
1497 : : *
1498 : : * When a table is published by multiple publications, to obtain
1499 : : * all row filters and column lists, the structure related to this
1500 : : * table will be recorded multiple times.
1501 : : */
1133 akapila@postgresql.o 1502 [ + + + + :CBC 2106 : foreach(lc, pub_elem_tables)
+ + ]
1503 : : {
146 michael@paquier.xyz 1504 :GNC 872 : published_rel *table_info = palloc_object(published_rel);
1505 : :
1133 akapila@postgresql.o 1506 :CBC 872 : table_info->relid = lfirst_oid(lc);
1507 : 872 : table_info->pubid = pub_elem->oid;
1508 : 872 : table_infos = lappend(table_infos, table_info);
1509 : : }
1510 : :
1511 : : /* At least one publication is using publish_via_partition_root. */
1512 [ + + ]: 1234 : if (pub_elem->pubviaroot)
1513 : 247 : viaroot = true;
1514 : : }
1515 : :
1516 : : /*
1517 : : * If the publication publishes partition changes via their respective
1518 : : * root partitioned tables, we must exclude partitions in favor of
1519 : : * including the root partitioned tables. Otherwise, the function
1520 : : * could return both the child and parent tables which could cause
1521 : : * data of the child table to be double-published on the subscriber
1522 : : * side.
1523 : : */
1524 [ + + ]: 742 : if (viaroot)
1525 : 167 : filter_partitions(table_infos);
1526 : :
1527 : : /* Construct a tuple descriptor for the result rows. */
1412 michael@paquier.xyz 1528 : 742 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1133 akapila@postgresql.o 1529 : 742 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1530 : : OIDOID, -1, 0);
1531 : 742 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1532 : : OIDOID, -1, 0);
1533 : 742 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1534 : : INT2VECTOROID, -1, 0);
1535 : 742 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1536 : : PG_NODE_TREEOID, -1, 0);
1537 : :
50 drowley@postgresql.o 1538 :GNC 742 : TupleDescFinalize(tupdesc);
1447 akapila@postgresql.o 1539 :CBC 742 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
523 peter@eisentraut.org 1540 : 742 : funcctx->user_fctx = table_infos;
1541 : :
3393 peter_e@gmx.net 1542 : 742 : MemoryContextSwitchTo(oldcontext);
1543 : : }
1544 : :
1545 : : /* stuff done on every call of the function */
1546 : 1584 : funcctx = SRF_PERCALL_SETUP();
1133 akapila@postgresql.o 1547 : 1584 : table_infos = (List *) funcctx->user_fctx;
1548 : :
1549 [ + + ]: 1584 : if (funcctx->call_cntr < list_length(table_infos))
1550 : : {
1447 1551 : 842 : HeapTuple pubtuple = NULL;
1552 : : HeapTuple rettuple;
1553 : : Publication *pub;
1133 1554 : 842 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1555 : 842 : Oid relid = table_info->relid;
1320 1556 : 842 : Oid schemaid = get_rel_namespace(relid);
1389 peter@eisentraut.org 1557 : 842 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1558 : 842 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1559 : :
1560 : : /*
1561 : : * Form tuple with appropriate data.
1562 : : */
1563 : :
1133 akapila@postgresql.o 1564 : 842 : pub = GetPublication(table_info->pubid);
1565 : :
1566 : 842 : values[0] = ObjectIdGetDatum(pub->oid);
1567 : 842 : values[1] = ObjectIdGetDatum(relid);
1568 : :
1569 : : /*
1570 : : * We don't consider row filters or column lists for FOR ALL TABLES or
1571 : : * FOR TABLES IN SCHEMA publications.
1572 : : */
1573 [ + + ]: 842 : if (!pub->alltables &&
1320 1574 [ + + ]: 573 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1575 : : ObjectIdGetDatum(schemaid),
1576 : : ObjectIdGetDatum(pub->oid)))
1577 : 537 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1578 : : ObjectIdGetDatum(relid),
1579 : : ObjectIdGetDatum(pub->oid));
1580 : :
1447 1581 [ + + ]: 842 : if (HeapTupleIsValid(pubtuple))
1582 : : {
1583 : : /* Lookup the column list attribute. */
1133 1584 : 498 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1585 : : Anum_pg_publication_rel_prattrs,
1586 : : &(nulls[2]));
1587 : :
1588 : : /* Null indicates no filter. */
1589 : 498 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1590 : : Anum_pg_publication_rel_prqual,
1591 : : &(nulls[3]));
1592 : : }
1593 : : else
1594 : : {
1447 1595 : 344 : nulls[2] = true;
1133 1596 : 344 : nulls[3] = true;
1597 : : }
1598 : :
1599 : : /* Show all columns when the column list is not specified. */
1600 [ + + ]: 842 : if (nulls[2])
1601 : : {
1208 1602 : 746 : Relation rel = table_open(relid, AccessShareLock);
1603 : 746 : int nattnums = 0;
1604 : : int16 *attnums;
1605 : 746 : TupleDesc desc = RelationGetDescr(rel);
1606 : : int i;
1607 : :
146 michael@paquier.xyz 1608 :GNC 746 : attnums = palloc_array(int16, desc->natts);
1609 : :
1208 akapila@postgresql.o 1610 [ + + ]:CBC 1974 : for (i = 0; i < desc->natts; i++)
1611 : : {
1612 : 1228 : Form_pg_attribute att = TupleDescAttr(desc, i);
1613 : :
467 1614 [ + + ]: 1228 : if (att->attisdropped)
1208 1615 : 4 : continue;
1616 : :
467 1617 [ + + ]: 1224 : if (att->attgenerated)
1618 : : {
1619 : : /* We only support replication of STORED generated cols. */
1620 [ + + ]: 22 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
1621 : 10 : continue;
1622 : :
1623 : : /*
1624 : : * User hasn't requested to replicate STORED generated
1625 : : * cols.
1626 : : */
1627 [ + + ]: 12 : if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
1628 : 9 : continue;
1629 : : }
1630 : :
1208 1631 : 1205 : attnums[nattnums++] = att->attnum;
1632 : : }
1633 : :
1634 [ + + ]: 746 : if (nattnums > 0)
1635 : : {
1133 1636 : 742 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1637 : 742 : nulls[2] = false;
1638 : : }
1639 : :
1208 1640 : 746 : table_close(rel, AccessShareLock);
1641 : : }
1642 : :
1447 1643 : 842 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1644 : :
1645 : 842 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1646 : : }
1647 : :
3393 peter_e@gmx.net 1648 : 742 : SRF_RETURN_DONE(funcctx);
1649 : : }
1650 : :
1651 : : Datum
33 msawada@postgresql.o 1652 :GNC 550 : pg_get_publication_tables_a(PG_FUNCTION_ARGS)
1653 : : {
1654 : : /*
1655 : : * Get information for all tables in the given publications.
1656 : : * filter_by_relid is false so all tables are returned; pub_missing_ok is
1657 : : * false for backward compatibility.
1658 : : */
1659 : 550 : return pg_get_publication_tables(fcinfo, PG_GETARG_ARRAYTYPE_P(0),
1660 : : InvalidOid, false, false);
1661 : : }
1662 : :
1663 : : Datum
1664 : 1042 : pg_get_publication_tables_b(PG_FUNCTION_ARGS)
1665 : : {
1666 : : /*
1667 : : * Get information for the specified table in the given publications. The
1668 : : * SQL-level function is declared STRICT, so target_relid is guaranteed to
1669 : : * be non-NULL here.
1670 : : */
1671 : 1042 : return pg_get_publication_tables(fcinfo, PG_GETARG_ARRAYTYPE_P(0),
1672 : : PG_GETARG_OID(1), true, true);
1673 : : }
1674 : :
1675 : : /*
1676 : : * Returns Oids of sequences in a publication.
1677 : : */
1678 : : Datum
208 akapila@postgresql.o 1679 : 234 : pg_get_publication_sequences(PG_FUNCTION_ARGS)
1680 : : {
1681 : : FuncCallContext *funcctx;
1682 : 234 : List *sequences = NIL;
1683 : :
1684 : : /* stuff done only on the first call of the function */
1685 [ + + ]: 234 : if (SRF_IS_FIRSTCALL())
1686 : : {
1687 : 217 : char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1688 : : Publication *publication;
1689 : : MemoryContext oldcontext;
1690 : :
1691 : : /* create a function context for cross-call persistence */
1692 : 217 : funcctx = SRF_FIRSTCALL_INIT();
1693 : :
1694 : : /* switch to memory context appropriate for multiple function calls */
1695 : 217 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1696 : :
1697 : 217 : publication = GetPublicationByName(pubname, false);
1698 : :
1699 [ + + ]: 217 : if (publication->allsequences)
62 1700 : 6 : sequences = GetAllPublicationRelations(publication->oid,
1701 : : RELKIND_SEQUENCE,
1702 : : false);
1703 : :
165 peter@eisentraut.org 1704 : 217 : funcctx->user_fctx = sequences;
1705 : :
208 akapila@postgresql.o 1706 : 217 : MemoryContextSwitchTo(oldcontext);
1707 : : }
1708 : :
1709 : : /* stuff done on every call of the function */
1710 : 234 : funcctx = SRF_PERCALL_SETUP();
1711 : 234 : sequences = (List *) funcctx->user_fctx;
1712 : :
1713 [ + + ]: 234 : if (funcctx->call_cntr < list_length(sequences))
1714 : : {
1715 : 17 : Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
1716 : :
1717 : 17 : SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
1718 : : }
1719 : :
1720 : 217 : SRF_RETURN_DONE(funcctx);
1721 : : }
|