Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_publication.c
4 : : * publication C API manipulation
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/catalog/pg_publication.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/genam.h"
18 : : #include "access/heapam.h"
19 : : #include "access/htup_details.h"
20 : : #include "access/tableam.h"
21 : : #include "catalog/catalog.h"
22 : : #include "catalog/dependency.h"
23 : : #include "catalog/indexing.h"
24 : : #include "catalog/namespace.h"
25 : : #include "catalog/objectaddress.h"
26 : : #include "catalog/partition.h"
27 : : #include "catalog/pg_inherits.h"
28 : : #include "catalog/pg_namespace.h"
29 : : #include "catalog/pg_publication.h"
30 : : #include "catalog/pg_publication_namespace.h"
31 : : #include "catalog/pg_publication_rel.h"
32 : : #include "catalog/pg_type.h"
33 : : #include "commands/publicationcmds.h"
34 : : #include "funcapi.h"
35 : : #include "utils/array.h"
36 : : #include "utils/builtins.h"
37 : : #include "utils/catcache.h"
38 : : #include "utils/fmgroids.h"
39 : : #include "utils/lsyscache.h"
40 : : #include "utils/rel.h"
41 : : #include "utils/syscache.h"
42 : :
43 : : /* Records association between publication and published table */
44 : : typedef struct
45 : : {
46 : : Oid relid; /* OID of published table */
47 : : Oid pubid; /* OID of publication that publishes this
48 : : * table. */
49 : : } published_rel;
50 : :
51 : : /*
52 : : * Check if relation can be in given publication and throws appropriate
53 : : * error if not.
54 : : */
55 : : static void
3152 peter_e@gmx.net 56 :CBC 562 : check_publication_add_relation(Relation targetrel)
57 : : {
58 : : /* Must be a regular or partitioned table */
2006 peter@eisentraut.org 59 [ + + ]: 562 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
1248 tomas.vondra@postgre 60 [ + + ]: 72 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
3152 peter_e@gmx.net 61 [ + - ]: 7 : ereport(ERROR,
62 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
63 : : errmsg("cannot add relation \"%s\" to publication",
64 : : RelationGetRelationName(targetrel)),
65 : : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
66 : :
67 : : /* Can't be system table */
68 [ + + ]: 555 : if (IsCatalogRelation(targetrel))
69 [ + - ]: 3 : ereport(ERROR,
70 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
71 : : errmsg("cannot add relation \"%s\" to publication",
72 : : RelationGetRelationName(targetrel)),
73 : : errdetail("This operation is not supported for system tables.")));
74 : :
75 : : /* UNLOGGED and TEMP relations cannot be part of publication. */
1389 dgustafsson@postgres 76 [ + + ]: 552 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
3152 peter_e@gmx.net 77 [ + - ]: 3 : ereport(ERROR,
78 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
79 : : errmsg("cannot add relation \"%s\" to publication",
80 : : RelationGetRelationName(targetrel)),
81 : : errdetail("This operation is not supported for temporary tables.")));
1389 dgustafsson@postgres 82 [ + + ]: 549 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
83 [ + - ]: 3 : ereport(ERROR,
84 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85 : : errmsg("cannot add relation \"%s\" to publication",
86 : : RelationGetRelationName(targetrel)),
87 : : errdetail("This operation is not supported for unlogged tables.")));
3152 peter_e@gmx.net 88 : 546 : }
89 : :
90 : : /*
91 : : * Check if schema can be in given publication and throw appropriate error if
92 : : * not.
93 : : */
94 : : static void
1410 akapila@postgresql.o 95 : 122 : check_publication_add_schema(Oid schemaid)
96 : : {
97 : : /* Can't be system namespace */
98 [ + + - + ]: 122 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
99 [ + - ]: 3 : ereport(ERROR,
100 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
101 : : errmsg("cannot add schema \"%s\" to publication",
102 : : get_namespace_name(schemaid)),
103 : : errdetail("This operation is not supported for system schemas.")));
104 : :
105 : : /* Can't be temporary namespace */
106 [ - + ]: 119 : if (isAnyTempNamespace(schemaid))
1410 akapila@postgresql.o 107 [ # # ]:UBC 0 : ereport(ERROR,
108 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
109 : : errmsg("cannot add schema \"%s\" to publication",
110 : : get_namespace_name(schemaid)),
111 : : errdetail("Temporary schemas cannot be replicated.")));
1410 akapila@postgresql.o 112 :CBC 119 : }
113 : :
114 : : /*
115 : : * Returns if relation represented by oid and Form_pg_class entry
116 : : * is publishable.
117 : : *
118 : : * Does same checks as check_publication_add_relation() above, but does not
119 : : * need relation to be opened and also does not throw errors.
120 : : *
121 : : * XXX This also excludes all tables with relid < FirstNormalObjectId,
122 : : * ie all tables created during initdb. This mainly affects the preinstalled
123 : : * information_schema. IsCatalogRelationOid() only excludes tables with
124 : : * relid < FirstUnpinnedObjectId, making that test rather redundant,
125 : : * but really we should get rid of the FirstNormalObjectId test not
126 : : * IsCatalogRelationOid. We can't do so today because we don't want
127 : : * information_schema tables to be considered publishable; but this test
128 : : * is really inadequate for that, since the information_schema could be
129 : : * dropped and reloaded and then it'll be considered publishable. The best
130 : : * long-term solution may be to add a "relispublishable" bool to pg_class,
131 : : * and depend on that instead of OID checks.
132 : : */
133 : : static bool
3152 peter_e@gmx.net 134 : 302461 : is_publishable_class(Oid relid, Form_pg_class reltuple)
135 : : {
2006 peter@eisentraut.org 136 : 308503 : return (reltuple->relkind == RELKIND_RELATION ||
1248 tomas.vondra@postgre 137 [ + + ]: 6042 : reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
2313 tgl@sss.pgh.pa.us 138 [ + + ]: 297171 : !IsCatalogRelationOid(relid) &&
3152 peter_e@gmx.net 139 [ + + + + : 604922 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
+ + ]
140 : : relid >= FirstNormalObjectId;
141 : : }
142 : :
143 : : /*
144 : : * Another variant of is_publishable_class(), taking a Relation.
145 : : */
146 : : bool
1135 akapila@postgresql.o 147 : 274255 : is_publishable_relation(Relation rel)
148 : : {
149 : 274255 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
150 : : }
151 : :
152 : : /*
153 : : * SQL-callable variant of the above
154 : : *
155 : : * This returns null when the relation does not exist. This is intended to be
156 : : * used for example in psql to avoid gratuitous errors when there are
157 : : * concurrent catalog changes.
158 : : */
159 : : Datum
1131 160 : 3028 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
161 : : {
162 : 3028 : Oid relid = PG_GETARG_OID(0);
163 : : HeapTuple tuple;
164 : : bool result;
165 : :
166 : 3028 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
167 [ - + ]: 3028 : if (!HeapTupleIsValid(tuple))
1131 akapila@postgresql.o 168 :UBC 0 : PG_RETURN_NULL();
1131 akapila@postgresql.o 169 :CBC 3028 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
170 : 3028 : ReleaseSysCache(tuple);
171 : 3028 : PG_RETURN_BOOL(result);
172 : : }
173 : :
174 : : /*
175 : : * Returns true if the ancestor is in the list of published relations.
176 : : * Otherwise, returns false.
177 : : */
178 : : static bool
892 179 : 101 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
180 : : {
181 : : ListCell *lc;
182 : :
183 [ + - + + : 348 : foreach(lc, table_infos)
+ + ]
184 : : {
185 : 287 : Oid relid = ((published_rel *) lfirst(lc))->relid;
186 : :
187 [ + + ]: 287 : if (relid == ancestor)
188 : 40 : return true;
189 : : }
190 : :
191 : 61 : return false;
192 : : }
193 : :
194 : : /*
195 : : * Filter out the partitions whose parent tables are also present in the list.
196 : : */
197 : : static void
198 : 176 : filter_partitions(List *table_infos)
199 : : {
200 : : ListCell *lc;
201 : :
202 [ + - + + : 517 : foreach(lc, table_infos)
+ + ]
203 : : {
1410 204 : 341 : bool skip = false;
205 : 341 : List *ancestors = NIL;
206 : : ListCell *lc2;
892 207 : 341 : published_rel *table_info = (published_rel *) lfirst(lc);
208 : :
209 [ + + ]: 341 : if (get_rel_relispartition(table_info->relid))
210 : 101 : ancestors = get_partition_ancestors(table_info->relid);
211 : :
1410 212 [ + + + + : 402 : foreach(lc2, ancestors)
+ + ]
213 : : {
214 : 101 : Oid ancestor = lfirst_oid(lc2);
215 : :
892 216 [ + + ]: 101 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
217 : : {
1410 218 : 40 : skip = true;
219 : 40 : break;
220 : : }
221 : : }
222 : :
892 223 [ + + ]: 341 : if (skip)
224 : 40 : table_infos = foreach_delete_current(table_infos, lc);
225 : : }
1410 226 : 176 : }
227 : :
228 : : /*
229 : : * Returns true if any schema is associated with the publication, false if no
230 : : * schema is associated with the publication.
231 : : */
232 : : bool
1368 233 : 141 : is_schema_publication(Oid pubid)
234 : : {
235 : : Relation pubschsrel;
236 : : ScanKeyData scankey;
237 : : SysScanDesc scan;
238 : : HeapTuple tup;
239 : 141 : bool result = false;
240 : :
241 : 141 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
242 : 141 : ScanKeyInit(&scankey,
243 : : Anum_pg_publication_namespace_pnpubid,
244 : : BTEqualStrategyNumber, F_OIDEQ,
245 : : ObjectIdGetDatum(pubid));
246 : :
247 : 141 : scan = systable_beginscan(pubschsrel,
248 : : PublicationNamespacePnnspidPnpubidIndexId,
249 : : true, NULL, 1, &scankey);
250 : 141 : tup = systable_getnext(scan);
251 : 141 : result = HeapTupleIsValid(tup);
252 : :
253 : 141 : systable_endscan(scan);
254 : 141 : table_close(pubschsrel, AccessShareLock);
255 : :
256 : 141 : return result;
257 : : }
258 : :
259 : : /*
260 : : * Returns true if the relation has column list associated with the
261 : : * publication, false otherwise.
262 : : *
263 : : * If a column list is found, the corresponding bitmap is returned through the
264 : : * cols parameter, if provided. The bitmap is constructed within the given
265 : : * memory context (mcxt).
266 : : */
267 : : bool
303 268 : 792 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
269 : : Bitmapset **cols)
270 : : {
271 : : HeapTuple cftuple;
272 : 792 : bool found = false;
273 : :
274 [ + + ]: 792 : if (pub->alltables)
275 : 192 : return false;
276 : :
277 : 600 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
278 : : ObjectIdGetDatum(relid),
279 : : ObjectIdGetDatum(pub->oid));
280 [ + + ]: 600 : if (HeapTupleIsValid(cftuple))
281 : : {
282 : : Datum cfdatum;
283 : : bool isnull;
284 : :
285 : : /* Lookup the column list attribute. */
286 : 548 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
287 : : Anum_pg_publication_rel_prattrs, &isnull);
288 : :
289 : : /* Was a column list found? */
290 [ + + ]: 548 : if (!isnull)
291 : : {
292 : : /* Build the column list bitmap in the given memory context. */
293 [ + + ]: 157 : if (cols)
294 : 154 : *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
295 : :
296 : 157 : found = true;
297 : : }
298 : :
299 : 548 : ReleaseSysCache(cftuple);
300 : : }
301 : :
302 : 600 : return found;
303 : : }
304 : :
305 : : /*
306 : : * Gets the relations based on the publication partition option for a specified
307 : : * relation.
308 : : */
309 : : List *
1445 310 : 2909 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
311 : : Oid relid)
312 : : {
313 [ + + + + ]: 2909 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
314 : : pub_partopt != PUBLICATION_PART_ROOT)
315 : 594 : {
316 : 594 : List *all_parts = find_all_inheritors(relid, NoLock,
317 : : NULL);
318 : :
319 [ + + ]: 594 : if (pub_partopt == PUBLICATION_PART_ALL)
320 : 493 : result = list_concat(result, all_parts);
321 [ + - ]: 101 : else if (pub_partopt == PUBLICATION_PART_LEAF)
322 : : {
323 : : ListCell *lc;
324 : :
325 [ + - + + : 369 : foreach(lc, all_parts)
+ + ]
326 : : {
327 : 268 : Oid partOid = lfirst_oid(lc);
328 : :
329 [ + + ]: 268 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
330 : 167 : result = lappend_oid(result, partOid);
331 : : }
332 : : }
333 : : else
1445 akapila@postgresql.o 334 :UBC 0 : Assert(false);
335 : : }
336 : : else
1445 akapila@postgresql.o 337 :CBC 2315 : result = lappend_oid(result, relid);
338 : :
339 : 2909 : return result;
340 : : }
341 : :
342 : : /*
343 : : * Returns the relid of the topmost ancestor that is published via this
344 : : * publication if any and set its ancestor level to ancestor_level,
345 : : * otherwise returns InvalidOid.
346 : : *
347 : : * The ancestor_level value allows us to compare the results for multiple
348 : : * publications, and decide which value is higher up.
349 : : *
350 : : * Note that the list of ancestors should be ordered such that the topmost
351 : : * ancestor is at the end of the list.
352 : : */
353 : : Oid
1270 tomas.vondra@postgre 354 : 255 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
355 : : {
356 : : ListCell *lc;
1292 akapila@postgresql.o 357 : 255 : Oid topmost_relid = InvalidOid;
1270 tomas.vondra@postgre 358 : 255 : int level = 0;
359 : :
360 : : /*
361 : : * Find the "topmost" ancestor that is in this publication.
362 : : */
1292 akapila@postgresql.o 363 [ + - + + : 518 : foreach(lc, ancestors)
+ + ]
364 : : {
365 : 263 : Oid ancestor = lfirst_oid(lc);
366 : 263 : List *apubids = GetRelationPublications(ancestor);
367 : 263 : List *aschemaPubids = NIL;
368 : :
1270 tomas.vondra@postgre 369 : 263 : level++;
370 : :
1292 akapila@postgresql.o 371 [ + + ]: 263 : if (list_member_oid(apubids, puboid))
372 : : {
373 : 156 : topmost_relid = ancestor;
374 : :
1270 tomas.vondra@postgre 375 [ + + ]: 156 : if (ancestor_level)
376 : 43 : *ancestor_level = level;
377 : : }
378 : : else
379 : : {
1248 380 : 107 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
1292 akapila@postgresql.o 381 [ + + ]: 107 : if (list_member_oid(aschemaPubids, puboid))
382 : : {
383 : 5 : topmost_relid = ancestor;
384 : :
1270 tomas.vondra@postgre 385 [ + - ]: 5 : if (ancestor_level)
386 : 5 : *ancestor_level = level;
387 : : }
388 : : }
389 : :
1292 akapila@postgresql.o 390 : 263 : list_free(apubids);
391 : 263 : list_free(aschemaPubids);
392 : : }
393 : :
394 : 255 : return topmost_relid;
395 : : }
396 : :
397 : : /*
398 : : * attnumstoint2vector
399 : : * Convert a Bitmapset of AttrNumbers into an int2vector.
400 : : *
401 : : * AttrNumber numbers are 0-based, i.e., not offset by
402 : : * FirstLowInvalidHeapAttributeNumber.
403 : : */
404 : : static int2vector *
387 drowley@postgresql.o 405 : 166 : attnumstoint2vector(Bitmapset *attrs)
406 : : {
407 : : int2vector *result;
408 : 166 : int n = bms_num_members(attrs);
409 : 166 : int i = -1;
410 : 166 : int j = 0;
411 : :
412 : 166 : result = buildint2vector(NULL, n);
413 : :
414 [ + + ]: 452 : while ((i = bms_next_member(attrs, i)) >= 0)
415 : : {
416 [ - + ]: 286 : Assert(i <= PG_INT16_MAX);
417 : :
418 : 286 : result->values[j++] = (int16) i;
419 : : }
420 : :
421 : 166 : return result;
422 : : }
423 : :
424 : : /*
425 : : * Insert new publication / relation mapping.
426 : : */
427 : : ObjectAddress
1292 akapila@postgresql.o 428 : 573 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
429 : : bool if_not_exists)
430 : : {
431 : : Relation rel;
432 : : HeapTuple tup;
433 : : Datum values[Natts_pg_publication_rel];
434 : : bool nulls[Natts_pg_publication_rel];
435 : 573 : Relation targetrel = pri->relation;
436 : 573 : Oid relid = RelationGetRelid(targetrel);
437 : : Oid pubreloid;
438 : : Bitmapset *attnums;
3152 peter_e@gmx.net 439 : 573 : Publication *pub = GetPublication(pubid);
440 : : ObjectAddress myself,
441 : : referenced;
1445 akapila@postgresql.o 442 : 573 : List *relids = NIL;
443 : : int i;
444 : :
2420 andres@anarazel.de 445 : 573 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
446 : :
447 : : /*
448 : : * Check for duplicates. Note that this does not really prevent
449 : : * duplicates, it's here just to provide nicer error message in common
450 : : * case. The real protection is the unique key on the catalog.
451 : : */
3152 peter_e@gmx.net 452 [ + + ]: 573 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
453 : : ObjectIdGetDatum(pubid)))
454 : : {
2420 andres@anarazel.de 455 : 11 : table_close(rel, RowExclusiveLock);
456 : :
3152 peter_e@gmx.net 457 [ + + ]: 11 : if (if_not_exists)
458 : 8 : return InvalidObjectAddress;
459 : :
460 [ + - ]: 3 : ereport(ERROR,
461 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
462 : : errmsg("relation \"%s\" is already member of publication \"%s\"",
463 : : RelationGetRelationName(targetrel), pub->name)));
464 : : }
465 : :
1292 akapila@postgresql.o 466 : 562 : check_publication_add_relation(targetrel);
467 : :
468 : : /* Validate and translate column names into a Bitmapset of attnums. */
387 drowley@postgresql.o 469 : 546 : attnums = pub_collist_validate(pri->relation, pri->columns);
470 : :
471 : : /* Form a tuple. */
3152 peter_e@gmx.net 472 : 534 : memset(values, 0, sizeof(values));
473 : 534 : memset(nulls, false, sizeof(nulls));
474 : :
1346 alvherre@alvh.no-ip. 475 : 534 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
476 : : Anum_pg_publication_rel_oid);
477 : 534 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
3152 peter_e@gmx.net 478 : 534 : values[Anum_pg_publication_rel_prpubid - 1] =
479 : 534 : ObjectIdGetDatum(pubid);
480 : 534 : values[Anum_pg_publication_rel_prrelid - 1] =
481 : 534 : ObjectIdGetDatum(relid);
482 : :
483 : : /* Add qualifications, if available */
1292 akapila@postgresql.o 484 [ + + ]: 534 : if (pri->whereClause != NULL)
485 : 165 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
486 : : else
487 : 369 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
488 : :
489 : : /* Add column list, if available */
1260 tomas.vondra@postgre 490 [ + + ]: 534 : if (pri->columns)
387 drowley@postgresql.o 491 : 166 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
492 : : else
1260 tomas.vondra@postgre 493 : 368 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
494 : :
3152 peter_e@gmx.net 495 : 534 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
496 : :
497 : : /* Insert tuple into catalog. */
2482 andres@anarazel.de 498 : 534 : CatalogTupleInsert(rel, tup);
3152 peter_e@gmx.net 499 : 534 : heap_freetuple(tup);
500 : :
501 : : /* Register dependencies as needed */
1346 alvherre@alvh.no-ip. 502 : 534 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
503 : :
504 : : /* Add dependency on the publication */
3152 peter_e@gmx.net 505 : 534 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
506 : 534 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
507 : :
508 : : /* Add dependency on the relation */
509 : 534 : ObjectAddressSet(referenced, RelationRelationId, relid);
510 : 534 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
511 : :
512 : : /* Add dependency on the objects mentioned in the qualifications */
1292 akapila@postgresql.o 513 [ + + ]: 534 : if (pri->whereClause)
514 : 165 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
515 : : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
516 : : false);
517 : :
518 : : /* Add dependency on the columns, if any are listed */
387 drowley@postgresql.o 519 : 534 : i = -1;
520 [ + + ]: 820 : while ((i = bms_next_member(attnums, i)) >= 0)
521 : : {
522 : 286 : ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
1260 tomas.vondra@postgre 523 : 286 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
524 : : }
525 : :
526 : : /* Close the table. */
2420 andres@anarazel.de 527 : 534 : table_close(rel, RowExclusiveLock);
528 : :
529 : : /*
530 : : * Invalidate relcache so that publication info is rebuilt.
531 : : *
532 : : * For the partitioned tables, we must invalidate all partitions contained
533 : : * in the respective partition hierarchies, not just the one explicitly
534 : : * mentioned in the publication. This is required because we implicitly
535 : : * publish the child tables when the parent table is published.
536 : : */
1445 akapila@postgresql.o 537 : 534 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
538 : : relid);
539 : :
540 : 534 : InvalidatePublicationRels(relids);
541 : :
3152 peter_e@gmx.net 542 : 534 : return myself;
543 : : }
544 : :
545 : : /*
546 : : * pub_collist_validate
547 : : * Process and validate the 'columns' list and ensure the columns are all
548 : : * valid to use for a publication. Checks for and raises an ERROR for
549 : : * any unknown columns, system columns, duplicate columns, or virtual
550 : : * generated columns.
551 : : *
552 : : * Looks up each column's attnum and returns a 0-based Bitmapset of the
553 : : * corresponding attnums.
554 : : */
555 : : Bitmapset *
387 drowley@postgresql.o 556 : 748 : pub_collist_validate(Relation targetrel, List *columns)
557 : : {
1260 tomas.vondra@postgre 558 : 748 : Bitmapset *set = NULL;
559 : : ListCell *lc;
211 peter@eisentraut.org 560 : 748 : TupleDesc tupdesc = RelationGetDescr(targetrel);
561 : :
1260 tomas.vondra@postgre 562 [ + + + + : 1170 : foreach(lc, columns)
+ + ]
563 : : {
564 : 440 : char *colname = strVal(lfirst(lc));
565 : 440 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
566 : :
567 [ + + ]: 440 : if (attnum == InvalidAttrNumber)
568 [ + - ]: 3 : ereport(ERROR,
569 : : errcode(ERRCODE_UNDEFINED_COLUMN),
570 : : errmsg("column \"%s\" of relation \"%s\" does not exist",
571 : : colname, RelationGetRelationName(targetrel)));
572 : :
573 [ + + ]: 437 : if (!AttrNumberIsForUserDefinedAttr(attnum))
574 [ + - ]: 6 : ereport(ERROR,
575 : : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
576 : : errmsg("cannot use system column \"%s\" in publication column list",
577 : : colname));
578 : :
211 peter@eisentraut.org 579 [ + + ]: 431 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
580 [ + - ]: 3 : ereport(ERROR,
581 : : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
582 : : errmsg("cannot use virtual generated column \"%s\" in publication column list",
583 : : colname));
584 : :
1260 tomas.vondra@postgre 585 [ + + ]: 428 : if (bms_is_member(attnum, set))
586 [ + - ]: 6 : ereport(ERROR,
587 : : errcode(ERRCODE_DUPLICATE_OBJECT),
588 : : errmsg("duplicate column \"%s\" in publication column list",
589 : : colname));
590 : :
591 : 422 : set = bms_add_member(set, attnum);
592 : : }
593 : :
387 drowley@postgresql.o 594 : 730 : return set;
595 : : }
596 : :
597 : : /*
598 : : * Transform a column list (represented by an array Datum) to a bitmapset.
599 : : *
600 : : * If columns isn't NULL, add the column numbers to that set.
601 : : *
602 : : * If mcxt isn't NULL, build the bitmapset in that context.
603 : : */
604 : : Bitmapset *
1260 tomas.vondra@postgre 605 : 222 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
606 : : {
387 drowley@postgresql.o 607 : 222 : Bitmapset *result = columns;
608 : : ArrayType *arr;
609 : : int nelems;
610 : : int16 *elems;
1213 tgl@sss.pgh.pa.us 611 : 222 : MemoryContext oldcxt = NULL;
612 : :
1260 tomas.vondra@postgre 613 : 222 : arr = DatumGetArrayTypeP(pubcols);
614 : 222 : nelems = ARR_DIMS(arr)[0];
615 [ - + ]: 222 : elems = (int16 *) ARR_DATA_PTR(arr);
616 : :
617 : : /* If a memory context was specified, switch to it. */
618 [ + + ]: 222 : if (mcxt)
619 : 39 : oldcxt = MemoryContextSwitchTo(mcxt);
620 : :
621 [ + + ]: 613 : for (int i = 0; i < nelems; i++)
622 : 391 : result = bms_add_member(result, elems[i]);
623 : :
624 [ + + ]: 222 : if (mcxt)
625 : 39 : MemoryContextSwitchTo(oldcxt);
626 : :
627 : 222 : return result;
628 : : }
629 : :
630 : : /*
631 : : * Returns a bitmap representing the columns of the specified table.
632 : : *
633 : : * Generated columns are included if include_gencols_type is
634 : : * PUBLISH_GENCOLS_STORED.
635 : : */
636 : : Bitmapset *
226 akapila@postgresql.o 637 : 9 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
638 : : {
303 639 : 9 : Bitmapset *result = NULL;
640 : 9 : TupleDesc desc = RelationGetDescr(relation);
641 : :
642 [ + + ]: 30 : for (int i = 0; i < desc->natts; i++)
643 : : {
644 : 21 : Form_pg_attribute att = TupleDescAttr(desc, i);
645 : :
226 646 [ + + ]: 21 : if (att->attisdropped)
303 647 : 1 : continue;
648 : :
226 649 [ + + ]: 20 : if (att->attgenerated)
650 : : {
651 : : /* We only support replication of STORED generated cols. */
652 [ + + ]: 2 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
653 : 1 : continue;
654 : :
655 : : /* User hasn't requested to replicate STORED generated cols. */
656 [ + - ]: 1 : if (include_gencols_type != PUBLISH_GENCOLS_STORED)
657 : 1 : continue;
658 : : }
659 : :
303 660 : 18 : result = bms_add_member(result, att->attnum);
661 : : }
662 : :
663 : 9 : return result;
664 : : }
665 : :
666 : : /*
667 : : * Insert new publication / schema mapping.
668 : : */
669 : : ObjectAddress
1248 tomas.vondra@postgre 670 : 131 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
671 : : {
672 : : Relation rel;
673 : : HeapTuple tup;
674 : : Datum values[Natts_pg_publication_namespace];
675 : : bool nulls[Natts_pg_publication_namespace];
676 : : Oid psschid;
1410 akapila@postgresql.o 677 : 131 : Publication *pub = GetPublication(pubid);
678 : 131 : List *schemaRels = NIL;
679 : : ObjectAddress myself,
680 : : referenced;
681 : :
682 : 131 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
683 : :
684 : : /*
685 : : * Check for duplicates. Note that this does not really prevent
686 : : * duplicates, it's here just to provide nicer error message in common
687 : : * case. The real protection is the unique key on the catalog.
688 : : */
1248 tomas.vondra@postgre 689 [ + + ]: 131 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
690 : : ObjectIdGetDatum(schemaid),
691 : : ObjectIdGetDatum(pubid)))
692 : : {
1410 akapila@postgresql.o 693 : 9 : table_close(rel, RowExclusiveLock);
694 : :
695 [ + + ]: 9 : if (if_not_exists)
696 : 6 : return InvalidObjectAddress;
697 : :
698 [ + - ]: 3 : ereport(ERROR,
699 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
700 : : errmsg("schema \"%s\" is already member of publication \"%s\"",
701 : : get_namespace_name(schemaid), pub->name)));
702 : : }
703 : :
704 : 122 : check_publication_add_schema(schemaid);
705 : :
706 : : /* Form a tuple */
707 : 119 : memset(values, 0, sizeof(values));
708 : 119 : memset(nulls, false, sizeof(nulls));
709 : :
710 : 119 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
711 : : Anum_pg_publication_namespace_oid);
712 : 119 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
713 : 119 : values[Anum_pg_publication_namespace_pnpubid - 1] =
714 : 119 : ObjectIdGetDatum(pubid);
715 : 119 : values[Anum_pg_publication_namespace_pnnspid - 1] =
716 : 119 : ObjectIdGetDatum(schemaid);
717 : :
718 : 119 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
719 : :
720 : : /* Insert tuple into catalog */
721 : 119 : CatalogTupleInsert(rel, tup);
722 : 119 : heap_freetuple(tup);
723 : :
724 : 119 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
725 : :
726 : : /* Add dependency on the publication */
727 : 119 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
728 : 119 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
729 : :
730 : : /* Add dependency on the schema */
731 : 119 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
732 : 119 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
733 : :
734 : : /* Close the table */
735 : 119 : table_close(rel, RowExclusiveLock);
736 : :
737 : : /*
738 : : * Invalidate relcache so that publication info is rebuilt. See
739 : : * publication_add_relation for why we need to consider all the
740 : : * partitions.
741 : : */
1248 tomas.vondra@postgre 742 : 119 : schemaRels = GetSchemaPublicationRelations(schemaid,
743 : : PUBLICATION_PART_ALL);
1410 akapila@postgresql.o 744 : 119 : InvalidatePublicationRels(schemaRels);
745 : :
746 : 119 : return myself;
747 : : }
748 : :
749 : : /* Gets list of publication oids for a relation */
750 : : List *
3152 peter_e@gmx.net 751 : 6643 : GetRelationPublications(Oid relid)
752 : : {
3034 bruce@momjian.us 753 : 6643 : List *result = NIL;
754 : : CatCList *pubrellist;
755 : : int i;
756 : :
757 : : /* Find all publications associated with the relation. */
3152 peter_e@gmx.net 758 : 6643 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
759 : : ObjectIdGetDatum(relid));
760 [ + + ]: 7465 : for (i = 0; i < pubrellist->n_members; i++)
761 : : {
762 : 822 : HeapTuple tup = &pubrellist->members[i]->tuple;
763 : 822 : Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
764 : :
765 : 822 : result = lappend_oid(result, pubid);
766 : : }
767 : :
768 : 6643 : ReleaseSysCacheList(pubrellist);
769 : :
770 : 6643 : return result;
771 : : }
772 : :
773 : : /*
774 : : * Gets list of relation oids for a publication.
775 : : *
776 : : * This should only be used FOR TABLE publications, the FOR ALL TABLES
777 : : * should use GetAllTablesPublicationRelations().
778 : : */
779 : : List *
1248 tomas.vondra@postgre 780 : 1173 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
781 : : {
782 : : List *result;
783 : : Relation pubrelsrel;
784 : : ScanKeyData scankey;
785 : : SysScanDesc scan;
786 : : HeapTuple tup;
787 : :
788 : : /* Find all publications associated with the relation. */
2420 andres@anarazel.de 789 : 1173 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
790 : :
3152 peter_e@gmx.net 791 : 1173 : ScanKeyInit(&scankey,
792 : : Anum_pg_publication_rel_prpubid,
793 : : BTEqualStrategyNumber, F_OIDEQ,
794 : : ObjectIdGetDatum(pubid));
795 : :
1333 alvherre@alvh.no-ip. 796 : 1173 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
797 : : true, NULL, 1, &scankey);
798 : :
3152 peter_e@gmx.net 799 : 1173 : result = NIL;
800 [ + + ]: 2717 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
801 : : {
802 : : Form_pg_publication_rel pubrel;
803 : :
804 : 1544 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1248 tomas.vondra@postgre 805 : 1544 : result = GetPubPartitionOptionRelations(result, pub_partopt,
806 : : pubrel->prrelid);
807 : : }
808 : :
3152 peter_e@gmx.net 809 : 1173 : systable_endscan(scan);
2420 andres@anarazel.de 810 : 1173 : table_close(pubrelsrel, AccessShareLock);
811 : :
812 : : /* Now sort and de-duplicate the result list */
1368 akapila@postgresql.o 813 : 1173 : list_sort(result, list_oid_cmp);
814 : 1173 : list_deduplicate_oid(result);
815 : :
3152 peter_e@gmx.net 816 : 1173 : return result;
817 : : }
818 : :
819 : : /*
820 : : * Gets list of publication oids for publications marked as FOR ALL TABLES.
821 : : */
822 : : List *
823 : 4485 : GetAllTablesPublications(void)
824 : : {
825 : : List *result;
826 : : Relation rel;
827 : : ScanKeyData scankey;
828 : : SysScanDesc scan;
829 : : HeapTuple tup;
830 : :
831 : : /* Find all publications that are marked as for all tables. */
2420 andres@anarazel.de 832 : 4485 : rel = table_open(PublicationRelationId, AccessShareLock);
833 : :
3152 peter_e@gmx.net 834 : 4485 : ScanKeyInit(&scankey,
835 : : Anum_pg_publication_puballtables,
836 : : BTEqualStrategyNumber, F_BOOLEQ,
837 : : BoolGetDatum(true));
838 : :
839 : 4485 : scan = systable_beginscan(rel, InvalidOid, false,
840 : : NULL, 1, &scankey);
841 : :
842 : 4485 : result = NIL;
843 [ + + ]: 4593 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
844 : : {
2299 tgl@sss.pgh.pa.us 845 : 108 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
846 : :
2482 andres@anarazel.de 847 : 108 : result = lappend_oid(result, oid);
848 : : }
849 : :
3152 peter_e@gmx.net 850 : 4485 : systable_endscan(scan);
2420 andres@anarazel.de 851 : 4485 : table_close(rel, AccessShareLock);
852 : :
3152 peter_e@gmx.net 853 : 4485 : return result;
854 : : }
855 : :
856 : : /*
857 : : * Gets list of all relation published by FOR ALL TABLES publication(s).
858 : : *
859 : : * If the publication publishes partition changes via their respective root
860 : : * partitioned tables, we must exclude partitions in favor of including the
861 : : * root partitioned tables.
862 : : */
863 : : List *
1977 peter@eisentraut.org 864 : 172 : GetAllTablesPublicationRelations(bool pubviaroot)
865 : : {
866 : : Relation classRel;
867 : : ScanKeyData key[1];
868 : : TableScanDesc scan;
869 : : HeapTuple tuple;
3152 peter_e@gmx.net 870 : 172 : List *result = NIL;
871 : :
2420 andres@anarazel.de 872 : 172 : classRel = table_open(RelationRelationId, AccessShareLock);
873 : :
3152 peter_e@gmx.net 874 : 172 : ScanKeyInit(&key[0],
875 : : Anum_pg_class_relkind,
876 : : BTEqualStrategyNumber, F_CHAREQ,
877 : : CharGetDatum(RELKIND_RELATION));
878 : :
2371 andres@anarazel.de 879 : 172 : scan = table_beginscan_catalog(classRel, 1, key);
880 : :
3152 peter_e@gmx.net 881 [ + + ]: 12669 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
882 : : {
3034 bruce@momjian.us 883 : 12497 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
2482 andres@anarazel.de 884 : 12497 : Oid relid = relForm->oid;
885 : :
1977 peter@eisentraut.org 886 [ + + ]: 12497 : if (is_publishable_class(relid, relForm) &&
887 [ + + + + ]: 795 : !(relForm->relispartition && pubviaroot))
3152 peter_e@gmx.net 888 : 704 : result = lappend_oid(result, relid);
889 : : }
890 : :
2371 andres@anarazel.de 891 : 172 : table_endscan(scan);
892 : :
1977 peter@eisentraut.org 893 [ + + ]: 172 : if (pubviaroot)
894 : : {
895 : 13 : ScanKeyInit(&key[0],
896 : : Anum_pg_class_relkind,
897 : : BTEqualStrategyNumber, F_CHAREQ,
898 : : CharGetDatum(RELKIND_PARTITIONED_TABLE));
899 : :
900 : 13 : scan = table_beginscan_catalog(classRel, 1, key);
901 : :
902 [ + + ]: 78 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
903 : : {
904 : 65 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
905 : 65 : Oid relid = relForm->oid;
906 : :
907 [ + - ]: 65 : if (is_publishable_class(relid, relForm) &&
908 [ + + ]: 65 : !relForm->relispartition)
909 : 52 : result = lappend_oid(result, relid);
910 : : }
911 : :
912 : 13 : table_endscan(scan);
913 : : }
914 : :
1974 915 : 172 : table_close(classRel, AccessShareLock);
3152 peter_e@gmx.net 916 : 172 : return result;
917 : : }
918 : :
919 : : /*
920 : : * Gets the list of schema oids for a publication.
921 : : *
922 : : * This should only be used FOR TABLES IN SCHEMA publications.
923 : : */
924 : : List *
1248 tomas.vondra@postgre 925 : 1127 : GetPublicationSchemas(Oid pubid)
926 : : {
1410 akapila@postgresql.o 927 : 1127 : List *result = NIL;
928 : : Relation pubschsrel;
929 : : ScanKeyData scankey;
930 : : SysScanDesc scan;
931 : : HeapTuple tup;
932 : :
933 : : /* Find all schemas associated with the publication */
934 : 1127 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
935 : :
1248 tomas.vondra@postgre 936 : 1127 : ScanKeyInit(&scankey,
937 : : Anum_pg_publication_namespace_pnpubid,
938 : : BTEqualStrategyNumber, F_OIDEQ,
939 : : ObjectIdGetDatum(pubid));
940 : :
1410 akapila@postgresql.o 941 : 1127 : scan = systable_beginscan(pubschsrel,
942 : : PublicationNamespacePnnspidPnpubidIndexId,
943 : : true, NULL, 1, &scankey);
944 [ + + ]: 1177 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
945 : : {
946 : : Form_pg_publication_namespace pubsch;
947 : :
948 : 50 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
949 : :
950 : 50 : result = lappend_oid(result, pubsch->pnnspid);
951 : : }
952 : :
953 : 1127 : systable_endscan(scan);
954 : 1127 : table_close(pubschsrel, AccessShareLock);
955 : :
956 : 1127 : return result;
957 : : }
958 : :
959 : : /*
960 : : * Gets the list of publication oids associated with a specified schema.
961 : : */
962 : : List *
1248 tomas.vondra@postgre 963 : 6414 : GetSchemaPublications(Oid schemaid)
964 : : {
1410 akapila@postgresql.o 965 : 6414 : List *result = NIL;
966 : : CatCList *pubschlist;
967 : : int i;
968 : :
969 : : /* Find all publications associated with the schema */
970 : 6414 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
971 : : ObjectIdGetDatum(schemaid));
972 [ + + ]: 6472 : for (i = 0; i < pubschlist->n_members; i++)
973 : : {
974 : 58 : HeapTuple tup = &pubschlist->members[i]->tuple;
975 : 58 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
976 : :
977 : 58 : result = lappend_oid(result, pubid);
978 : : }
979 : :
980 : 6414 : ReleaseSysCacheList(pubschlist);
981 : :
982 : 6414 : return result;
983 : : }
984 : :
985 : : /*
986 : : * Get the list of publishable relation oids for a specified schema.
987 : : */
988 : : List *
1248 tomas.vondra@postgre 989 : 250 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
990 : : {
991 : : Relation classRel;
992 : : ScanKeyData key[1];
993 : : TableScanDesc scan;
994 : : HeapTuple tuple;
1410 akapila@postgresql.o 995 : 250 : List *result = NIL;
996 : :
997 [ - + ]: 250 : Assert(OidIsValid(schemaid));
998 : :
999 : 250 : classRel = table_open(RelationRelationId, AccessShareLock);
1000 : :
1001 : 250 : ScanKeyInit(&key[0],
1002 : : Anum_pg_class_relnamespace,
1003 : : BTEqualStrategyNumber, F_OIDEQ,
1004 : : ObjectIdGetDatum(schemaid));
1005 : :
1006 : : /* get all the relations present in the specified schema */
1007 : 250 : scan = table_beginscan_catalog(classRel, 1, key);
1008 [ + + ]: 12866 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1009 : : {
1010 : 12616 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1011 : 12616 : Oid relid = relForm->oid;
1012 : : char relkind;
1013 : :
1014 [ + + ]: 12616 : if (!is_publishable_class(relid, relForm))
1015 : 5122 : continue;
1016 : :
1017 : 7494 : relkind = get_rel_relkind(relid);
1248 tomas.vondra@postgre 1018 [ + + ]: 7494 : if (relkind == RELKIND_RELATION)
1019 : 7103 : result = lappend_oid(result, relid);
1020 [ + - ]: 391 : else if (relkind == RELKIND_PARTITIONED_TABLE)
1021 : : {
1410 akapila@postgresql.o 1022 : 391 : List *partitionrels = NIL;
1023 : :
1024 : : /*
1025 : : * It is quite possible that some of the partitions are in a
1026 : : * different schema than the parent table, so we need to get such
1027 : : * partitions separately.
1028 : : */
1029 : 391 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
1030 : : pub_partopt,
1031 : : relForm->oid);
1032 : 391 : result = list_concat_unique_oid(result, partitionrels);
1033 : : }
1034 : : }
1035 : :
1036 : 250 : table_endscan(scan);
1037 : 250 : table_close(classRel, AccessShareLock);
1038 : 250 : return result;
1039 : : }
1040 : :
1041 : : /*
1042 : : * Gets the list of all relations published by FOR TABLES IN SCHEMA
1043 : : * publication.
1044 : : */
1045 : : List *
1248 tomas.vondra@postgre 1046 : 925 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
1047 : : {
1410 akapila@postgresql.o 1048 : 925 : List *result = NIL;
1248 tomas.vondra@postgre 1049 : 925 : List *pubschemalist = GetPublicationSchemas(pubid);
1050 : : ListCell *cell;
1051 : :
1410 akapila@postgresql.o 1052 [ + + + + : 960 : foreach(cell, pubschemalist)
+ + ]
1053 : : {
1054 : 35 : Oid schemaid = lfirst_oid(cell);
1055 : 35 : List *schemaRels = NIL;
1056 : :
1248 tomas.vondra@postgre 1057 : 35 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
1410 akapila@postgresql.o 1058 : 35 : result = list_concat(result, schemaRels);
1059 : : }
1060 : :
1061 : 925 : return result;
1062 : : }
1063 : :
1064 : : /*
1065 : : * Get publication using oid
1066 : : *
1067 : : * The Publication struct and its data are palloc'ed here.
1068 : : */
1069 : : Publication *
3152 peter_e@gmx.net 1070 : 4529 : GetPublication(Oid pubid)
1071 : : {
1072 : : HeapTuple tup;
1073 : : Publication *pub;
1074 : : Form_pg_publication pubform;
1075 : :
1076 : 4529 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1077 [ - + ]: 4529 : if (!HeapTupleIsValid(tup))
3152 peter_e@gmx.net 1078 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1079 : :
3152 peter_e@gmx.net 1080 :CBC 4529 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1081 : :
1082 : 4529 : pub = (Publication *) palloc(sizeof(Publication));
1083 : 4529 : pub->oid = pubid;
1084 : 4529 : pub->name = pstrdup(NameStr(pubform->pubname));
1085 : 4529 : pub->alltables = pubform->puballtables;
1086 : 4529 : pub->pubactions.pubinsert = pubform->pubinsert;
1087 : 4529 : pub->pubactions.pubupdate = pubform->pubupdate;
1088 : 4529 : pub->pubactions.pubdelete = pubform->pubdelete;
2709 1089 : 4529 : pub->pubactions.pubtruncate = pubform->pubtruncate;
1977 peter@eisentraut.org 1090 : 4529 : pub->pubviaroot = pubform->pubviaroot;
221 akapila@postgresql.o 1091 : 4529 : pub->pubgencols_type = pubform->pubgencols;
1092 : :
3152 peter_e@gmx.net 1093 : 4529 : ReleaseSysCache(tup);
1094 : :
1095 : 4529 : return pub;
1096 : : }
1097 : :
1098 : : /*
1099 : : * Get Publication using name.
1100 : : */
1101 : : Publication *
1102 : 1287 : GetPublicationByName(const char *pubname, bool missing_ok)
1103 : : {
1104 : : Oid oid;
1105 : :
2084 alvherre@alvh.no-ip. 1106 : 1287 : oid = get_publication_oid(pubname, missing_ok);
1107 : :
1108 [ + + ]: 1287 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1109 : : }
1110 : :
1111 : : /*
1112 : : * Get information of the tables in the given publication array.
1113 : : *
1114 : : * Returns pubid, relid, column list, row filter for each table.
1115 : : */
1116 : : Datum
3152 peter_e@gmx.net 1117 : 3059 : pg_get_publication_tables(PG_FUNCTION_ARGS)
1118 : : {
1119 : : #define NUM_PUBLICATION_TABLES_ELEM 4
1120 : : FuncCallContext *funcctx;
892 akapila@postgresql.o 1121 : 3059 : List *table_infos = NIL;
1122 : :
1123 : : /* stuff done only on the first call of the function */
3152 peter_e@gmx.net 1124 [ + + ]: 3059 : if (SRF_IS_FIRSTCALL())
1125 : : {
1126 : : TupleDesc tupdesc;
1127 : : MemoryContext oldcontext;
1128 : : ArrayType *arr;
1129 : : Datum *elems;
1130 : : int nelems,
1131 : : i;
892 akapila@postgresql.o 1132 : 989 : bool viaroot = false;
1133 : :
1134 : : /* create a function context for cross-call persistence */
3152 peter_e@gmx.net 1135 : 989 : funcctx = SRF_FIRSTCALL_INIT();
1136 : :
1137 : : /* switch to memory context appropriate for multiple function calls */
1138 : 989 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1139 : :
1140 : : /*
1141 : : * Deconstruct the parameter into elements where each element is a
1142 : : * publication name.
1143 : : */
892 akapila@postgresql.o 1144 : 989 : arr = PG_GETARG_ARRAYTYPE_P(0);
330 alvherre@alvh.no-ip. 1145 : 989 : deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
1146 : :
1147 : : /* Get Oids of tables from each publication. */
892 akapila@postgresql.o 1148 [ + + ]: 2023 : for (i = 0; i < nelems; i++)
1149 : : {
1150 : : Publication *pub_elem;
1151 : 1034 : List *pub_elem_tables = NIL;
1152 : : ListCell *lc;
1153 : :
1154 : 1034 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
1155 : :
1156 : : /*
1157 : : * Publications support partitioned tables. If
1158 : : * publish_via_partition_root is false, all changes are replicated
1159 : : * using leaf partition identity and schema, so we only need
1160 : : * those. Otherwise, get the partitioned table itself.
1161 : : */
1162 [ + + ]: 1034 : if (pub_elem->alltables)
1163 : 172 : pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
1164 : : else
1165 : : {
1166 : : List *relids,
1167 : : *schemarelids;
1168 : :
1169 : 862 : relids = GetPublicationRelations(pub_elem->oid,
1170 : 862 : pub_elem->pubviaroot ?
1171 : 862 : PUBLICATION_PART_ROOT :
1172 : : PUBLICATION_PART_LEAF);
1173 : 862 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1174 : 862 : pub_elem->pubviaroot ?
1175 : 862 : PUBLICATION_PART_ROOT :
1176 : : PUBLICATION_PART_LEAF);
1177 : 862 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1178 : : }
1179 : :
1180 : : /*
1181 : : * Record the published table and the corresponding publication so
1182 : : * that we can get row filters and column lists later.
1183 : : *
1184 : : * When a table is published by multiple publications, to obtain
1185 : : * all row filters and column lists, the structure related to this
1186 : : * table will be recorded multiple times.
1187 : : */
1188 [ + + + + : 3144 : foreach(lc, pub_elem_tables)
+ + ]
1189 : : {
1190 : 2110 : published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
1191 : :
1192 : 2110 : table_info->relid = lfirst_oid(lc);
1193 : 2110 : table_info->pubid = pub_elem->oid;
1194 : 2110 : table_infos = lappend(table_infos, table_info);
1195 : : }
1196 : :
1197 : : /* At least one publication is using publish_via_partition_root. */
1198 [ + + ]: 1034 : if (pub_elem->pubviaroot)
1199 : 184 : viaroot = true;
1200 : : }
1201 : :
1202 : : /*
1203 : : * If the publication publishes partition changes via their respective
1204 : : * root partitioned tables, we must exclude partitions in favor of
1205 : : * including the root partitioned tables. Otherwise, the function
1206 : : * could return both the child and parent tables which could cause
1207 : : * data of the child table to be double-published on the subscriber
1208 : : * side.
1209 : : */
1210 [ + + ]: 989 : if (viaroot)
1211 : 176 : filter_partitions(table_infos);
1212 : :
1213 : : /* Construct a tuple descriptor for the result rows. */
1171 michael@paquier.xyz 1214 : 989 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
892 akapila@postgresql.o 1215 : 989 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1216 : : OIDOID, -1, 0);
1217 : 989 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1218 : : OIDOID, -1, 0);
1219 : 989 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1220 : : INT2VECTOROID, -1, 0);
1221 : 989 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1222 : : PG_NODE_TREEOID, -1, 0);
1223 : :
1206 1224 : 989 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
282 peter@eisentraut.org 1225 : 989 : funcctx->user_fctx = table_infos;
1226 : :
3152 peter_e@gmx.net 1227 : 989 : MemoryContextSwitchTo(oldcontext);
1228 : : }
1229 : :
1230 : : /* stuff done on every call of the function */
1231 : 3059 : funcctx = SRF_PERCALL_SETUP();
892 akapila@postgresql.o 1232 : 3059 : table_infos = (List *) funcctx->user_fctx;
1233 : :
1234 [ + + ]: 3059 : if (funcctx->call_cntr < list_length(table_infos))
1235 : : {
1206 1236 : 2070 : HeapTuple pubtuple = NULL;
1237 : : HeapTuple rettuple;
1238 : : Publication *pub;
892 1239 : 2070 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1240 : 2070 : Oid relid = table_info->relid;
1079 1241 : 2070 : Oid schemaid = get_rel_namespace(relid);
1148 peter@eisentraut.org 1242 : 2070 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1243 : 2070 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1244 : :
1245 : : /*
1246 : : * Form tuple with appropriate data.
1247 : : */
1248 : :
892 akapila@postgresql.o 1249 : 2070 : pub = GetPublication(table_info->pubid);
1250 : :
1251 : 2070 : values[0] = ObjectIdGetDatum(pub->oid);
1252 : 2070 : values[1] = ObjectIdGetDatum(relid);
1253 : :
1254 : : /*
1255 : : * We don't consider row filters or column lists for FOR ALL TABLES or
1256 : : * FOR TABLES IN SCHEMA publications.
1257 : : */
1258 [ + + ]: 2070 : if (!pub->alltables &&
1079 1259 [ + + ]: 1314 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1260 : : ObjectIdGetDatum(schemaid),
1261 : : ObjectIdGetDatum(pub->oid)))
1262 : 1267 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1263 : : ObjectIdGetDatum(relid),
1264 : : ObjectIdGetDatum(pub->oid));
1265 : :
1206 1266 [ + + ]: 2070 : if (HeapTupleIsValid(pubtuple))
1267 : : {
1268 : : /* Lookup the column list attribute. */
892 1269 : 1154 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1270 : : Anum_pg_publication_rel_prattrs,
1271 : : &(nulls[2]));
1272 : :
1273 : : /* Null indicates no filter. */
1274 : 1154 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1275 : : Anum_pg_publication_rel_prqual,
1276 : : &(nulls[3]));
1277 : : }
1278 : : else
1279 : : {
1206 1280 : 916 : nulls[2] = true;
892 1281 : 916 : nulls[3] = true;
1282 : : }
1283 : :
1284 : : /* Show all columns when the column list is not specified. */
1285 [ + + ]: 2070 : if (nulls[2])
1286 : : {
967 1287 : 1946 : Relation rel = table_open(relid, AccessShareLock);
1288 : 1946 : int nattnums = 0;
1289 : : int16 *attnums;
1290 : 1946 : TupleDesc desc = RelationGetDescr(rel);
1291 : : int i;
1292 : :
1293 : 1946 : attnums = (int16 *) palloc(desc->natts * sizeof(int16));
1294 : :
1295 [ + + ]: 5346 : for (i = 0; i < desc->natts; i++)
1296 : : {
1297 : 3400 : Form_pg_attribute att = TupleDescAttr(desc, i);
1298 : :
226 1299 [ + + ]: 3400 : if (att->attisdropped)
967 1300 : 6 : continue;
1301 : :
226 1302 [ + + ]: 3394 : if (att->attgenerated)
1303 : : {
1304 : : /* We only support replication of STORED generated cols. */
1305 [ + + ]: 48 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
1306 : 36 : continue;
1307 : :
1308 : : /*
1309 : : * User hasn't requested to replicate STORED generated
1310 : : * cols.
1311 : : */
1312 [ + + ]: 12 : if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
1313 : 9 : continue;
1314 : : }
1315 : :
967 1316 : 3349 : attnums[nattnums++] = att->attnum;
1317 : : }
1318 : :
1319 [ + + ]: 1946 : if (nattnums > 0)
1320 : : {
892 1321 : 1924 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1322 : 1924 : nulls[2] = false;
1323 : : }
1324 : :
967 1325 : 1946 : table_close(rel, AccessShareLock);
1326 : : }
1327 : :
1206 1328 : 2070 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1329 : :
1330 : 2070 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1331 : : }
1332 : :
3152 peter_e@gmx.net 1333 : 989 : SRF_RETURN_DONE(funcctx);
1334 : : }
|