Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_publication.c
4 : : * publication C API manipulation
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/catalog/pg_publication.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/genam.h"
18 : : #include "access/heapam.h"
19 : : #include "access/htup_details.h"
20 : : #include "access/tableam.h"
21 : : #include "catalog/catalog.h"
22 : : #include "catalog/dependency.h"
23 : : #include "catalog/indexing.h"
24 : : #include "catalog/namespace.h"
25 : : #include "catalog/objectaddress.h"
26 : : #include "catalog/partition.h"
27 : : #include "catalog/pg_inherits.h"
28 : : #include "catalog/pg_namespace.h"
29 : : #include "catalog/pg_publication.h"
30 : : #include "catalog/pg_publication_namespace.h"
31 : : #include "catalog/pg_publication_rel.h"
32 : : #include "catalog/pg_type.h"
33 : : #include "commands/publicationcmds.h"
34 : : #include "funcapi.h"
35 : : #include "utils/array.h"
36 : : #include "utils/builtins.h"
37 : : #include "utils/catcache.h"
38 : : #include "utils/fmgroids.h"
39 : : #include "utils/lsyscache.h"
40 : : #include "utils/rel.h"
41 : : #include "utils/syscache.h"
42 : :
43 : : /* Records association between publication and published table */
44 : : typedef struct
45 : : {
46 : : Oid relid; /* OID of published table */
47 : : Oid pubid; /* OID of publication that publishes this
48 : : * table. */
49 : : } published_rel;
50 : :
51 : : /*
52 : : * Check if relation can be in given publication and throws appropriate
53 : : * error if not.
54 : : */
55 : : static void
3203 peter_e@gmx.net 56 :CBC 563 : check_publication_add_relation(Relation targetrel)
57 : : {
58 : : /* Must be a regular or partitioned table */
2057 peter@eisentraut.org 59 [ + + ]: 563 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
1299 tomas.vondra@postgre 60 [ + + ]: 72 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
3203 peter_e@gmx.net 61 [ + - ]: 7 : ereport(ERROR,
62 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
63 : : errmsg("cannot add relation \"%s\" to publication",
64 : : RelationGetRelationName(targetrel)),
65 : : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
66 : :
67 : : /* Can't be system table */
68 [ + + ]: 556 : if (IsCatalogRelation(targetrel))
69 [ + - ]: 3 : ereport(ERROR,
70 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
71 : : errmsg("cannot add relation \"%s\" to publication",
72 : : RelationGetRelationName(targetrel)),
73 : : errdetail("This operation is not supported for system tables.")));
74 : :
75 : : /* UNLOGGED and TEMP relations cannot be part of publication. */
1440 dgustafsson@postgres 76 [ + + ]: 553 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
3203 peter_e@gmx.net 77 [ + - ]: 3 : ereport(ERROR,
78 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
79 : : errmsg("cannot add relation \"%s\" to publication",
80 : : RelationGetRelationName(targetrel)),
81 : : errdetail("This operation is not supported for temporary tables.")));
1440 dgustafsson@postgres 82 [ + + ]: 550 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
83 [ + - ]: 3 : ereport(ERROR,
84 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85 : : errmsg("cannot add relation \"%s\" to publication",
86 : : RelationGetRelationName(targetrel)),
87 : : errdetail("This operation is not supported for unlogged tables.")));
3203 peter_e@gmx.net 88 : 547 : }
89 : :
90 : : /*
91 : : * Check if schema can be in given publication and throw appropriate error if
92 : : * not.
93 : : */
94 : : static void
1461 akapila@postgresql.o 95 : 122 : check_publication_add_schema(Oid schemaid)
96 : : {
97 : : /* Can't be system namespace */
98 [ + + - + ]: 122 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
99 [ + - ]: 3 : ereport(ERROR,
100 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
101 : : errmsg("cannot add schema \"%s\" to publication",
102 : : get_namespace_name(schemaid)),
103 : : errdetail("This operation is not supported for system schemas.")));
104 : :
105 : : /* Can't be temporary namespace */
106 [ - + ]: 119 : if (isAnyTempNamespace(schemaid))
1461 akapila@postgresql.o 107 [ # # ]:UBC 0 : ereport(ERROR,
108 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
109 : : errmsg("cannot add schema \"%s\" to publication",
110 : : get_namespace_name(schemaid)),
111 : : errdetail("Temporary schemas cannot be replicated.")));
1461 akapila@postgresql.o 112 :CBC 119 : }
113 : :
114 : : /*
115 : : * Returns if relation represented by oid and Form_pg_class entry
116 : : * is publishable.
117 : : *
118 : : * Does same checks as check_publication_add_relation() above except for
119 : : * RELKIND_SEQUENCE, but does not need relation to be opened and also does
120 : : * not throw errors. Here, the additional check is to support ALL SEQUENCES
121 : : * publication.
122 : : *
123 : : * XXX This also excludes all tables with relid < FirstNormalObjectId,
124 : : * ie all tables created during initdb. This mainly affects the preinstalled
125 : : * information_schema. IsCatalogRelationOid() only excludes tables with
126 : : * relid < FirstUnpinnedObjectId, making that test rather redundant,
127 : : * but really we should get rid of the FirstNormalObjectId test not
128 : : * IsCatalogRelationOid. We can't do so today because we don't want
129 : : * information_schema tables to be considered publishable; but this test
130 : : * is really inadequate for that, since the information_schema could be
131 : : * dropped and reloaded and then it'll be considered publishable. The best
132 : : * long-term solution may be to add a "relispublishable" bool to pg_class,
133 : : * and depend on that instead of OID checks.
134 : : */
135 : : static bool
3203 peter_e@gmx.net 136 : 299709 : is_publishable_class(Oid relid, Form_pg_class reltuple)
137 : : {
2057 peter@eisentraut.org 138 : 305852 : return (reltuple->relkind == RELKIND_RELATION ||
18 akapila@postgresql.o 139 [ + + ]:GNC 6143 : reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
140 [ + + ]: 5391 : reltuple->relkind == RELKIND_SEQUENCE) &&
2364 tgl@sss.pgh.pa.us 141 [ + + ]:CBC 295334 : !IsCatalogRelationOid(relid) &&
3203 peter_e@gmx.net 142 [ + + + + : 599418 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
+ + ]
143 : : relid >= FirstNormalObjectId;
144 : : }
145 : :
146 : : /*
147 : : * Another variant of is_publishable_class(), taking a Relation.
148 : : */
149 : : bool
1186 akapila@postgresql.o 150 : 271367 : is_publishable_relation(Relation rel)
151 : : {
152 : 271367 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
153 : : }
154 : :
155 : : /*
156 : : * SQL-callable variant of the above
157 : : *
158 : : * This returns null when the relation does not exist. This is intended to be
159 : : * used for example in psql to avoid gratuitous errors when there are
160 : : * concurrent catalog changes.
161 : : */
162 : : Datum
1182 163 : 3118 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
164 : : {
165 : 3118 : Oid relid = PG_GETARG_OID(0);
166 : : HeapTuple tuple;
167 : : bool result;
168 : :
169 : 3118 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
170 [ - + ]: 3118 : if (!HeapTupleIsValid(tuple))
1182 akapila@postgresql.o 171 :UBC 0 : PG_RETURN_NULL();
1182 akapila@postgresql.o 172 :CBC 3118 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
173 : 3118 : ReleaseSysCache(tuple);
174 : 3118 : PG_RETURN_BOOL(result);
175 : : }
176 : :
177 : : /*
178 : : * Returns true if the ancestor is in the list of published relations.
179 : : * Otherwise, returns false.
180 : : */
181 : : static bool
943 182 : 101 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
183 : : {
184 : : ListCell *lc;
185 : :
186 [ + - + + : 347 : foreach(lc, table_infos)
+ + ]
187 : : {
188 : 286 : Oid relid = ((published_rel *) lfirst(lc))->relid;
189 : :
190 [ + + ]: 286 : if (relid == ancestor)
191 : 40 : return true;
192 : : }
193 : :
194 : 61 : return false;
195 : : }
196 : :
197 : : /*
198 : : * Filter out the partitions whose parent tables are also present in the list.
199 : : */
200 : : static void
201 : 176 : filter_partitions(List *table_infos)
202 : : {
203 : : ListCell *lc;
204 : :
205 [ + - + + : 517 : foreach(lc, table_infos)
+ + ]
206 : : {
1461 207 : 341 : bool skip = false;
208 : 341 : List *ancestors = NIL;
209 : : ListCell *lc2;
943 210 : 341 : published_rel *table_info = (published_rel *) lfirst(lc);
211 : :
212 [ + + ]: 341 : if (get_rel_relispartition(table_info->relid))
213 : 101 : ancestors = get_partition_ancestors(table_info->relid);
214 : :
1461 215 [ + + + + : 402 : foreach(lc2, ancestors)
+ + ]
216 : : {
217 : 101 : Oid ancestor = lfirst_oid(lc2);
218 : :
943 219 [ + + ]: 101 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
220 : : {
1461 221 : 40 : skip = true;
222 : 40 : break;
223 : : }
224 : : }
225 : :
943 226 [ + + ]: 341 : if (skip)
227 : 40 : table_infos = foreach_delete_current(table_infos, lc);
228 : : }
1461 229 : 176 : }
230 : :
231 : : /*
232 : : * Returns true if any schema is associated with the publication, false if no
233 : : * schema is associated with the publication.
234 : : */
235 : : bool
1419 236 : 142 : is_schema_publication(Oid pubid)
237 : : {
238 : : Relation pubschsrel;
239 : : ScanKeyData scankey;
240 : : SysScanDesc scan;
241 : : HeapTuple tup;
242 : 142 : bool result = false;
243 : :
244 : 142 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
245 : 142 : ScanKeyInit(&scankey,
246 : : Anum_pg_publication_namespace_pnpubid,
247 : : BTEqualStrategyNumber, F_OIDEQ,
248 : : ObjectIdGetDatum(pubid));
249 : :
250 : 142 : scan = systable_beginscan(pubschsrel,
251 : : PublicationNamespacePnnspidPnpubidIndexId,
252 : : true, NULL, 1, &scankey);
253 : 142 : tup = systable_getnext(scan);
254 : 142 : result = HeapTupleIsValid(tup);
255 : :
256 : 142 : systable_endscan(scan);
257 : 142 : table_close(pubschsrel, AccessShareLock);
258 : :
259 : 142 : return result;
260 : : }
261 : :
262 : : /*
263 : : * Returns true if the relation has column list associated with the
264 : : * publication, false otherwise.
265 : : *
266 : : * If a column list is found, the corresponding bitmap is returned through the
267 : : * cols parameter, if provided. The bitmap is constructed within the given
268 : : * memory context (mcxt).
269 : : */
270 : : bool
354 271 : 793 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
272 : : Bitmapset **cols)
273 : : {
274 : : HeapTuple cftuple;
275 : 793 : bool found = false;
276 : :
277 [ + + ]: 793 : if (pub->alltables)
278 : 190 : return false;
279 : :
280 : 603 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
281 : : ObjectIdGetDatum(relid),
282 : : ObjectIdGetDatum(pub->oid));
283 [ + + ]: 603 : if (HeapTupleIsValid(cftuple))
284 : : {
285 : : Datum cfdatum;
286 : : bool isnull;
287 : :
288 : : /* Lookup the column list attribute. */
289 : 551 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
290 : : Anum_pg_publication_rel_prattrs, &isnull);
291 : :
292 : : /* Was a column list found? */
293 [ + + ]: 551 : if (!isnull)
294 : : {
295 : : /* Build the column list bitmap in the given memory context. */
296 [ + + ]: 157 : if (cols)
297 : 154 : *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
298 : :
299 : 157 : found = true;
300 : : }
301 : :
302 : 551 : ReleaseSysCache(cftuple);
303 : : }
304 : :
305 : 603 : return found;
306 : : }
307 : :
308 : : /*
309 : : * Gets the relations based on the publication partition option for a specified
310 : : * relation.
311 : : */
312 : : List *
1496 313 : 2914 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
314 : : Oid relid)
315 : : {
316 [ + + + + ]: 2914 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
317 : : pub_partopt != PUBLICATION_PART_ROOT)
318 : 594 : {
319 : 594 : List *all_parts = find_all_inheritors(relid, NoLock,
320 : : NULL);
321 : :
322 [ + + ]: 594 : if (pub_partopt == PUBLICATION_PART_ALL)
323 : 493 : result = list_concat(result, all_parts);
324 [ + - ]: 101 : else if (pub_partopt == PUBLICATION_PART_LEAF)
325 : : {
326 : : ListCell *lc;
327 : :
328 [ + - + + : 369 : foreach(lc, all_parts)
+ + ]
329 : : {
330 : 268 : Oid partOid = lfirst_oid(lc);
331 : :
332 [ + + ]: 268 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
333 : 167 : result = lappend_oid(result, partOid);
334 : : }
335 : : }
336 : : else
1496 akapila@postgresql.o 337 :UBC 0 : Assert(false);
338 : : }
339 : : else
1496 akapila@postgresql.o 340 :CBC 2320 : result = lappend_oid(result, relid);
341 : :
342 : 2914 : return result;
343 : : }
344 : :
345 : : /*
346 : : * Returns the relid of the topmost ancestor that is published via this
347 : : * publication if any and set its ancestor level to ancestor_level,
348 : : * otherwise returns InvalidOid.
349 : : *
350 : : * The ancestor_level value allows us to compare the results for multiple
351 : : * publications, and decide which value is higher up.
352 : : *
353 : : * Note that the list of ancestors should be ordered such that the topmost
354 : : * ancestor is at the end of the list.
355 : : */
356 : : Oid
1321 tomas.vondra@postgre 357 : 255 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
358 : : {
359 : : ListCell *lc;
1343 akapila@postgresql.o 360 : 255 : Oid topmost_relid = InvalidOid;
1321 tomas.vondra@postgre 361 : 255 : int level = 0;
362 : :
363 : : /*
364 : : * Find the "topmost" ancestor that is in this publication.
365 : : */
1343 akapila@postgresql.o 366 [ + - + + : 518 : foreach(lc, ancestors)
+ + ]
367 : : {
368 : 263 : Oid ancestor = lfirst_oid(lc);
369 : 263 : List *apubids = GetRelationPublications(ancestor);
370 : 263 : List *aschemaPubids = NIL;
371 : :
1321 tomas.vondra@postgre 372 : 263 : level++;
373 : :
1343 akapila@postgresql.o 374 [ + + ]: 263 : if (list_member_oid(apubids, puboid))
375 : : {
376 : 156 : topmost_relid = ancestor;
377 : :
1321 tomas.vondra@postgre 378 [ + + ]: 156 : if (ancestor_level)
379 : 43 : *ancestor_level = level;
380 : : }
381 : : else
382 : : {
1299 383 : 107 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
1343 akapila@postgresql.o 384 [ + + ]: 107 : if (list_member_oid(aschemaPubids, puboid))
385 : : {
386 : 5 : topmost_relid = ancestor;
387 : :
1321 tomas.vondra@postgre 388 [ + - ]: 5 : if (ancestor_level)
389 : 5 : *ancestor_level = level;
390 : : }
391 : : }
392 : :
1343 akapila@postgresql.o 393 : 263 : list_free(apubids);
394 : 263 : list_free(aschemaPubids);
395 : : }
396 : :
397 : 255 : return topmost_relid;
398 : : }
399 : :
400 : : /*
401 : : * attnumstoint2vector
402 : : * Convert a Bitmapset of AttrNumbers into an int2vector.
403 : : *
404 : : * AttrNumber numbers are 0-based, i.e., not offset by
405 : : * FirstLowInvalidHeapAttributeNumber.
406 : : */
407 : : static int2vector *
438 drowley@postgresql.o 408 : 166 : attnumstoint2vector(Bitmapset *attrs)
409 : : {
410 : : int2vector *result;
411 : 166 : int n = bms_num_members(attrs);
412 : 166 : int i = -1;
413 : 166 : int j = 0;
414 : :
415 : 166 : result = buildint2vector(NULL, n);
416 : :
417 [ + + ]: 452 : while ((i = bms_next_member(attrs, i)) >= 0)
418 : : {
419 [ - + ]: 286 : Assert(i <= PG_INT16_MAX);
420 : :
421 : 286 : result->values[j++] = (int16) i;
422 : : }
423 : :
424 : 166 : return result;
425 : : }
426 : :
427 : : /*
428 : : * Insert new publication / relation mapping.
429 : : */
430 : : ObjectAddress
1343 akapila@postgresql.o 431 : 574 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
432 : : bool if_not_exists)
433 : : {
434 : : Relation rel;
435 : : HeapTuple tup;
436 : : Datum values[Natts_pg_publication_rel];
437 : : bool nulls[Natts_pg_publication_rel];
438 : 574 : Relation targetrel = pri->relation;
439 : 574 : Oid relid = RelationGetRelid(targetrel);
440 : : Oid pubreloid;
441 : : Bitmapset *attnums;
3203 peter_e@gmx.net 442 : 574 : Publication *pub = GetPublication(pubid);
443 : : ObjectAddress myself,
444 : : referenced;
1496 akapila@postgresql.o 445 : 574 : List *relids = NIL;
446 : : int i;
447 : :
2471 andres@anarazel.de 448 : 574 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
449 : :
450 : : /*
451 : : * Check for duplicates. Note that this does not really prevent
452 : : * duplicates, it's here just to provide nicer error message in common
453 : : * case. The real protection is the unique key on the catalog.
454 : : */
3203 peter_e@gmx.net 455 [ + + ]: 574 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
456 : : ObjectIdGetDatum(pubid)))
457 : : {
2471 andres@anarazel.de 458 : 11 : table_close(rel, RowExclusiveLock);
459 : :
3203 peter_e@gmx.net 460 [ + + ]: 11 : if (if_not_exists)
461 : 8 : return InvalidObjectAddress;
462 : :
463 [ + - ]: 3 : ereport(ERROR,
464 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
465 : : errmsg("relation \"%s\" is already member of publication \"%s\"",
466 : : RelationGetRelationName(targetrel), pub->name)));
467 : : }
468 : :
1343 akapila@postgresql.o 469 : 563 : check_publication_add_relation(targetrel);
470 : :
471 : : /* Validate and translate column names into a Bitmapset of attnums. */
438 drowley@postgresql.o 472 : 547 : attnums = pub_collist_validate(pri->relation, pri->columns);
473 : :
474 : : /* Form a tuple. */
3203 peter_e@gmx.net 475 : 535 : memset(values, 0, sizeof(values));
476 : 535 : memset(nulls, false, sizeof(nulls));
477 : :
1397 alvherre@alvh.no-ip. 478 : 535 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
479 : : Anum_pg_publication_rel_oid);
480 : 535 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
3203 peter_e@gmx.net 481 : 535 : values[Anum_pg_publication_rel_prpubid - 1] =
482 : 535 : ObjectIdGetDatum(pubid);
483 : 535 : values[Anum_pg_publication_rel_prrelid - 1] =
484 : 535 : ObjectIdGetDatum(relid);
485 : :
486 : : /* Add qualifications, if available */
1343 akapila@postgresql.o 487 [ + + ]: 535 : if (pri->whereClause != NULL)
488 : 165 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
489 : : else
490 : 370 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
491 : :
492 : : /* Add column list, if available */
1311 tomas.vondra@postgre 493 [ + + ]: 535 : if (pri->columns)
438 drowley@postgresql.o 494 : 166 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
495 : : else
1311 tomas.vondra@postgre 496 : 369 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
497 : :
3203 peter_e@gmx.net 498 : 535 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
499 : :
500 : : /* Insert tuple into catalog. */
2533 andres@anarazel.de 501 : 535 : CatalogTupleInsert(rel, tup);
3203 peter_e@gmx.net 502 : 535 : heap_freetuple(tup);
503 : :
504 : : /* Register dependencies as needed */
1397 alvherre@alvh.no-ip. 505 : 535 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
506 : :
507 : : /* Add dependency on the publication */
3203 peter_e@gmx.net 508 : 535 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
509 : 535 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
510 : :
511 : : /* Add dependency on the relation */
512 : 535 : ObjectAddressSet(referenced, RelationRelationId, relid);
513 : 535 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
514 : :
515 : : /* Add dependency on the objects mentioned in the qualifications */
1343 akapila@postgresql.o 516 [ + + ]: 535 : if (pri->whereClause)
517 : 165 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
518 : : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
519 : : false);
520 : :
521 : : /* Add dependency on the columns, if any are listed */
438 drowley@postgresql.o 522 : 535 : i = -1;
523 [ + + ]: 821 : while ((i = bms_next_member(attnums, i)) >= 0)
524 : : {
525 : 286 : ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
1311 tomas.vondra@postgre 526 : 286 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
527 : : }
528 : :
529 : : /* Close the table. */
2471 andres@anarazel.de 530 : 535 : table_close(rel, RowExclusiveLock);
531 : :
532 : : /*
533 : : * Invalidate relcache so that publication info is rebuilt.
534 : : *
535 : : * For the partitioned tables, we must invalidate all partitions contained
536 : : * in the respective partition hierarchies, not just the one explicitly
537 : : * mentioned in the publication. This is required because we implicitly
538 : : * publish the child tables when the parent table is published.
539 : : */
1496 akapila@postgresql.o 540 : 535 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
541 : : relid);
542 : :
543 : 535 : InvalidatePublicationRels(relids);
544 : :
3203 peter_e@gmx.net 545 : 535 : return myself;
546 : : }
547 : :
548 : : /*
549 : : * pub_collist_validate
550 : : * Process and validate the 'columns' list and ensure the columns are all
551 : : * valid to use for a publication. Checks for and raises an ERROR for
552 : : * any unknown columns, system columns, duplicate columns, or virtual
553 : : * generated columns.
554 : : *
555 : : * Looks up each column's attnum and returns a 0-based Bitmapset of the
556 : : * corresponding attnums.
557 : : */
558 : : Bitmapset *
438 drowley@postgresql.o 559 : 749 : pub_collist_validate(Relation targetrel, List *columns)
560 : : {
1311 tomas.vondra@postgre 561 : 749 : Bitmapset *set = NULL;
562 : : ListCell *lc;
262 peter@eisentraut.org 563 : 749 : TupleDesc tupdesc = RelationGetDescr(targetrel);
564 : :
1311 tomas.vondra@postgre 565 [ + + + + : 1171 : foreach(lc, columns)
+ + ]
566 : : {
567 : 440 : char *colname = strVal(lfirst(lc));
568 : 440 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
569 : :
570 [ + + ]: 440 : if (attnum == InvalidAttrNumber)
571 [ + - ]: 3 : ereport(ERROR,
572 : : errcode(ERRCODE_UNDEFINED_COLUMN),
573 : : errmsg("column \"%s\" of relation \"%s\" does not exist",
574 : : colname, RelationGetRelationName(targetrel)));
575 : :
576 [ + + ]: 437 : if (!AttrNumberIsForUserDefinedAttr(attnum))
577 [ + - ]: 6 : ereport(ERROR,
578 : : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
579 : : errmsg("cannot use system column \"%s\" in publication column list",
580 : : colname));
581 : :
262 peter@eisentraut.org 582 [ + + ]: 431 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
583 [ + - ]: 3 : ereport(ERROR,
584 : : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
585 : : errmsg("cannot use virtual generated column \"%s\" in publication column list",
586 : : colname));
587 : :
1311 tomas.vondra@postgre 588 [ + + ]: 428 : if (bms_is_member(attnum, set))
589 [ + - ]: 6 : ereport(ERROR,
590 : : errcode(ERRCODE_DUPLICATE_OBJECT),
591 : : errmsg("duplicate column \"%s\" in publication column list",
592 : : colname));
593 : :
594 : 422 : set = bms_add_member(set, attnum);
595 : : }
596 : :
438 drowley@postgresql.o 597 : 731 : return set;
598 : : }
599 : :
600 : : /*
601 : : * Transform a column list (represented by an array Datum) to a bitmapset.
602 : : *
603 : : * If columns isn't NULL, add the column numbers to that set.
604 : : *
605 : : * If mcxt isn't NULL, build the bitmapset in that context.
606 : : */
607 : : Bitmapset *
1311 tomas.vondra@postgre 608 : 222 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
609 : : {
438 drowley@postgresql.o 610 : 222 : Bitmapset *result = columns;
611 : : ArrayType *arr;
612 : : int nelems;
613 : : int16 *elems;
1264 tgl@sss.pgh.pa.us 614 : 222 : MemoryContext oldcxt = NULL;
615 : :
1311 tomas.vondra@postgre 616 : 222 : arr = DatumGetArrayTypeP(pubcols);
617 : 222 : nelems = ARR_DIMS(arr)[0];
618 [ - + ]: 222 : elems = (int16 *) ARR_DATA_PTR(arr);
619 : :
620 : : /* If a memory context was specified, switch to it. */
621 [ + + ]: 222 : if (mcxt)
622 : 39 : oldcxt = MemoryContextSwitchTo(mcxt);
623 : :
624 [ + + ]: 613 : for (int i = 0; i < nelems; i++)
625 : 391 : result = bms_add_member(result, elems[i]);
626 : :
627 [ + + ]: 222 : if (mcxt)
628 : 39 : MemoryContextSwitchTo(oldcxt);
629 : :
630 : 222 : return result;
631 : : }
632 : :
633 : : /*
634 : : * Returns a bitmap representing the columns of the specified table.
635 : : *
636 : : * Generated columns are included if include_gencols_type is
637 : : * PUBLISH_GENCOLS_STORED.
638 : : */
639 : : Bitmapset *
277 akapila@postgresql.o 640 : 9 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
641 : : {
354 642 : 9 : Bitmapset *result = NULL;
643 : 9 : TupleDesc desc = RelationGetDescr(relation);
644 : :
645 [ + + ]: 30 : for (int i = 0; i < desc->natts; i++)
646 : : {
647 : 21 : Form_pg_attribute att = TupleDescAttr(desc, i);
648 : :
277 649 [ + + ]: 21 : if (att->attisdropped)
354 650 : 1 : continue;
651 : :
277 652 [ + + ]: 20 : if (att->attgenerated)
653 : : {
654 : : /* We only support replication of STORED generated cols. */
655 [ + + ]: 2 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
656 : 1 : continue;
657 : :
658 : : /* User hasn't requested to replicate STORED generated cols. */
659 [ + - ]: 1 : if (include_gencols_type != PUBLISH_GENCOLS_STORED)
660 : 1 : continue;
661 : : }
662 : :
354 663 : 18 : result = bms_add_member(result, att->attnum);
664 : : }
665 : :
666 : 9 : return result;
667 : : }
668 : :
669 : : /*
670 : : * Insert new publication / schema mapping.
671 : : */
672 : : ObjectAddress
1299 tomas.vondra@postgre 673 : 131 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
674 : : {
675 : : Relation rel;
676 : : HeapTuple tup;
677 : : Datum values[Natts_pg_publication_namespace];
678 : : bool nulls[Natts_pg_publication_namespace];
679 : : Oid psschid;
1461 akapila@postgresql.o 680 : 131 : Publication *pub = GetPublication(pubid);
681 : 131 : List *schemaRels = NIL;
682 : : ObjectAddress myself,
683 : : referenced;
684 : :
685 : 131 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
686 : :
687 : : /*
688 : : * Check for duplicates. Note that this does not really prevent
689 : : * duplicates, it's here just to provide nicer error message in common
690 : : * case. The real protection is the unique key on the catalog.
691 : : */
1299 tomas.vondra@postgre 692 [ + + ]: 131 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
693 : : ObjectIdGetDatum(schemaid),
694 : : ObjectIdGetDatum(pubid)))
695 : : {
1461 akapila@postgresql.o 696 : 9 : table_close(rel, RowExclusiveLock);
697 : :
698 [ + + ]: 9 : if (if_not_exists)
699 : 6 : return InvalidObjectAddress;
700 : :
701 [ + - ]: 3 : ereport(ERROR,
702 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
703 : : errmsg("schema \"%s\" is already member of publication \"%s\"",
704 : : get_namespace_name(schemaid), pub->name)));
705 : : }
706 : :
707 : 122 : check_publication_add_schema(schemaid);
708 : :
709 : : /* Form a tuple */
710 : 119 : memset(values, 0, sizeof(values));
711 : 119 : memset(nulls, false, sizeof(nulls));
712 : :
713 : 119 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
714 : : Anum_pg_publication_namespace_oid);
715 : 119 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
716 : 119 : values[Anum_pg_publication_namespace_pnpubid - 1] =
717 : 119 : ObjectIdGetDatum(pubid);
718 : 119 : values[Anum_pg_publication_namespace_pnnspid - 1] =
719 : 119 : ObjectIdGetDatum(schemaid);
720 : :
721 : 119 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
722 : :
723 : : /* Insert tuple into catalog */
724 : 119 : CatalogTupleInsert(rel, tup);
725 : 119 : heap_freetuple(tup);
726 : :
727 : 119 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
728 : :
729 : : /* Add dependency on the publication */
730 : 119 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
731 : 119 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
732 : :
733 : : /* Add dependency on the schema */
734 : 119 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
735 : 119 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
736 : :
737 : : /* Close the table */
738 : 119 : table_close(rel, RowExclusiveLock);
739 : :
740 : : /*
741 : : * Invalidate relcache so that publication info is rebuilt. See
742 : : * publication_add_relation for why we need to consider all the
743 : : * partitions.
744 : : */
1299 tomas.vondra@postgre 745 : 119 : schemaRels = GetSchemaPublicationRelations(schemaid,
746 : : PUBLICATION_PART_ALL);
1461 akapila@postgresql.o 747 : 119 : InvalidatePublicationRels(schemaRels);
748 : :
749 : 119 : return myself;
750 : : }
751 : :
752 : : /* Gets list of publication oids for a relation */
753 : : List *
3203 peter_e@gmx.net 754 : 6708 : GetRelationPublications(Oid relid)
755 : : {
3085 bruce@momjian.us 756 : 6708 : List *result = NIL;
757 : : CatCList *pubrellist;
758 : : int i;
759 : :
760 : : /* Find all publications associated with the relation. */
3203 peter_e@gmx.net 761 : 6708 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
762 : : ObjectIdGetDatum(relid));
763 [ + + ]: 7542 : for (i = 0; i < pubrellist->n_members; i++)
764 : : {
765 : 834 : HeapTuple tup = &pubrellist->members[i]->tuple;
766 : 834 : Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
767 : :
768 : 834 : result = lappend_oid(result, pubid);
769 : : }
770 : :
771 : 6708 : ReleaseSysCacheList(pubrellist);
772 : :
773 : 6708 : return result;
774 : : }
775 : :
776 : : /*
777 : : * Gets list of relation oids for a publication.
778 : : *
779 : : * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
780 : : * should use GetAllPublicationRelations().
781 : : */
782 : : List *
1299 tomas.vondra@postgre 783 : 1176 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
784 : : {
785 : : List *result;
786 : : Relation pubrelsrel;
787 : : ScanKeyData scankey;
788 : : SysScanDesc scan;
789 : : HeapTuple tup;
790 : :
791 : : /* Find all publications associated with the relation. */
2471 andres@anarazel.de 792 : 1176 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
793 : :
3203 peter_e@gmx.net 794 : 1176 : ScanKeyInit(&scankey,
795 : : Anum_pg_publication_rel_prpubid,
796 : : BTEqualStrategyNumber, F_OIDEQ,
797 : : ObjectIdGetDatum(pubid));
798 : :
1384 alvherre@alvh.no-ip. 799 : 1176 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
800 : : true, NULL, 1, &scankey);
801 : :
3203 peter_e@gmx.net 802 : 1176 : result = NIL;
803 [ + + ]: 2723 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
804 : : {
805 : : Form_pg_publication_rel pubrel;
806 : :
807 : 1547 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1299 tomas.vondra@postgre 808 : 1547 : result = GetPubPartitionOptionRelations(result, pub_partopt,
809 : : pubrel->prrelid);
810 : : }
811 : :
3203 peter_e@gmx.net 812 : 1176 : systable_endscan(scan);
2471 andres@anarazel.de 813 : 1176 : table_close(pubrelsrel, AccessShareLock);
814 : :
815 : : /* Now sort and de-duplicate the result list */
1419 akapila@postgresql.o 816 : 1176 : list_sort(result, list_oid_cmp);
817 : 1176 : list_deduplicate_oid(result);
818 : :
3203 peter_e@gmx.net 819 : 1176 : return result;
820 : : }
821 : :
822 : : /*
823 : : * Gets list of publication oids for publications marked as FOR ALL TABLES.
824 : : */
825 : : List *
826 : 4532 : GetAllTablesPublications(void)
827 : : {
828 : : List *result;
829 : : Relation rel;
830 : : ScanKeyData scankey;
831 : : SysScanDesc scan;
832 : : HeapTuple tup;
833 : :
834 : : /* Find all publications that are marked as for all tables. */
2471 andres@anarazel.de 835 : 4532 : rel = table_open(PublicationRelationId, AccessShareLock);
836 : :
3203 peter_e@gmx.net 837 : 4532 : ScanKeyInit(&scankey,
838 : : Anum_pg_publication_puballtables,
839 : : BTEqualStrategyNumber, F_BOOLEQ,
840 : : BoolGetDatum(true));
841 : :
842 : 4532 : scan = systable_beginscan(rel, InvalidOid, false,
843 : : NULL, 1, &scankey);
844 : :
845 : 4532 : result = NIL;
846 [ + + ]: 4640 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
847 : : {
2350 tgl@sss.pgh.pa.us 848 : 108 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
849 : :
2533 andres@anarazel.de 850 : 108 : result = lappend_oid(result, oid);
851 : : }
852 : :
3203 peter_e@gmx.net 853 : 4532 : systable_endscan(scan);
2471 andres@anarazel.de 854 : 4532 : table_close(rel, AccessShareLock);
855 : :
3203 peter_e@gmx.net 856 : 4532 : return result;
857 : : }
858 : :
859 : : /*
860 : : * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
861 : : * publication(s).
862 : : *
863 : : * If the publication publishes partition changes via their respective root
864 : : * partitioned tables, we must exclude partitions in favor of including the
865 : : * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
866 : : * publication.
867 : : */
868 : : List *
18 akapila@postgresql.o 869 :GNC 172 : GetAllPublicationRelations(char relkind, bool pubviaroot)
870 : : {
871 : : Relation classRel;
872 : : ScanKeyData key[1];
873 : : TableScanDesc scan;
874 : : HeapTuple tuple;
3203 peter_e@gmx.net 875 :CBC 172 : List *result = NIL;
876 : :
18 akapila@postgresql.o 877 [ - + - - ]:GNC 172 : Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
878 : :
2471 andres@anarazel.de 879 :CBC 172 : classRel = table_open(RelationRelationId, AccessShareLock);
880 : :
3203 peter_e@gmx.net 881 : 172 : ScanKeyInit(&key[0],
882 : : Anum_pg_class_relkind,
883 : : BTEqualStrategyNumber, F_CHAREQ,
884 : : CharGetDatum(relkind));
885 : :
2422 andres@anarazel.de 886 : 172 : scan = table_beginscan_catalog(classRel, 1, key);
887 : :
3203 peter_e@gmx.net 888 [ + + ]: 12669 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
889 : : {
3085 bruce@momjian.us 890 : 12497 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
2533 andres@anarazel.de 891 : 12497 : Oid relid = relForm->oid;
892 : :
2028 peter@eisentraut.org 893 [ + + ]: 12497 : if (is_publishable_class(relid, relForm) &&
894 [ + + + + ]: 795 : !(relForm->relispartition && pubviaroot))
3203 peter_e@gmx.net 895 : 704 : result = lappend_oid(result, relid);
896 : : }
897 : :
2422 andres@anarazel.de 898 : 172 : table_endscan(scan);
899 : :
2028 peter@eisentraut.org 900 [ + + ]: 172 : if (pubviaroot)
901 : : {
902 : 13 : ScanKeyInit(&key[0],
903 : : Anum_pg_class_relkind,
904 : : BTEqualStrategyNumber, F_CHAREQ,
905 : : CharGetDatum(RELKIND_PARTITIONED_TABLE));
906 : :
907 : 13 : scan = table_beginscan_catalog(classRel, 1, key);
908 : :
909 [ + + ]: 78 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
910 : : {
911 : 65 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
912 : 65 : Oid relid = relForm->oid;
913 : :
914 [ + - ]: 65 : if (is_publishable_class(relid, relForm) &&
915 [ + + ]: 65 : !relForm->relispartition)
916 : 52 : result = lappend_oid(result, relid);
917 : : }
918 : :
919 : 13 : table_endscan(scan);
920 : : }
921 : :
2025 922 : 172 : table_close(classRel, AccessShareLock);
3203 peter_e@gmx.net 923 : 172 : return result;
924 : : }
925 : :
926 : : /*
927 : : * Gets the list of schema oids for a publication.
928 : : *
929 : : * This should only be used FOR TABLES IN SCHEMA publications.
930 : : */
931 : : List *
1299 tomas.vondra@postgre 932 : 1130 : GetPublicationSchemas(Oid pubid)
933 : : {
1461 akapila@postgresql.o 934 : 1130 : List *result = NIL;
935 : : Relation pubschsrel;
936 : : ScanKeyData scankey;
937 : : SysScanDesc scan;
938 : : HeapTuple tup;
939 : :
940 : : /* Find all schemas associated with the publication */
941 : 1130 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
942 : :
1299 tomas.vondra@postgre 943 : 1130 : ScanKeyInit(&scankey,
944 : : Anum_pg_publication_namespace_pnpubid,
945 : : BTEqualStrategyNumber, F_OIDEQ,
946 : : ObjectIdGetDatum(pubid));
947 : :
1461 akapila@postgresql.o 948 : 1130 : scan = systable_beginscan(pubschsrel,
949 : : PublicationNamespacePnnspidPnpubidIndexId,
950 : : true, NULL, 1, &scankey);
951 [ + + ]: 1180 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
952 : : {
953 : : Form_pg_publication_namespace pubsch;
954 : :
955 : 50 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
956 : :
957 : 50 : result = lappend_oid(result, pubsch->pnnspid);
958 : : }
959 : :
960 : 1130 : systable_endscan(scan);
961 : 1130 : table_close(pubschsrel, AccessShareLock);
962 : :
963 : 1130 : return result;
964 : : }
965 : :
966 : : /*
967 : : * Gets the list of publication oids associated with a specified schema.
968 : : */
969 : : List *
1299 tomas.vondra@postgre 970 : 6479 : GetSchemaPublications(Oid schemaid)
971 : : {
1461 akapila@postgresql.o 972 : 6479 : List *result = NIL;
973 : : CatCList *pubschlist;
974 : : int i;
975 : :
976 : : /* Find all publications associated with the schema */
977 : 6479 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
978 : : ObjectIdGetDatum(schemaid));
979 [ + + ]: 6537 : for (i = 0; i < pubschlist->n_members; i++)
980 : : {
981 : 58 : HeapTuple tup = &pubschlist->members[i]->tuple;
982 : 58 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
983 : :
984 : 58 : result = lappend_oid(result, pubid);
985 : : }
986 : :
987 : 6479 : ReleaseSysCacheList(pubschlist);
988 : :
989 : 6479 : return result;
990 : : }
991 : :
992 : : /*
993 : : * Get the list of publishable relation oids for a specified schema.
994 : : */
995 : : List *
1299 tomas.vondra@postgre 996 : 250 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
997 : : {
998 : : Relation classRel;
999 : : ScanKeyData key[1];
1000 : : TableScanDesc scan;
1001 : : HeapTuple tuple;
1461 akapila@postgresql.o 1002 : 250 : List *result = NIL;
1003 : :
1004 [ - + ]: 250 : Assert(OidIsValid(schemaid));
1005 : :
1006 : 250 : classRel = table_open(RelationRelationId, AccessShareLock);
1007 : :
1008 : 250 : ScanKeyInit(&key[0],
1009 : : Anum_pg_class_relnamespace,
1010 : : BTEqualStrategyNumber, F_OIDEQ,
1011 : : ObjectIdGetDatum(schemaid));
1012 : :
1013 : : /* get all the relations present in the specified schema */
1014 : 250 : scan = table_beginscan_catalog(classRel, 1, key);
1015 [ + + ]: 12912 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1016 : : {
1017 : 12662 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1018 : 12662 : Oid relid = relForm->oid;
1019 : : char relkind;
1020 : :
1021 [ + + ]: 12662 : if (!is_publishable_class(relid, relForm))
1022 : 4327 : continue;
1023 : :
1024 : 8335 : relkind = get_rel_relkind(relid);
1299 tomas.vondra@postgre 1025 [ + + ]: 8335 : if (relkind == RELKIND_RELATION)
1026 : 7150 : result = lappend_oid(result, relid);
1027 [ + + ]: 1185 : else if (relkind == RELKIND_PARTITIONED_TABLE)
1028 : : {
1461 akapila@postgresql.o 1029 : 391 : List *partitionrels = NIL;
1030 : :
1031 : : /*
1032 : : * It is quite possible that some of the partitions are in a
1033 : : * different schema than the parent table, so we need to get such
1034 : : * partitions separately.
1035 : : */
1036 : 391 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
1037 : : pub_partopt,
1038 : : relForm->oid);
1039 : 391 : result = list_concat_unique_oid(result, partitionrels);
1040 : : }
1041 : : }
1042 : :
1043 : 250 : table_endscan(scan);
1044 : 250 : table_close(classRel, AccessShareLock);
1045 : 250 : return result;
1046 : : }
1047 : :
1048 : : /*
1049 : : * Gets the list of all relations published by FOR TABLES IN SCHEMA
1050 : : * publication.
1051 : : */
1052 : : List *
1299 tomas.vondra@postgre 1053 : 928 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
1054 : : {
1461 akapila@postgresql.o 1055 : 928 : List *result = NIL;
1299 tomas.vondra@postgre 1056 : 928 : List *pubschemalist = GetPublicationSchemas(pubid);
1057 : : ListCell *cell;
1058 : :
1461 akapila@postgresql.o 1059 [ + + + + : 963 : foreach(cell, pubschemalist)
+ + ]
1060 : : {
1061 : 35 : Oid schemaid = lfirst_oid(cell);
1062 : 35 : List *schemaRels = NIL;
1063 : :
1299 tomas.vondra@postgre 1064 : 35 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
1461 akapila@postgresql.o 1065 : 35 : result = list_concat(result, schemaRels);
1066 : : }
1067 : :
1068 : 928 : return result;
1069 : : }
1070 : :
1071 : : /*
1072 : : * Get publication using oid
1073 : : *
1074 : : * The Publication struct and its data are palloc'ed here.
1075 : : */
1076 : : Publication *
3203 peter_e@gmx.net 1077 : 4741 : GetPublication(Oid pubid)
1078 : : {
1079 : : HeapTuple tup;
1080 : : Publication *pub;
1081 : : Form_pg_publication pubform;
1082 : :
1083 : 4741 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1084 [ - + ]: 4741 : if (!HeapTupleIsValid(tup))
3203 peter_e@gmx.net 1085 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1086 : :
3203 peter_e@gmx.net 1087 :CBC 4741 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1088 : :
1089 : 4741 : pub = (Publication *) palloc(sizeof(Publication));
1090 : 4741 : pub->oid = pubid;
1091 : 4741 : pub->name = pstrdup(NameStr(pubform->pubname));
1092 : 4741 : pub->alltables = pubform->puballtables;
18 akapila@postgresql.o 1093 :GNC 4741 : pub->allsequences = pubform->puballsequences;
3203 peter_e@gmx.net 1094 :CBC 4741 : pub->pubactions.pubinsert = pubform->pubinsert;
1095 : 4741 : pub->pubactions.pubupdate = pubform->pubupdate;
1096 : 4741 : pub->pubactions.pubdelete = pubform->pubdelete;
2760 1097 : 4741 : pub->pubactions.pubtruncate = pubform->pubtruncate;
2028 peter@eisentraut.org 1098 : 4741 : pub->pubviaroot = pubform->pubviaroot;
272 akapila@postgresql.o 1099 : 4741 : pub->pubgencols_type = pubform->pubgencols;
1100 : :
3203 peter_e@gmx.net 1101 : 4741 : ReleaseSysCache(tup);
1102 : :
1103 : 4741 : return pub;
1104 : : }
1105 : :
1106 : : /*
1107 : : * Get Publication using name.
1108 : : */
1109 : : Publication *
1110 : 1497 : GetPublicationByName(const char *pubname, bool missing_ok)
1111 : : {
1112 : : Oid oid;
1113 : :
2135 alvherre@alvh.no-ip. 1114 : 1497 : oid = get_publication_oid(pubname, missing_ok);
1115 : :
1116 [ + + ]: 1497 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1117 : : }
1118 : :
1119 : : /*
1120 : : * Get information of the tables in the given publication array.
1121 : : *
1122 : : * Returns pubid, relid, column list, row filter for each table.
1123 : : */
1124 : : Datum
3203 peter_e@gmx.net 1125 : 3064 : pg_get_publication_tables(PG_FUNCTION_ARGS)
1126 : : {
1127 : : #define NUM_PUBLICATION_TABLES_ELEM 4
1128 : : FuncCallContext *funcctx;
943 akapila@postgresql.o 1129 : 3064 : List *table_infos = NIL;
1130 : :
1131 : : /* stuff done only on the first call of the function */
3203 peter_e@gmx.net 1132 [ + + ]: 3064 : if (SRF_IS_FIRSTCALL())
1133 : : {
1134 : : TupleDesc tupdesc;
1135 : : MemoryContext oldcontext;
1136 : : ArrayType *arr;
1137 : : Datum *elems;
1138 : : int nelems,
1139 : : i;
943 akapila@postgresql.o 1140 : 992 : bool viaroot = false;
1141 : :
1142 : : /* create a function context for cross-call persistence */
3203 peter_e@gmx.net 1143 : 992 : funcctx = SRF_FIRSTCALL_INIT();
1144 : :
1145 : : /* switch to memory context appropriate for multiple function calls */
1146 : 992 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1147 : :
1148 : : /*
1149 : : * Deconstruct the parameter into elements where each element is a
1150 : : * publication name.
1151 : : */
943 akapila@postgresql.o 1152 : 992 : arr = PG_GETARG_ARRAYTYPE_P(0);
381 alvherre@alvh.no-ip. 1153 : 992 : deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
1154 : :
1155 : : /* Get Oids of tables from each publication. */
943 akapila@postgresql.o 1156 [ + + ]: 2029 : for (i = 0; i < nelems; i++)
1157 : : {
1158 : : Publication *pub_elem;
1159 : 1037 : List *pub_elem_tables = NIL;
1160 : : ListCell *lc;
1161 : :
1162 : 1037 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
1163 : :
1164 : : /*
1165 : : * Publications support partitioned tables. If
1166 : : * publish_via_partition_root is false, all changes are replicated
1167 : : * using leaf partition identity and schema, so we only need
1168 : : * those. Otherwise, get the partitioned table itself.
1169 : : */
1170 [ + + ]: 1037 : if (pub_elem->alltables)
18 akapila@postgresql.o 1171 :GNC 172 : pub_elem_tables = GetAllPublicationRelations(RELKIND_RELATION,
1172 : 172 : pub_elem->pubviaroot);
1173 : : else
1174 : : {
1175 : : List *relids,
1176 : : *schemarelids;
1177 : :
943 akapila@postgresql.o 1178 :CBC 865 : relids = GetPublicationRelations(pub_elem->oid,
1179 : 865 : pub_elem->pubviaroot ?
1180 : 865 : PUBLICATION_PART_ROOT :
1181 : : PUBLICATION_PART_LEAF);
1182 : 865 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1183 : 865 : pub_elem->pubviaroot ?
1184 : 865 : PUBLICATION_PART_ROOT :
1185 : : PUBLICATION_PART_LEAF);
1186 : 865 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1187 : : }
1188 : :
1189 : : /*
1190 : : * Record the published table and the corresponding publication so
1191 : : * that we can get row filters and column lists later.
1192 : : *
1193 : : * When a table is published by multiple publications, to obtain
1194 : : * all row filters and column lists, the structure related to this
1195 : : * table will be recorded multiple times.
1196 : : */
1197 [ + + + + : 3149 : foreach(lc, pub_elem_tables)
+ + ]
1198 : : {
1199 : 2112 : published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
1200 : :
1201 : 2112 : table_info->relid = lfirst_oid(lc);
1202 : 2112 : table_info->pubid = pub_elem->oid;
1203 : 2112 : table_infos = lappend(table_infos, table_info);
1204 : : }
1205 : :
1206 : : /* At least one publication is using publish_via_partition_root. */
1207 [ + + ]: 1037 : if (pub_elem->pubviaroot)
1208 : 184 : viaroot = true;
1209 : : }
1210 : :
1211 : : /*
1212 : : * If the publication publishes partition changes via their respective
1213 : : * root partitioned tables, we must exclude partitions in favor of
1214 : : * including the root partitioned tables. Otherwise, the function
1215 : : * could return both the child and parent tables which could cause
1216 : : * data of the child table to be double-published on the subscriber
1217 : : * side.
1218 : : */
1219 [ + + ]: 992 : if (viaroot)
1220 : 176 : filter_partitions(table_infos);
1221 : :
1222 : : /* Construct a tuple descriptor for the result rows. */
1222 michael@paquier.xyz 1223 : 992 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
943 akapila@postgresql.o 1224 : 992 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1225 : : OIDOID, -1, 0);
1226 : 992 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1227 : : OIDOID, -1, 0);
1228 : 992 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1229 : : INT2VECTOROID, -1, 0);
1230 : 992 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1231 : : PG_NODE_TREEOID, -1, 0);
1232 : :
1257 1233 : 992 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
333 peter@eisentraut.org 1234 : 992 : funcctx->user_fctx = table_infos;
1235 : :
3203 peter_e@gmx.net 1236 : 992 : MemoryContextSwitchTo(oldcontext);
1237 : : }
1238 : :
1239 : : /* stuff done on every call of the function */
1240 : 3064 : funcctx = SRF_PERCALL_SETUP();
943 akapila@postgresql.o 1241 : 3064 : table_infos = (List *) funcctx->user_fctx;
1242 : :
1243 [ + + ]: 3064 : if (funcctx->call_cntr < list_length(table_infos))
1244 : : {
1257 1245 : 2072 : HeapTuple pubtuple = NULL;
1246 : : HeapTuple rettuple;
1247 : : Publication *pub;
943 1248 : 2072 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1249 : 2072 : Oid relid = table_info->relid;
1130 1250 : 2072 : Oid schemaid = get_rel_namespace(relid);
1199 peter@eisentraut.org 1251 : 2072 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1252 : 2072 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1253 : :
1254 : : /*
1255 : : * Form tuple with appropriate data.
1256 : : */
1257 : :
943 akapila@postgresql.o 1258 : 2072 : pub = GetPublication(table_info->pubid);
1259 : :
1260 : 2072 : values[0] = ObjectIdGetDatum(pub->oid);
1261 : 2072 : values[1] = ObjectIdGetDatum(relid);
1262 : :
1263 : : /*
1264 : : * We don't consider row filters or column lists for FOR ALL TABLES or
1265 : : * FOR TABLES IN SCHEMA publications.
1266 : : */
1267 [ + + ]: 2072 : if (!pub->alltables &&
1130 1268 [ + + ]: 1316 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1269 : : ObjectIdGetDatum(schemaid),
1270 : : ObjectIdGetDatum(pub->oid)))
1271 : 1269 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1272 : : ObjectIdGetDatum(relid),
1273 : : ObjectIdGetDatum(pub->oid));
1274 : :
1257 1275 [ + + ]: 2072 : if (HeapTupleIsValid(pubtuple))
1276 : : {
1277 : : /* Lookup the column list attribute. */
943 1278 : 1156 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1279 : : Anum_pg_publication_rel_prattrs,
1280 : : &(nulls[2]));
1281 : :
1282 : : /* Null indicates no filter. */
1283 : 1156 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1284 : : Anum_pg_publication_rel_prqual,
1285 : : &(nulls[3]));
1286 : : }
1287 : : else
1288 : : {
1257 1289 : 916 : nulls[2] = true;
943 1290 : 916 : nulls[3] = true;
1291 : : }
1292 : :
1293 : : /* Show all columns when the column list is not specified. */
1294 [ + + ]: 2072 : if (nulls[2])
1295 : : {
1018 1296 : 1946 : Relation rel = table_open(relid, AccessShareLock);
1297 : 1946 : int nattnums = 0;
1298 : : int16 *attnums;
1299 : 1946 : TupleDesc desc = RelationGetDescr(rel);
1300 : : int i;
1301 : :
1302 : 1946 : attnums = (int16 *) palloc(desc->natts * sizeof(int16));
1303 : :
1304 [ + + ]: 5347 : for (i = 0; i < desc->natts; i++)
1305 : : {
1306 : 3401 : Form_pg_attribute att = TupleDescAttr(desc, i);
1307 : :
277 1308 [ + + ]: 3401 : if (att->attisdropped)
1018 1309 : 6 : continue;
1310 : :
277 1311 [ + + ]: 3395 : if (att->attgenerated)
1312 : : {
1313 : : /* We only support replication of STORED generated cols. */
1314 [ + + ]: 48 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
1315 : 36 : continue;
1316 : :
1317 : : /*
1318 : : * User hasn't requested to replicate STORED generated
1319 : : * cols.
1320 : : */
1321 [ + + ]: 12 : if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
1322 : 9 : continue;
1323 : : }
1324 : :
1018 1325 : 3350 : attnums[nattnums++] = att->attnum;
1326 : : }
1327 : :
1328 [ + + ]: 1946 : if (nattnums > 0)
1329 : : {
943 1330 : 1924 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1331 : 1924 : nulls[2] = false;
1332 : : }
1333 : :
1018 1334 : 1946 : table_close(rel, AccessShareLock);
1335 : : }
1336 : :
1257 1337 : 2072 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1338 : :
1339 : 2072 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1340 : : }
1341 : :
3203 peter_e@gmx.net 1342 : 992 : SRF_RETURN_DONE(funcctx);
1343 : : }
1344 : :
1345 : : /*
1346 : : * Returns Oids of sequences in a publication.
1347 : : */
1348 : : Datum
18 akapila@postgresql.o 1349 :GNC 201 : pg_get_publication_sequences(PG_FUNCTION_ARGS)
1350 : : {
1351 : : FuncCallContext *funcctx;
1352 : 201 : List *sequences = NIL;
1353 : :
1354 : : /* stuff done only on the first call of the function */
1355 [ + - ]: 201 : if (SRF_IS_FIRSTCALL())
1356 : : {
1357 : 201 : char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1358 : : Publication *publication;
1359 : : MemoryContext oldcontext;
1360 : :
1361 : : /* create a function context for cross-call persistence */
1362 : 201 : funcctx = SRF_FIRSTCALL_INIT();
1363 : :
1364 : : /* switch to memory context appropriate for multiple function calls */
1365 : 201 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1366 : :
1367 : 201 : publication = GetPublicationByName(pubname, false);
1368 : :
1369 [ - + ]: 201 : if (publication->allsequences)
18 akapila@postgresql.o 1370 :UNC 0 : sequences = GetAllPublicationRelations(RELKIND_SEQUENCE, false);
1371 : :
18 akapila@postgresql.o 1372 :GNC 201 : funcctx->user_fctx = (void *) sequences;
1373 : :
1374 : 201 : MemoryContextSwitchTo(oldcontext);
1375 : : }
1376 : :
1377 : : /* stuff done on every call of the function */
1378 : 201 : funcctx = SRF_PERCALL_SETUP();
1379 : 201 : sequences = (List *) funcctx->user_fctx;
1380 : :
1381 [ - + ]: 201 : if (funcctx->call_cntr < list_length(sequences))
1382 : : {
18 akapila@postgresql.o 1383 :UNC 0 : Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
1384 : :
1385 : 0 : SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
1386 : : }
1387 : :
18 akapila@postgresql.o 1388 :GNC 201 : SRF_RETURN_DONE(funcctx);
1389 : : }
|