Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_subscription.c
4 : : * replication subscriptions
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/catalog/pg_subscription.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/genam.h"
18 : : #include "access/heapam.h"
19 : : #include "access/htup_details.h"
20 : : #include "access/tableam.h"
21 : : #include "catalog/indexing.h"
22 : : #include "catalog/pg_subscription.h"
23 : : #include "catalog/pg_subscription_rel.h"
24 : : #include "catalog/pg_type.h"
25 : : #include "miscadmin.h"
26 : : #include "storage/lmgr.h"
27 : : #include "utils/array.h"
28 : : #include "utils/builtins.h"
29 : : #include "utils/fmgroids.h"
30 : : #include "utils/lsyscache.h"
31 : : #include "utils/pg_lsn.h"
32 : : #include "utils/rel.h"
33 : : #include "utils/syscache.h"
34 : :
35 : : static List *textarray_to_stringlist(ArrayType *textarray);
36 : :
37 : : /*
38 : : * Add a comma-separated list of publication names to the 'dest' string.
39 : : */
40 : : void
417 michael@paquier.xyz 41 :CBC 515 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
42 : : {
43 : : ListCell *lc;
44 : 515 : bool first = true;
45 : :
46 [ - + ]: 515 : Assert(publications != NIL);
47 : :
48 [ + - + + : 1333 : foreach(lc, publications)
+ + ]
49 : : {
50 : 818 : char *pubname = strVal(lfirst(lc));
51 : :
52 [ + + ]: 818 : if (first)
53 : 515 : first = false;
54 : : else
55 : 303 : appendStringInfoString(dest, ", ");
56 : :
57 [ + + ]: 818 : if (quote_literal)
58 : 808 : appendStringInfoString(dest, quote_literal_cstr(pubname));
59 : : else
60 : : {
61 : 10 : appendStringInfoChar(dest, '"');
62 : 10 : appendStringInfoString(dest, pubname);
63 : 10 : appendStringInfoChar(dest, '"');
64 : : }
65 : : }
66 : 515 : }
67 : :
68 : : /*
69 : : * Fetch the subscription from the syscache.
70 : : */
71 : : Subscription *
3253 peter_e@gmx.net 72 : 881 : GetSubscription(Oid subid, bool missing_ok)
73 : : {
74 : : HeapTuple tup;
75 : : Subscription *sub;
76 : : Form_pg_subscription subform;
77 : : Datum datum;
78 : : bool isnull;
79 : :
80 : 881 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
81 : :
82 [ + + ]: 881 : if (!HeapTupleIsValid(tup))
83 : : {
84 [ + - ]: 61 : if (missing_ok)
85 : 61 : return NULL;
86 : :
3253 peter_e@gmx.net 87 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
88 : : }
89 : :
3253 peter_e@gmx.net 90 :CBC 820 : subform = (Form_pg_subscription) GETSTRUCT(tup);
91 : :
6 michael@paquier.xyz 92 :GNC 820 : sub = palloc_object(Subscription);
3253 peter_e@gmx.net 93 :CBC 820 : sub->oid = subid;
94 : 820 : sub->dbid = subform->subdbid;
1349 akapila@postgresql.o 95 : 820 : sub->skiplsn = subform->subskiplsn;
3253 peter_e@gmx.net 96 : 820 : sub->name = pstrdup(NameStr(subform->subname));
97 : 820 : sub->owner = subform->subowner;
98 : 820 : sub->enabled = subform->subenabled;
1977 tgl@sss.pgh.pa.us 99 : 820 : sub->binary = subform->subbinary;
1930 akapila@postgresql.o 100 : 820 : sub->stream = subform->substream;
1616 101 : 820 : sub->twophasestate = subform->subtwophasestate;
1373 102 : 820 : sub->disableonerr = subform->subdisableonerr;
992 rhaas@postgresql.org 103 : 820 : sub->passwordrequired = subform->subpasswordrequired;
987 104 : 820 : sub->runasowner = subform->subrunasowner;
686 akapila@postgresql.o 105 : 820 : sub->failover = subform->subfailover;
146 akapila@postgresql.o 106 :GNC 820 : sub->retaindeadtuples = subform->subretaindeadtuples;
105 107 : 820 : sub->maxretention = subform->submaxretention;
108 : 820 : sub->retentionactive = subform->subretentionactive;
109 : :
110 : : /* Get conninfo */
997 dgustafsson@postgres 111 :CBC 820 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
112 : : tup,
113 : : Anum_pg_subscription_subconninfo);
3168 peter_e@gmx.net 114 : 820 : sub->conninfo = TextDatumGetCString(datum);
115 : :
116 : : /* Get slotname */
3253 117 : 820 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
118 : : tup,
119 : : Anum_pg_subscription_subslotname,
120 : : &isnull);
3143 121 [ + + ]: 820 : if (!isnull)
122 : 787 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
123 : : else
124 : 33 : sub->slotname = NULL;
125 : :
126 : : /* Get synccommit */
997 dgustafsson@postgres 127 : 820 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
128 : : tup,
129 : : Anum_pg_subscription_subsynccommit);
3168 peter_e@gmx.net 130 : 820 : sub->synccommit = TextDatumGetCString(datum);
131 : :
132 : : /* Get publications */
997 dgustafsson@postgres 133 : 820 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
134 : : tup,
135 : : Anum_pg_subscription_subpublications);
3253 peter_e@gmx.net 136 : 820 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
137 : :
138 : : /* Get origin */
997 dgustafsson@postgres 139 : 820 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
140 : : tup,
141 : : Anum_pg_subscription_suborigin);
1244 akapila@postgresql.o 142 : 820 : sub->origin = TextDatumGetCString(datum);
143 : :
144 : : /* Is the subscription owner a superuser? */
791 145 : 820 : sub->ownersuperuser = superuser_arg(sub->owner);
146 : :
3253 peter_e@gmx.net 147 : 820 : ReleaseSysCache(tup);
148 : :
149 : 820 : return sub;
150 : : }
151 : :
152 : : /*
153 : : * Return number of subscriptions defined in given database.
154 : : * Used by dropdb() to check if database can indeed be dropped.
155 : : */
156 : : int
157 : 46 : CountDBSubscriptions(Oid dbid)
158 : : {
3135 bruce@momjian.us 159 : 46 : int nsubs = 0;
160 : : Relation rel;
161 : : ScanKeyData scankey;
162 : : SysScanDesc scan;
163 : : HeapTuple tup;
164 : :
2521 andres@anarazel.de 165 : 46 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
166 : :
3253 peter_e@gmx.net 167 : 46 : ScanKeyInit(&scankey,
168 : : Anum_pg_subscription_subdbid,
169 : : BTEqualStrategyNumber, F_OIDEQ,
170 : : ObjectIdGetDatum(dbid));
171 : :
172 : 46 : scan = systable_beginscan(rel, InvalidOid, false,
173 : : NULL, 1, &scankey);
174 : :
175 [ - + ]: 46 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
3253 peter_e@gmx.net 176 :UBC 0 : nsubs++;
177 : :
3253 peter_e@gmx.net 178 :CBC 46 : systable_endscan(scan);
179 : :
2521 andres@anarazel.de 180 : 46 : table_close(rel, NoLock);
181 : :
3253 peter_e@gmx.net 182 : 46 : return nsubs;
183 : : }
184 : :
185 : : /*
186 : : * Free memory allocated by subscription struct.
187 : : */
188 : : void
189 : 42 : FreeSubscription(Subscription *sub)
190 : : {
191 : 42 : pfree(sub->name);
192 : 42 : pfree(sub->conninfo);
3143 193 [ + - ]: 42 : if (sub->slotname)
194 : 42 : pfree(sub->slotname);
3253 195 : 42 : list_free_deep(sub->publications);
196 : 42 : pfree(sub);
197 : 42 : }
198 : :
199 : : /*
200 : : * Disable the given subscription.
201 : : */
202 : : void
1373 akapila@postgresql.o 203 : 4 : DisableSubscription(Oid subid)
204 : : {
205 : : Relation rel;
206 : : bool nulls[Natts_pg_subscription];
207 : : bool replaces[Natts_pg_subscription];
208 : : Datum values[Natts_pg_subscription];
209 : : HeapTuple tup;
210 : :
211 : : /* Look up the subscription in the catalog */
212 : 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
213 : 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
214 : :
215 [ - + ]: 4 : if (!HeapTupleIsValid(tup))
1373 akapila@postgresql.o 216 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
217 : :
1373 akapila@postgresql.o 218 :CBC 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
219 : :
220 : : /* Form a new tuple. */
221 : 4 : memset(values, 0, sizeof(values));
222 : 4 : memset(nulls, false, sizeof(nulls));
223 : 4 : memset(replaces, false, sizeof(replaces));
224 : :
225 : : /* Set the subscription to disabled. */
226 : 4 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
227 : 4 : replaces[Anum_pg_subscription_subenabled - 1] = true;
228 : :
229 : : /* Update the catalog */
230 : 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
231 : : replaces);
232 : 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
233 : 4 : heap_freetuple(tup);
234 : :
235 : 4 : table_close(rel, NoLock);
236 : 4 : }
237 : :
238 : : /*
239 : : * Convert text array to list of strings.
240 : : *
241 : : * Note: the resulting list of strings is pallocated here.
242 : : */
243 : : static List *
3253 peter_e@gmx.net 244 : 820 : textarray_to_stringlist(ArrayType *textarray)
245 : : {
246 : : Datum *elems;
247 : : int nelems,
248 : : i;
3135 bruce@momjian.us 249 : 820 : List *res = NIL;
250 : :
1264 peter@eisentraut.org 251 : 820 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
252 : :
3253 peter_e@gmx.net 253 [ - + ]: 820 : if (nelems == 0)
3253 peter_e@gmx.net 254 :UBC 0 : return NIL;
255 : :
3253 peter_e@gmx.net 256 [ + + ]:CBC 2018 : for (i = 0; i < nelems; i++)
3168 257 : 1198 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
258 : :
3253 259 : 820 : return res;
260 : : }
261 : :
262 : : /*
263 : : * Add new state record for a subscription table.
264 : : *
265 : : * If retain_lock is true, then don't release the locks taken in this function.
266 : : * We normally release the locks at the end of transaction but in binary-upgrade
267 : : * mode, we expect to release those immediately.
268 : : */
269 : : void
2811 270 : 213 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
271 : : XLogRecPtr sublsn, bool retain_lock)
272 : : {
273 : : Relation rel;
274 : : HeapTuple tup;
275 : : bool nulls[Natts_pg_subscription_rel];
276 : : Datum values[Natts_pg_subscription_rel];
277 : :
3088 278 : 213 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
279 : :
2521 andres@anarazel.de 280 : 213 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
281 : :
282 : : /* Try finding existing mapping. */
3190 peter_e@gmx.net 283 : 213 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
284 : : ObjectIdGetDatum(relid),
285 : : ObjectIdGetDatum(subid));
2811 286 [ - + ]: 213 : if (HeapTupleIsValid(tup))
54 akapila@postgresql.o 287 [ # # ]:UNC 0 : elog(ERROR, "subscription relation %u in subscription %u already exists",
288 : : relid, subid);
289 : :
290 : : /* Form the tuple. */
2811 peter_e@gmx.net 291 :CBC 213 : memset(values, 0, sizeof(values));
292 : 213 : memset(nulls, false, sizeof(nulls));
293 : 213 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
294 : 213 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
295 : 213 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
40 alvherre@kurilemu.de 296 [ + + ]:GNC 213 : if (XLogRecPtrIsValid(sublsn))
2811 peter_e@gmx.net 297 :CBC 1 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
298 : : else
299 : 212 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
300 : :
301 : 213 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
302 : :
303 : : /* Insert tuple into catalog. */
2583 andres@anarazel.de 304 : 213 : CatalogTupleInsert(rel, tup);
305 : :
2811 peter_e@gmx.net 306 : 213 : heap_freetuple(tup);
307 : :
308 : : /* Cleanup. */
714 akapila@postgresql.o 309 [ + + ]: 213 : if (retain_lock)
310 : : {
311 : 211 : table_close(rel, NoLock);
312 : : }
313 : : else
314 : : {
315 : 2 : table_close(rel, RowExclusiveLock);
316 : 2 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
317 : : }
2811 peter_e@gmx.net 318 : 213 : }
319 : :
320 : : /*
321 : : * Update the state of a subscription table.
322 : : */
323 : : void
324 : 762 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
325 : : XLogRecPtr sublsn, bool already_locked)
326 : : {
327 : : Relation rel;
328 : : HeapTuple tup;
329 : : bool nulls[Natts_pg_subscription_rel];
330 : : Datum values[Natts_pg_subscription_rel];
331 : : bool replaces[Natts_pg_subscription_rel];
332 : :
137 akapila@postgresql.o 333 [ + + ]: 762 : if (already_locked)
334 : : {
335 : : #ifdef USE_ASSERT_CHECKING
336 : : LOCKTAG tag;
337 : :
338 [ - + ]: 180 : Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
339 : : RowExclusiveLock, true));
340 : 180 : SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
341 [ - + ]: 180 : Assert(LockHeldByMe(&tag, AccessShareLock, true));
342 : : #endif
343 : :
344 : 180 : rel = table_open(SubscriptionRelRelationId, NoLock);
345 : : }
346 : : else
347 : : {
348 : 582 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
349 : 581 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
350 : : }
351 : :
352 : : /* Try finding existing mapping. */
2811 peter_e@gmx.net 353 : 761 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
354 : : ObjectIdGetDatum(relid),
355 : : ObjectIdGetDatum(subid));
356 [ - + ]: 761 : if (!HeapTupleIsValid(tup))
41 akapila@postgresql.o 357 [ # # ]:UNC 0 : elog(ERROR, "subscription relation %u in subscription %u does not exist",
358 : : relid, subid);
359 : :
360 : : /* Update the tuple. */
2811 peter_e@gmx.net 361 :CBC 761 : memset(values, 0, sizeof(values));
362 : 761 : memset(nulls, false, sizeof(nulls));
363 : 761 : memset(replaces, false, sizeof(replaces));
364 : :
365 : 761 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
366 : 761 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
367 : :
368 : 761 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
40 alvherre@kurilemu.de 369 [ + + ]:GNC 761 : if (XLogRecPtrIsValid(sublsn))
2811 peter_e@gmx.net 370 :CBC 374 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
371 : : else
372 : 387 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
373 : :
374 : 761 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
375 : : replaces);
376 : :
377 : : /* Update the catalog. */
378 : 761 : CatalogTupleUpdate(rel, &tup->t_self, tup);
379 : :
380 : : /* Cleanup. */
2521 andres@anarazel.de 381 : 761 : table_close(rel, NoLock);
3190 peter_e@gmx.net 382 : 761 : }
383 : :
384 : : /*
385 : : * Get state of subscription table.
386 : : *
387 : : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
388 : : */
389 : : char
1888 alvherre@alvh.no-ip. 390 : 1232 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
391 : : {
392 : : HeapTuple tup;
393 : : char substate;
394 : : bool isnull;
395 : : Datum d;
396 : : Relation rel;
397 : :
398 : : /*
399 : : * This is to avoid the race condition with AlterSubscription which tries
400 : : * to remove this relstate.
401 : : */
1768 akapila@postgresql.o 402 : 1232 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
403 : :
404 : : /* Try finding the mapping. */
3190 peter_e@gmx.net 405 : 1232 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
406 : : ObjectIdGetDatum(relid),
407 : : ObjectIdGetDatum(subid));
408 : :
409 [ + + ]: 1232 : if (!HeapTupleIsValid(tup))
410 : : {
1755 akapila@postgresql.o 411 : 23 : table_close(rel, AccessShareLock);
1888 alvherre@alvh.no-ip. 412 : 23 : *sublsn = InvalidXLogRecPtr;
413 : 23 : return SUBREL_STATE_UNKNOWN;
414 : : }
415 : :
416 : : /* Get the state. */
417 : 1209 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
418 : :
419 : : /* Get the LSN */
3190 peter_e@gmx.net 420 : 1209 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
421 : : Anum_pg_subscription_rel_srsublsn, &isnull);
422 [ + + ]: 1209 : if (isnull)
423 : 662 : *sublsn = InvalidXLogRecPtr;
424 : : else
425 : 547 : *sublsn = DatumGetLSN(d);
426 : :
427 : : /* Cleanup */
428 : 1209 : ReleaseSysCache(tup);
429 : :
1768 akapila@postgresql.o 430 : 1209 : table_close(rel, AccessShareLock);
431 : :
3190 peter_e@gmx.net 432 : 1209 : return substate;
433 : : }
434 : :
435 : : /*
436 : : * Drop subscription relation mapping. These can be for a particular
437 : : * subscription, or for a particular relation, or both.
438 : : */
439 : : void
440 : 24002 : RemoveSubscriptionRel(Oid subid, Oid relid)
441 : : {
442 : : Relation rel;
443 : : TableScanDesc scan;
444 : : ScanKeyData skey[2];
445 : : HeapTuple tup;
446 : 24002 : int nkeys = 0;
447 : :
2521 andres@anarazel.de 448 : 24002 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
449 : :
3190 peter_e@gmx.net 450 [ + + ]: 24002 : if (OidIsValid(subid))
451 : : {
452 : 136 : ScanKeyInit(&skey[nkeys++],
453 : : Anum_pg_subscription_rel_srsubid,
454 : : BTEqualStrategyNumber,
455 : : F_OIDEQ,
456 : : ObjectIdGetDatum(subid));
457 : : }
458 : :
459 [ + + ]: 24002 : if (OidIsValid(relid))
460 : : {
461 : 23887 : ScanKeyInit(&skey[nkeys++],
462 : : Anum_pg_subscription_rel_srrelid,
463 : : BTEqualStrategyNumber,
464 : : F_OIDEQ,
465 : : ObjectIdGetDatum(relid));
466 : : }
467 : :
468 : : /* Do the search and delete what we found. */
2472 andres@anarazel.de 469 : 24002 : scan = table_beginscan_catalog(rel, nkeys, skey);
3190 peter_e@gmx.net 470 [ + + ]: 24120 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
471 : : {
472 : : Form_pg_subscription_rel subrel;
473 : :
1768 akapila@postgresql.o 474 : 118 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
475 : :
476 : : /*
477 : : * We don't allow to drop the relation mapping when the table
478 : : * synchronization is in progress unless the caller updates the
479 : : * corresponding subscription as well. This is to ensure that we don't
480 : : * leave tablesync slots or origins in the system when the
481 : : * corresponding table is dropped. For sequences, however, it's ok to
482 : : * drop them since no separate slots or origins are created during
483 : : * synchronization.
484 : : */
54 akapila@postgresql.o 485 [ + + ]:GNC 118 : if (!OidIsValid(subid) &&
486 [ - + - - ]: 16 : subrel->srsubstate != SUBREL_STATE_READY &&
54 akapila@postgresql.o 487 :UNC 0 : get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
488 : : {
1768 akapila@postgresql.o 489 [ # # ]:UBC 0 : ereport(ERROR,
490 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
491 : : errmsg("could not drop relation mapping for subscription \"%s\"",
492 : : get_subscription_name(subrel->srsubid, false)),
493 : : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
494 : : get_rel_name(relid), subrel->srsubstate),
495 : :
496 : : /*
497 : : * translator: first %s is a SQL ALTER command and second %s is a
498 : : * SQL DROP command
499 : : */
500 : : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
501 : : "ALTER SUBSCRIPTION ... ENABLE",
502 : : "DROP SUBSCRIPTION ...")));
503 : : }
504 : :
3107 tgl@sss.pgh.pa.us 505 :CBC 118 : CatalogTupleDelete(rel, &tup->t_self);
506 : : }
2472 andres@anarazel.de 507 : 24002 : table_endscan(scan);
508 : :
2521 509 : 24002 : table_close(rel, RowExclusiveLock);
3190 peter_e@gmx.net 510 : 24002 : }
511 : :
512 : : /*
513 : : * Does the subscription have any tables?
514 : : *
515 : : * Use this function only to know true/false, and when you have no need for the
516 : : * List returned by GetSubscriptionRelations.
517 : : */
518 : : bool
61 akapila@postgresql.o 519 :GNC 264 : HasSubscriptionTables(Oid subid)
520 : : {
521 : : Relation rel;
522 : : ScanKeyData skey[1];
523 : : SysScanDesc scan;
524 : : HeapTuple tup;
54 525 : 264 : bool has_subtables = false;
526 : :
1616 akapila@postgresql.o 527 :CBC 264 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
528 : :
529 : 264 : ScanKeyInit(&skey[0],
530 : : Anum_pg_subscription_rel_srsubid,
531 : : BTEqualStrategyNumber, F_OIDEQ,
532 : : ObjectIdGetDatum(subid));
533 : :
534 : 264 : scan = systable_beginscan(rel, InvalidOid, false,
535 : : NULL, 1, skey);
536 : :
54 akapila@postgresql.o 537 [ + + ]:GNC 301 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
538 : : {
539 : : Form_pg_subscription_rel subrel;
540 : : char relkind;
541 : :
542 : 286 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
543 : 286 : relkind = get_rel_relkind(subrel->srrelid);
544 : :
545 [ + + + + ]: 286 : if (relkind == RELKIND_RELATION ||
546 : : relkind == RELKIND_PARTITIONED_TABLE)
547 : : {
548 : 249 : has_subtables = true;
549 : 249 : break;
550 : : }
551 : : }
552 : :
553 : : /* Cleanup */
1616 akapila@postgresql.o 554 :CBC 264 : systable_endscan(scan);
555 : 264 : table_close(rel, AccessShareLock);
556 : :
54 akapila@postgresql.o 557 :GNC 264 : return has_subtables;
558 : : }
559 : :
560 : : /*
561 : : * Get the relations for the subscription.
562 : : *
563 : : * If not_ready is true, return only the relations that are not in a ready
564 : : * state, otherwise return all the relations of the subscription. The
565 : : * returned list is palloc'ed in the current memory context.
566 : : */
567 : : List *
568 : 1124 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
569 : : bool not_ready)
570 : : {
3190 peter_e@gmx.net 571 :CBC 1124 : List *res = NIL;
572 : : Relation rel;
573 : : HeapTuple tup;
574 : 1124 : int nkeys = 0;
575 : : ScanKeyData skey[2];
576 : : SysScanDesc scan;
577 : :
578 : : /* One or both of 'tables' and 'sequences' must be true. */
54 akapila@postgresql.o 579 [ + + - + ]:GNC 1124 : Assert(tables || sequences);
580 : :
2521 andres@anarazel.de 581 :CBC 1124 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
582 : :
3190 peter_e@gmx.net 583 : 1124 : ScanKeyInit(&skey[nkeys++],
584 : : Anum_pg_subscription_rel_srsubid,
585 : : BTEqualStrategyNumber, F_OIDEQ,
586 : : ObjectIdGetDatum(subid));
587 : :
1238 michael@paquier.xyz 588 [ + + ]: 1124 : if (not_ready)
589 : 1086 : ScanKeyInit(&skey[nkeys++],
590 : : Anum_pg_subscription_rel_srsubstate,
591 : : BTEqualStrategyNumber, F_CHARNE,
592 : : CharGetDatum(SUBREL_STATE_READY));
593 : :
3190 peter_e@gmx.net 594 : 1124 : scan = systable_beginscan(rel, InvalidOid, false,
595 : : NULL, nkeys, skey);
596 : :
597 [ + + ]: 3022 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
598 : : {
599 : : Form_pg_subscription_rel subrel;
600 : : SubscriptionRelState *relstate;
601 : : Datum d;
602 : : bool isnull;
603 : : char relkind;
604 : :
605 : 1898 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
606 : :
607 : : /* Relation is either a sequence or a table */
54 akapila@postgresql.o 608 :GNC 1898 : relkind = get_rel_relkind(subrel->srrelid);
609 [ + + + + : 1898 : Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
- + ]
610 : : relkind == RELKIND_PARTITIONED_TABLE);
611 : :
612 : : /* Skip sequences if they were not requested */
613 [ + + - + ]: 1898 : if ((relkind == RELKIND_SEQUENCE) && !sequences)
54 akapila@postgresql.o 614 :UNC 0 : continue;
615 : :
616 : : /* Skip tables if they were not requested */
54 akapila@postgresql.o 617 [ + + + + ]:GNC 1898 : if ((relkind == RELKIND_RELATION ||
618 [ - + ]: 1870 : relkind == RELKIND_PARTITIONED_TABLE) && !tables)
54 akapila@postgresql.o 619 :UNC 0 : continue;
620 : :
6 michael@paquier.xyz 621 :GNC 1898 : relstate = palloc_object(SubscriptionRelState);
3190 peter_e@gmx.net 622 :CBC 1898 : relstate->relid = subrel->srrelid;
623 : 1898 : relstate->state = subrel->srsubstate;
1975 tgl@sss.pgh.pa.us 624 : 1898 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
625 : : Anum_pg_subscription_rel_srsublsn, &isnull);
626 [ + + ]: 1898 : if (isnull)
627 : 1561 : relstate->lsn = InvalidXLogRecPtr;
628 : : else
629 : 337 : relstate->lsn = DatumGetLSN(d);
630 : :
3190 peter_e@gmx.net 631 : 1898 : res = lappend(res, relstate);
632 : : }
633 : :
634 : : /* Cleanup */
635 : 1124 : systable_endscan(scan);
2521 andres@anarazel.de 636 : 1124 : table_close(rel, AccessShareLock);
637 : :
3190 peter_e@gmx.net 638 : 1124 : return res;
639 : : }
640 : :
641 : : /*
642 : : * Update the dead tuple retention status for the given subscription.
643 : : */
644 : : void
105 akapila@postgresql.o 645 :GNC 2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
646 : : {
647 : : Relation rel;
648 : : bool nulls[Natts_pg_subscription];
649 : : bool replaces[Natts_pg_subscription];
650 : : Datum values[Natts_pg_subscription];
651 : : HeapTuple tup;
652 : :
653 : : /* Look up the subscription in the catalog */
654 : 2 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
655 : 2 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
656 : :
657 [ - + ]: 2 : if (!HeapTupleIsValid(tup))
105 akapila@postgresql.o 658 [ # # ]:UNC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
659 : :
105 akapila@postgresql.o 660 :GNC 2 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
661 : :
662 : : /* Form a new tuple. */
663 : 2 : memset(values, 0, sizeof(values));
664 : 2 : memset(nulls, false, sizeof(nulls));
665 : 2 : memset(replaces, false, sizeof(replaces));
666 : :
667 : : /* Set the subscription to disabled. */
668 : 2 : values[Anum_pg_subscription_subretentionactive - 1] = active;
669 : 2 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
670 : :
671 : : /* Update the catalog */
672 : 2 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
673 : : replaces);
674 : 2 : CatalogTupleUpdate(rel, &tup->t_self, tup);
675 : 2 : heap_freetuple(tup);
676 : :
677 : 2 : table_close(rel, NoLock);
678 : 2 : }
|