Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_subscription.c
4 : : * replication subscriptions
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/catalog/pg_subscription.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/genam.h"
18 : : #include "access/heapam.h"
19 : : #include "access/htup_details.h"
20 : : #include "access/tableam.h"
21 : : #include "catalog/indexing.h"
22 : : #include "catalog/pg_foreign_server.h"
23 : : #include "catalog/pg_subscription.h"
24 : : #include "catalog/pg_subscription_rel.h"
25 : : #include "catalog/pg_type.h"
26 : : #include "foreign/foreign.h"
27 : : #include "miscadmin.h"
28 : : #include "storage/lmgr.h"
29 : : #include "storage/lock.h"
30 : : #include "utils/acl.h"
31 : : #include "utils/array.h"
32 : : #include "utils/builtins.h"
33 : : #include "utils/fmgroids.h"
34 : : #include "utils/lsyscache.h"
35 : : #include "utils/memutils.h"
36 : : #include "utils/pg_lsn.h"
37 : : #include "utils/rel.h"
38 : : #include "utils/syscache.h"
39 : :
40 : : static List *textarray_to_stringlist(ArrayType *textarray);
41 : :
42 : : /*
43 : : * Add a comma-separated list of publication names to the 'dest' string.
44 : : */
45 : : void
557 michael@paquier.xyz 46 :CBC 543 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
47 : : {
48 : : ListCell *lc;
49 : 543 : bool first = true;
50 : :
51 [ - + ]: 543 : Assert(publications != NIL);
52 : :
53 [ + - + + : 1394 : foreach(lc, publications)
+ + ]
54 : : {
55 : 851 : char *pubname = strVal(lfirst(lc));
56 : :
57 [ + + ]: 851 : if (first)
58 : 543 : first = false;
59 : : else
60 : 308 : appendStringInfoString(dest, ", ");
61 : :
62 [ + + ]: 851 : if (quote_literal)
63 : 841 : appendStringInfoString(dest, quote_literal_cstr(pubname));
64 : : else
65 : : {
66 : 10 : appendStringInfoChar(dest, '"');
67 : 10 : appendStringInfoString(dest, pubname);
68 : 10 : appendStringInfoChar(dest, '"');
69 : : }
70 : : }
71 : 543 : }
72 : :
73 : : /*
74 : : * Fetch the subscription from the syscache.
75 : : */
76 : : Subscription *
60 jdavis@postgresql.or 77 :GNC 990 : GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
78 : : {
79 : : HeapTuple tup;
80 : : Subscription *sub;
81 : : Form_pg_subscription subform;
82 : : Datum datum;
83 : : bool isnull;
84 : : MemoryContext cxt;
85 : : MemoryContext oldcxt;
86 : :
3393 peter_e@gmx.net 87 :CBC 990 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
88 : :
89 [ + + ]: 990 : if (!HeapTupleIsValid(tup))
90 : : {
91 [ + - ]: 66 : if (missing_ok)
92 : 66 : return NULL;
93 : :
3393 peter_e@gmx.net 94 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
95 : : }
96 : :
42 jdavis@postgresql.or 97 :GNC 924 : cxt = AllocSetContextCreate(CurrentMemoryContext, "subscription",
98 : : ALLOCSET_SMALL_SIZES);
99 : 924 : oldcxt = MemoryContextSwitchTo(cxt);
100 : :
3393 peter_e@gmx.net 101 :CBC 924 : subform = (Form_pg_subscription) GETSTRUCT(tup);
102 : :
146 michael@paquier.xyz 103 :GNC 924 : sub = palloc_object(Subscription);
42 jdavis@postgresql.or 104 : 924 : sub->cxt = cxt;
3393 peter_e@gmx.net 105 :CBC 924 : sub->oid = subid;
106 : 924 : sub->dbid = subform->subdbid;
1489 akapila@postgresql.o 107 : 924 : sub->skiplsn = subform->subskiplsn;
3393 peter_e@gmx.net 108 : 924 : sub->name = pstrdup(NameStr(subform->subname));
109 : 924 : sub->owner = subform->subowner;
110 : 924 : sub->enabled = subform->subenabled;
2117 tgl@sss.pgh.pa.us 111 : 924 : sub->binary = subform->subbinary;
2070 akapila@postgresql.o 112 : 924 : sub->stream = subform->substream;
1756 113 : 924 : sub->twophasestate = subform->subtwophasestate;
1513 114 : 924 : sub->disableonerr = subform->subdisableonerr;
1132 rhaas@postgresql.org 115 : 924 : sub->passwordrequired = subform->subpasswordrequired;
1127 116 : 924 : sub->runasowner = subform->subrunasowner;
826 akapila@postgresql.o 117 : 924 : sub->failover = subform->subfailover;
286 akapila@postgresql.o 118 :GNC 924 : sub->retaindeadtuples = subform->subretaindeadtuples;
245 119 : 924 : sub->maxretention = subform->submaxretention;
120 : 924 : sub->retentionactive = subform->subretentionactive;
121 : :
122 : : /* Get conninfo */
60 jdavis@postgresql.or 123 [ + + ]: 924 : if (OidIsValid(subform->subserver))
124 : : {
125 : : AclResult aclresult;
126 : : ForeignServer *server;
127 : :
42 128 : 9 : server = GetForeignServer(subform->subserver);
129 : :
130 : : /* recheck ACL if requested */
60 131 [ + + ]: 9 : if (aclcheck)
132 : : {
133 : 4 : aclresult = object_aclcheck(ForeignServerRelationId,
134 : : subform->subserver,
135 : : subform->subowner, ACL_USAGE);
136 : :
137 [ - + ]: 4 : if (aclresult != ACLCHECK_OK)
60 jdavis@postgresql.or 138 [ # # ]:UNC 0 : ereport(ERROR,
139 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
140 : : errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
141 : : GetUserNameFromId(subform->subowner, false),
142 : : server->servername)));
143 : : }
144 : :
60 jdavis@postgresql.or 145 :GNC 9 : sub->conninfo = ForeignServerConnectionString(subform->subowner,
146 : : server);
147 : : }
148 : : else
149 : : {
150 : 915 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
151 : : tup,
152 : : Anum_pg_subscription_subconninfo);
153 : 915 : sub->conninfo = TextDatumGetCString(datum);
154 : : }
155 : :
156 : : /* Get slotname */
3393 peter_e@gmx.net 157 :CBC 924 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
158 : : tup,
159 : : Anum_pg_subscription_subslotname,
160 : : &isnull);
3283 161 [ + + ]: 924 : if (!isnull)
162 : 880 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
163 : : else
164 : 44 : sub->slotname = NULL;
165 : :
166 : : /* Get synccommit */
1137 dgustafsson@postgres 167 : 924 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
168 : : tup,
169 : : Anum_pg_subscription_subsynccommit);
3308 peter_e@gmx.net 170 : 924 : sub->synccommit = TextDatumGetCString(datum);
171 : :
172 : : /* Get walrcvtimeout */
74 fujii@postgresql.org 173 :GNC 924 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
174 : : tup,
175 : : Anum_pg_subscription_subwalrcvtimeout);
176 : 924 : sub->walrcvtimeout = TextDatumGetCString(datum);
177 : :
178 : : /* Get publications */
1137 dgustafsson@postgres 179 :CBC 924 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
180 : : tup,
181 : : Anum_pg_subscription_subpublications);
3393 peter_e@gmx.net 182 : 924 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
183 : :
184 : : /* Get origin */
1137 dgustafsson@postgres 185 : 924 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
186 : : tup,
187 : : Anum_pg_subscription_suborigin);
1384 akapila@postgresql.o 188 : 924 : sub->origin = TextDatumGetCString(datum);
189 : :
190 : : /* Is the subscription owner a superuser? */
931 191 : 924 : sub->ownersuperuser = superuser_arg(sub->owner);
192 : :
3393 peter_e@gmx.net 193 : 924 : ReleaseSysCache(tup);
194 : :
42 jdavis@postgresql.or 195 :GNC 924 : MemoryContextSwitchTo(oldcxt);
196 : :
3393 peter_e@gmx.net 197 :CBC 924 : return sub;
198 : : }
199 : :
200 : : /*
201 : : * Return number of subscriptions defined in given database.
202 : : * Used by dropdb() to check if database can indeed be dropped.
203 : : */
204 : : int
205 : 49 : CountDBSubscriptions(Oid dbid)
206 : : {
3275 bruce@momjian.us 207 : 49 : int nsubs = 0;
208 : : Relation rel;
209 : : ScanKeyData scankey;
210 : : SysScanDesc scan;
211 : : HeapTuple tup;
212 : :
2661 andres@anarazel.de 213 : 49 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
214 : :
3393 peter_e@gmx.net 215 : 49 : ScanKeyInit(&scankey,
216 : : Anum_pg_subscription_subdbid,
217 : : BTEqualStrategyNumber, F_OIDEQ,
218 : : ObjectIdGetDatum(dbid));
219 : :
220 : 49 : scan = systable_beginscan(rel, InvalidOid, false,
221 : : NULL, 1, &scankey);
222 : :
223 [ - + ]: 49 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
3393 peter_e@gmx.net 224 :UBC 0 : nsubs++;
225 : :
3393 peter_e@gmx.net 226 :CBC 49 : systable_endscan(scan);
227 : :
2661 andres@anarazel.de 228 : 49 : table_close(rel, NoLock);
229 : :
3393 peter_e@gmx.net 230 : 49 : return nsubs;
231 : : }
232 : :
233 : : /*
234 : : * Disable the given subscription.
235 : : */
236 : : void
1513 akapila@postgresql.o 237 : 4 : DisableSubscription(Oid subid)
238 : : {
239 : : Relation rel;
240 : : bool nulls[Natts_pg_subscription];
241 : : bool replaces[Natts_pg_subscription];
242 : : Datum values[Natts_pg_subscription];
243 : : HeapTuple tup;
244 : :
245 : : /* Look up the subscription in the catalog */
246 : 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
247 : 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
248 : :
249 [ - + ]: 4 : if (!HeapTupleIsValid(tup))
1513 akapila@postgresql.o 250 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
251 : :
1513 akapila@postgresql.o 252 :CBC 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
253 : :
254 : : /* Form a new tuple. */
255 : 4 : memset(values, 0, sizeof(values));
256 : 4 : memset(nulls, false, sizeof(nulls));
257 : 4 : memset(replaces, false, sizeof(replaces));
258 : :
259 : : /* Set the subscription to disabled. */
260 : 4 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
261 : 4 : replaces[Anum_pg_subscription_subenabled - 1] = true;
262 : :
263 : : /* Update the catalog */
264 : 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
265 : : replaces);
266 : 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
267 : 4 : heap_freetuple(tup);
268 : :
269 : 4 : table_close(rel, NoLock);
270 : 4 : }
271 : :
272 : : /*
273 : : * Convert text array to list of strings.
274 : : *
275 : : * Note: the resulting list of strings is pallocated here.
276 : : */
277 : : static List *
3393 peter_e@gmx.net 278 : 924 : textarray_to_stringlist(ArrayType *textarray)
279 : : {
280 : : Datum *elems;
281 : : int nelems,
282 : : i;
3275 bruce@momjian.us 283 : 924 : List *res = NIL;
284 : :
1404 peter@eisentraut.org 285 : 924 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
286 : :
3393 peter_e@gmx.net 287 [ - + ]: 924 : if (nelems == 0)
3393 peter_e@gmx.net 288 :UBC 0 : return NIL;
289 : :
3393 peter_e@gmx.net 290 [ + + ]:CBC 2269 : for (i = 0; i < nelems; i++)
3308 291 : 1345 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
292 : :
3393 293 : 924 : return res;
294 : : }
295 : :
296 : : /*
297 : : * Add new state record for a subscription table.
298 : : *
299 : : * If retain_lock is true, then don't release the locks taken in this function.
300 : : * We normally release the locks at the end of transaction but in binary-upgrade
301 : : * mode, we expect to release those immediately.
302 : : */
303 : : void
2951 304 : 228 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
305 : : XLogRecPtr sublsn, bool retain_lock)
306 : : {
307 : : Relation rel;
308 : : HeapTuple tup;
309 : : bool nulls[Natts_pg_subscription_rel];
310 : : Datum values[Natts_pg_subscription_rel];
311 : :
3228 312 : 228 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
313 : :
2661 andres@anarazel.de 314 : 228 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
315 : :
316 : : /* Try finding existing mapping. */
3330 peter_e@gmx.net 317 : 228 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
318 : : ObjectIdGetDatum(relid),
319 : : ObjectIdGetDatum(subid));
2951 320 [ - + ]: 228 : if (HeapTupleIsValid(tup))
194 akapila@postgresql.o 321 [ # # ]:UNC 0 : elog(ERROR, "subscription relation %u in subscription %u already exists",
322 : : relid, subid);
323 : :
324 : : /* Form the tuple. */
2951 peter_e@gmx.net 325 :CBC 228 : memset(values, 0, sizeof(values));
326 : 228 : memset(nulls, false, sizeof(nulls));
327 : 228 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
328 : 228 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
329 : 228 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
180 alvherre@kurilemu.de 330 [ + + ]:GNC 228 : if (XLogRecPtrIsValid(sublsn))
2951 peter_e@gmx.net 331 :CBC 2 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
332 : : else
333 : 226 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
334 : :
335 : 228 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
336 : :
337 : : /* Insert tuple into catalog. */
2723 andres@anarazel.de 338 : 228 : CatalogTupleInsert(rel, tup);
339 : :
2951 peter_e@gmx.net 340 : 228 : heap_freetuple(tup);
341 : :
342 : : /* Cleanup. */
854 akapila@postgresql.o 343 [ + + ]: 228 : if (retain_lock)
344 : : {
345 : 225 : table_close(rel, NoLock);
346 : : }
347 : : else
348 : : {
349 : 3 : table_close(rel, RowExclusiveLock);
350 : 3 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
351 : : }
2951 peter_e@gmx.net 352 : 228 : }
353 : :
354 : : /*
355 : : * Update the state of a subscription table.
356 : : */
357 : : void
358 : 819 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
359 : : XLogRecPtr sublsn, bool already_locked)
360 : : {
361 : : Relation rel;
362 : : HeapTuple tup;
363 : : bool nulls[Natts_pg_subscription_rel];
364 : : Datum values[Natts_pg_subscription_rel];
365 : : bool replaces[Natts_pg_subscription_rel];
366 : :
277 akapila@postgresql.o 367 [ + + ]: 819 : if (already_locked)
368 : : {
369 : : #ifdef USE_ASSERT_CHECKING
370 : : LOCKTAG tag;
371 : :
372 [ - + ]: 190 : Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
373 : : RowExclusiveLock, true));
374 : 190 : SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
375 [ - + ]: 190 : Assert(LockHeldByMe(&tag, AccessShareLock, true));
376 : : #endif
377 : :
378 : 190 : rel = table_open(SubscriptionRelRelationId, NoLock);
379 : : }
380 : : else
381 : : {
382 : 629 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
383 : 629 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
384 : : }
385 : :
386 : : /* Try finding existing mapping. */
2951 peter_e@gmx.net 387 : 819 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
388 : : ObjectIdGetDatum(relid),
389 : : ObjectIdGetDatum(subid));
390 [ - + ]: 819 : if (!HeapTupleIsValid(tup))
181 akapila@postgresql.o 391 [ # # ]:UNC 0 : elog(ERROR, "subscription relation %u in subscription %u does not exist",
392 : : relid, subid);
393 : :
394 : : /* Update the tuple. */
2951 peter_e@gmx.net 395 :CBC 819 : memset(values, 0, sizeof(values));
396 : 819 : memset(nulls, false, sizeof(nulls));
397 : 819 : memset(replaces, false, sizeof(replaces));
398 : :
399 : 819 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
400 : 819 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
401 : :
402 : 819 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
180 alvherre@kurilemu.de 403 [ + + ]:GNC 819 : if (XLogRecPtrIsValid(sublsn))
2951 peter_e@gmx.net 404 :CBC 401 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
405 : : else
406 : 418 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
407 : :
408 : 819 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
409 : : replaces);
410 : :
411 : : /* Update the catalog. */
412 : 819 : CatalogTupleUpdate(rel, &tup->t_self, tup);
413 : :
414 : : /* Cleanup. */
2661 andres@anarazel.de 415 : 819 : table_close(rel, NoLock);
3330 peter_e@gmx.net 416 : 819 : }
417 : :
418 : : /*
419 : : * Get state of subscription table.
420 : : *
421 : : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
422 : : */
423 : : char
2028 alvherre@alvh.no-ip. 424 : 1299 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
425 : : {
426 : : HeapTuple tup;
427 : : char substate;
428 : : bool isnull;
429 : : Datum d;
430 : : Relation rel;
431 : :
432 : : /*
433 : : * This is to avoid the race condition with AlterSubscription which tries
434 : : * to remove this relstate.
435 : : */
1908 akapila@postgresql.o 436 : 1299 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
437 : :
438 : : /* Try finding the mapping. */
3330 peter_e@gmx.net 439 : 1299 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
440 : : ObjectIdGetDatum(relid),
441 : : ObjectIdGetDatum(subid));
442 : :
443 [ + + ]: 1299 : if (!HeapTupleIsValid(tup))
444 : : {
1895 akapila@postgresql.o 445 : 33 : table_close(rel, AccessShareLock);
2028 alvherre@alvh.no-ip. 446 : 33 : *sublsn = InvalidXLogRecPtr;
447 : 33 : return SUBREL_STATE_UNKNOWN;
448 : : }
449 : :
450 : : /* Get the state. */
451 : 1266 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
452 : :
453 : : /* Get the LSN */
3330 peter_e@gmx.net 454 : 1266 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
455 : : Anum_pg_subscription_rel_srsublsn, &isnull);
456 [ + + ]: 1266 : if (isnull)
457 : 710 : *sublsn = InvalidXLogRecPtr;
458 : : else
459 : 556 : *sublsn = DatumGetLSN(d);
460 : :
461 : : /* Cleanup */
462 : 1266 : ReleaseSysCache(tup);
463 : :
1908 akapila@postgresql.o 464 : 1266 : table_close(rel, AccessShareLock);
465 : :
3330 peter_e@gmx.net 466 : 1266 : return substate;
467 : : }
468 : :
469 : : /*
470 : : * Drop subscription relation mapping. These can be for a particular
471 : : * subscription, or for a particular relation, or both.
472 : : */
473 : : void
474 : 33482 : RemoveSubscriptionRel(Oid subid, Oid relid)
475 : : {
476 : : Relation rel;
477 : : TableScanDesc scan;
478 : : ScanKeyData skey[2];
479 : : HeapTuple tup;
480 : 33482 : int nkeys = 0;
481 : :
2661 andres@anarazel.de 482 : 33482 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
483 : :
3330 peter_e@gmx.net 484 [ + + ]: 33482 : if (OidIsValid(subid))
485 : : {
486 : 163 : ScanKeyInit(&skey[nkeys++],
487 : : Anum_pg_subscription_rel_srsubid,
488 : : BTEqualStrategyNumber,
489 : : F_OIDEQ,
490 : : ObjectIdGetDatum(subid));
491 : : }
492 : :
493 [ + + ]: 33482 : if (OidIsValid(relid))
494 : : {
495 : 33340 : ScanKeyInit(&skey[nkeys++],
496 : : Anum_pg_subscription_rel_srrelid,
497 : : BTEqualStrategyNumber,
498 : : F_OIDEQ,
499 : : ObjectIdGetDatum(relid));
500 : : }
501 : :
502 : : /* Do the search and delete what we found. */
2612 andres@anarazel.de 503 : 33482 : scan = table_beginscan_catalog(rel, nkeys, skey);
3330 peter_e@gmx.net 504 [ + + ]: 33611 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
505 : : {
506 : : Form_pg_subscription_rel subrel;
507 : :
1908 akapila@postgresql.o 508 : 129 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
509 : :
510 : : /*
511 : : * We don't allow to drop the relation mapping when the table
512 : : * synchronization is in progress unless the caller updates the
513 : : * corresponding subscription as well. This is to ensure that we don't
514 : : * leave tablesync slots or origins in the system when the
515 : : * corresponding table is dropped. For sequences, however, it's ok to
516 : : * drop them since no separate slots or origins are created during
517 : : * synchronization.
518 : : */
194 akapila@postgresql.o 519 [ + + ]:GNC 129 : if (!OidIsValid(subid) &&
520 [ - + - - ]: 16 : subrel->srsubstate != SUBREL_STATE_READY &&
194 akapila@postgresql.o 521 :UNC 0 : get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
522 : : {
1908 akapila@postgresql.o 523 [ # # ]:UBC 0 : ereport(ERROR,
524 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
525 : : errmsg("could not drop relation mapping for subscription \"%s\"",
526 : : get_subscription_name(subrel->srsubid, false)),
527 : : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
528 : : get_rel_name(relid), subrel->srsubstate),
529 : :
530 : : /*
531 : : * translator: first %s is a SQL ALTER command and second %s is a
532 : : * SQL DROP command
533 : : */
534 : : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
535 : : "ALTER SUBSCRIPTION ... ENABLE",
536 : : "DROP SUBSCRIPTION ...")));
537 : : }
538 : :
3247 tgl@sss.pgh.pa.us 539 :CBC 129 : CatalogTupleDelete(rel, &tup->t_self);
540 : : }
2612 andres@anarazel.de 541 : 33482 : table_endscan(scan);
542 : :
2661 543 : 33482 : table_close(rel, RowExclusiveLock);
3330 peter_e@gmx.net 544 : 33482 : }
545 : :
546 : : /*
547 : : * Does the subscription have any tables?
548 : : *
549 : : * Use this function only to know true/false, and when you have no need for the
550 : : * List returned by GetSubscriptionRelations.
551 : : */
552 : : bool
201 akapila@postgresql.o 553 :GNC 263 : HasSubscriptionTables(Oid subid)
554 : : {
555 : : Relation rel;
556 : : ScanKeyData skey[1];
557 : : SysScanDesc scan;
558 : : HeapTuple tup;
194 559 : 263 : bool has_subtables = false;
560 : :
1756 akapila@postgresql.o 561 :CBC 263 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
562 : :
563 : 263 : ScanKeyInit(&skey[0],
564 : : Anum_pg_subscription_rel_srsubid,
565 : : BTEqualStrategyNumber, F_OIDEQ,
566 : : ObjectIdGetDatum(subid));
567 : :
568 : 263 : scan = systable_beginscan(rel, InvalidOid, false,
569 : : NULL, 1, skey);
570 : :
194 akapila@postgresql.o 571 [ + + ]:GNC 307 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
572 : : {
573 : : Form_pg_subscription_rel subrel;
574 : : char relkind;
575 : :
576 : 290 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
577 : 290 : relkind = get_rel_relkind(subrel->srrelid);
578 : :
579 [ + + + + ]: 290 : if (relkind == RELKIND_RELATION ||
580 : : relkind == RELKIND_PARTITIONED_TABLE)
581 : : {
582 : 246 : has_subtables = true;
583 : 246 : break;
584 : : }
585 : : }
586 : :
587 : : /* Cleanup */
1756 akapila@postgresql.o 588 :CBC 263 : systable_endscan(scan);
589 : 263 : table_close(rel, AccessShareLock);
590 : :
194 akapila@postgresql.o 591 :GNC 263 : return has_subtables;
592 : : }
593 : :
594 : : /*
595 : : * Get the relations for the subscription.
596 : : *
597 : : * If not_ready is true, return only the relations that are not in a ready
598 : : * state, otherwise return all the relations of the subscription. The
599 : : * returned list is palloc'ed in the current memory context.
600 : : */
601 : : List *
602 : 1183 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
603 : : bool not_ready)
604 : : {
3330 peter_e@gmx.net 605 :CBC 1183 : List *res = NIL;
606 : : Relation rel;
607 : : HeapTuple tup;
608 : 1183 : int nkeys = 0;
609 : : ScanKeyData skey[2];
610 : : SysScanDesc scan;
611 : :
612 : : /* One or both of 'tables' and 'sequences' must be true. */
194 akapila@postgresql.o 613 [ + + - + ]:GNC 1183 : Assert(tables || sequences);
614 : :
2661 andres@anarazel.de 615 :CBC 1183 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
616 : :
3330 peter_e@gmx.net 617 : 1183 : ScanKeyInit(&skey[nkeys++],
618 : : Anum_pg_subscription_rel_srsubid,
619 : : BTEqualStrategyNumber, F_OIDEQ,
620 : : ObjectIdGetDatum(subid));
621 : :
1378 michael@paquier.xyz 622 [ + + ]: 1183 : if (not_ready)
623 : 1143 : ScanKeyInit(&skey[nkeys++],
624 : : Anum_pg_subscription_rel_srsubstate,
625 : : BTEqualStrategyNumber, F_CHARNE,
626 : : CharGetDatum(SUBREL_STATE_READY));
627 : :
3330 peter_e@gmx.net 628 : 1183 : scan = systable_beginscan(rel, InvalidOid, false,
629 : : NULL, nkeys, skey);
630 : :
631 [ + + ]: 3143 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
632 : : {
633 : : Form_pg_subscription_rel subrel;
634 : : SubscriptionRelState *relstate;
635 : : Datum d;
636 : : bool isnull;
637 : : char relkind;
638 : :
639 : 1960 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
640 : :
641 : : /* Relation is either a sequence or a table */
194 akapila@postgresql.o 642 :GNC 1960 : relkind = get_rel_relkind(subrel->srrelid);
643 [ + + + + : 1960 : Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
- + ]
644 : : relkind == RELKIND_PARTITIONED_TABLE);
645 : :
646 : : /* Skip sequences if they were not requested */
647 [ + + - + ]: 1960 : if ((relkind == RELKIND_SEQUENCE) && !sequences)
194 akapila@postgresql.o 648 :UNC 0 : continue;
649 : :
650 : : /* Skip tables if they were not requested */
194 akapila@postgresql.o 651 [ + + + + ]:GNC 1960 : if ((relkind == RELKIND_RELATION ||
652 [ - + ]: 1921 : relkind == RELKIND_PARTITIONED_TABLE) && !tables)
194 akapila@postgresql.o 653 :UNC 0 : continue;
654 : :
146 michael@paquier.xyz 655 :GNC 1960 : relstate = palloc_object(SubscriptionRelState);
3330 peter_e@gmx.net 656 :CBC 1960 : relstate->relid = subrel->srrelid;
657 : 1960 : relstate->state = subrel->srsubstate;
2115 tgl@sss.pgh.pa.us 658 : 1960 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
659 : : Anum_pg_subscription_rel_srsublsn, &isnull);
660 [ + + ]: 1960 : if (isnull)
661 : 1607 : relstate->lsn = InvalidXLogRecPtr;
662 : : else
663 : 353 : relstate->lsn = DatumGetLSN(d);
664 : :
3330 peter_e@gmx.net 665 : 1960 : res = lappend(res, relstate);
666 : : }
667 : :
668 : : /* Cleanup */
669 : 1183 : systable_endscan(scan);
2661 andres@anarazel.de 670 : 1183 : table_close(rel, AccessShareLock);
671 : :
3330 peter_e@gmx.net 672 : 1183 : return res;
673 : : }
674 : :
675 : : /*
676 : : * Update the dead tuple retention status for the given subscription.
677 : : */
678 : : void
245 akapila@postgresql.o 679 :GNC 2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
680 : : {
681 : : Relation rel;
682 : : bool nulls[Natts_pg_subscription];
683 : : bool replaces[Natts_pg_subscription];
684 : : Datum values[Natts_pg_subscription];
685 : : HeapTuple tup;
686 : :
687 : : /* Look up the subscription in the catalog */
688 : 2 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
689 : 2 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
690 : :
691 [ - + ]: 2 : if (!HeapTupleIsValid(tup))
245 akapila@postgresql.o 692 [ # # ]:UNC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
693 : :
245 akapila@postgresql.o 694 :GNC 2 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
695 : :
696 : : /* Form a new tuple. */
697 : 2 : memset(values, 0, sizeof(values));
698 : 2 : memset(nulls, false, sizeof(nulls));
699 : 2 : memset(replaces, false, sizeof(replaces));
700 : :
701 : : /* Set the subscription to disabled. */
15 peter@eisentraut.org 702 : 2 : values[Anum_pg_subscription_subretentionactive - 1] = BoolGetDatum(active);
245 akapila@postgresql.o 703 : 2 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
704 : :
705 : : /* Update the catalog */
706 : 2 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
707 : : replaces);
708 : 2 : CatalogTupleUpdate(rel, &tup->t_self, tup);
709 : 2 : heap_freetuple(tup);
710 : :
711 : 2 : table_close(rel, NoLock);
712 : 2 : }
|