Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_subscription.c
4 : : * replication subscriptions
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/catalog/pg_subscription.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/genam.h"
18 : : #include "access/heapam.h"
19 : : #include "access/htup_details.h"
20 : : #include "access/tableam.h"
21 : : #include "catalog/indexing.h"
22 : : #include "catalog/pg_subscription.h"
23 : : #include "catalog/pg_subscription_rel.h"
24 : : #include "catalog/pg_type.h"
25 : : #include "miscadmin.h"
26 : : #include "storage/lmgr.h"
27 : : #include "utils/array.h"
28 : : #include "utils/builtins.h"
29 : : #include "utils/fmgroids.h"
30 : : #include "utils/lsyscache.h"
31 : : #include "utils/pg_lsn.h"
32 : : #include "utils/rel.h"
33 : : #include "utils/syscache.h"
34 : :
35 : : static List *textarray_to_stringlist(ArrayType *textarray);
36 : :
37 : : /*
38 : : * Add a comma-separated list of publication names to the 'dest' string.
39 : : */
40 : : void
367 michael@paquier.xyz 41 :CBC 505 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
42 : : {
43 : : ListCell *lc;
44 : 505 : bool first = true;
45 : :
46 [ - + ]: 505 : Assert(publications != NIL);
47 : :
48 [ + - + + : 1303 : foreach(lc, publications)
+ + ]
49 : : {
50 : 798 : char *pubname = strVal(lfirst(lc));
51 : :
52 [ + + ]: 798 : if (first)
53 : 505 : first = false;
54 : : else
55 : 293 : appendStringInfoString(dest, ", ");
56 : :
57 [ + + ]: 798 : if (quote_literal)
58 : 788 : appendStringInfoString(dest, quote_literal_cstr(pubname));
59 : : else
60 : : {
61 : 10 : appendStringInfoChar(dest, '"');
62 : 10 : appendStringInfoString(dest, pubname);
63 : 10 : appendStringInfoChar(dest, '"');
64 : : }
65 : : }
66 : 505 : }
67 : :
68 : : /*
69 : : * Fetch the subscription from the syscache.
70 : : */
71 : : Subscription *
3203 peter_e@gmx.net 72 : 864 : GetSubscription(Oid subid, bool missing_ok)
73 : : {
74 : : HeapTuple tup;
75 : : Subscription *sub;
76 : : Form_pg_subscription subform;
77 : : Datum datum;
78 : : bool isnull;
79 : :
80 : 864 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
81 : :
82 [ + + ]: 864 : if (!HeapTupleIsValid(tup))
83 : : {
84 [ + - ]: 58 : if (missing_ok)
85 : 58 : return NULL;
86 : :
3203 peter_e@gmx.net 87 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
88 : : }
89 : :
3203 peter_e@gmx.net 90 :CBC 806 : subform = (Form_pg_subscription) GETSTRUCT(tup);
91 : :
92 : 806 : sub = (Subscription *) palloc(sizeof(Subscription));
93 : 806 : sub->oid = subid;
94 : 806 : sub->dbid = subform->subdbid;
1299 akapila@postgresql.o 95 : 806 : sub->skiplsn = subform->subskiplsn;
3203 peter_e@gmx.net 96 : 806 : sub->name = pstrdup(NameStr(subform->subname));
97 : 806 : sub->owner = subform->subowner;
98 : 806 : sub->enabled = subform->subenabled;
1927 tgl@sss.pgh.pa.us 99 : 806 : sub->binary = subform->subbinary;
1880 akapila@postgresql.o 100 : 806 : sub->stream = subform->substream;
1566 101 : 806 : sub->twophasestate = subform->subtwophasestate;
1323 102 : 806 : sub->disableonerr = subform->subdisableonerr;
942 rhaas@postgresql.org 103 : 806 : sub->passwordrequired = subform->subpasswordrequired;
937 104 : 806 : sub->runasowner = subform->subrunasowner;
636 akapila@postgresql.o 105 : 806 : sub->failover = subform->subfailover;
96 akapila@postgresql.o 106 :GNC 806 : sub->retaindeadtuples = subform->subretaindeadtuples;
55 107 : 806 : sub->maxretention = subform->submaxretention;
108 : 806 : sub->retentionactive = subform->subretentionactive;
109 : :
110 : : /* Get conninfo */
947 dgustafsson@postgres 111 :CBC 806 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
112 : : tup,
113 : : Anum_pg_subscription_subconninfo);
3118 peter_e@gmx.net 114 : 806 : sub->conninfo = TextDatumGetCString(datum);
115 : :
116 : : /* Get slotname */
3203 117 : 806 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
118 : : tup,
119 : : Anum_pg_subscription_subslotname,
120 : : &isnull);
3093 121 [ + + ]: 806 : if (!isnull)
122 : 773 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
123 : : else
124 : 33 : sub->slotname = NULL;
125 : :
126 : : /* Get synccommit */
947 dgustafsson@postgres 127 : 806 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
128 : : tup,
129 : : Anum_pg_subscription_subsynccommit);
3118 peter_e@gmx.net 130 : 806 : sub->synccommit = TextDatumGetCString(datum);
131 : :
132 : : /* Get publications */
947 dgustafsson@postgres 133 : 806 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
134 : : tup,
135 : : Anum_pg_subscription_subpublications);
3203 peter_e@gmx.net 136 : 806 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
137 : :
138 : : /* Get origin */
947 dgustafsson@postgres 139 : 806 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
140 : : tup,
141 : : Anum_pg_subscription_suborigin);
1194 akapila@postgresql.o 142 : 806 : sub->origin = TextDatumGetCString(datum);
143 : :
144 : : /* Is the subscription owner a superuser? */
741 145 : 806 : sub->ownersuperuser = superuser_arg(sub->owner);
146 : :
3203 peter_e@gmx.net 147 : 806 : ReleaseSysCache(tup);
148 : :
149 : 806 : return sub;
150 : : }
151 : :
152 : : /*
153 : : * Return number of subscriptions defined in given database.
154 : : * Used by dropdb() to check if database can indeed be dropped.
155 : : */
156 : : int
157 : 44 : CountDBSubscriptions(Oid dbid)
158 : : {
3085 bruce@momjian.us 159 : 44 : int nsubs = 0;
160 : : Relation rel;
161 : : ScanKeyData scankey;
162 : : SysScanDesc scan;
163 : : HeapTuple tup;
164 : :
2471 andres@anarazel.de 165 : 44 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
166 : :
3203 peter_e@gmx.net 167 : 44 : ScanKeyInit(&scankey,
168 : : Anum_pg_subscription_subdbid,
169 : : BTEqualStrategyNumber, F_OIDEQ,
170 : : ObjectIdGetDatum(dbid));
171 : :
172 : 44 : scan = systable_beginscan(rel, InvalidOid, false,
173 : : NULL, 1, &scankey);
174 : :
175 [ - + ]: 44 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
3203 peter_e@gmx.net 176 :UBC 0 : nsubs++;
177 : :
3203 peter_e@gmx.net 178 :CBC 44 : systable_endscan(scan);
179 : :
2471 andres@anarazel.de 180 : 44 : table_close(rel, NoLock);
181 : :
3203 peter_e@gmx.net 182 : 44 : return nsubs;
183 : : }
184 : :
185 : : /*
186 : : * Free memory allocated by subscription struct.
187 : : */
188 : : void
189 : 44 : FreeSubscription(Subscription *sub)
190 : : {
191 : 44 : pfree(sub->name);
192 : 44 : pfree(sub->conninfo);
3093 193 [ + - ]: 44 : if (sub->slotname)
194 : 44 : pfree(sub->slotname);
3203 195 : 44 : list_free_deep(sub->publications);
196 : 44 : pfree(sub);
197 : 44 : }
198 : :
199 : : /*
200 : : * Disable the given subscription.
201 : : */
202 : : void
1323 akapila@postgresql.o 203 : 4 : DisableSubscription(Oid subid)
204 : : {
205 : : Relation rel;
206 : : bool nulls[Natts_pg_subscription];
207 : : bool replaces[Natts_pg_subscription];
208 : : Datum values[Natts_pg_subscription];
209 : : HeapTuple tup;
210 : :
211 : : /* Look up the subscription in the catalog */
212 : 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
213 : 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
214 : :
215 [ - + ]: 4 : if (!HeapTupleIsValid(tup))
1323 akapila@postgresql.o 216 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
217 : :
1323 akapila@postgresql.o 218 :CBC 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
219 : :
220 : : /* Form a new tuple. */
221 : 4 : memset(values, 0, sizeof(values));
222 : 4 : memset(nulls, false, sizeof(nulls));
223 : 4 : memset(replaces, false, sizeof(replaces));
224 : :
225 : : /* Set the subscription to disabled. */
226 : 4 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
227 : 4 : replaces[Anum_pg_subscription_subenabled - 1] = true;
228 : :
229 : : /* Update the catalog */
230 : 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
231 : : replaces);
232 : 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
233 : 4 : heap_freetuple(tup);
234 : :
235 : 4 : table_close(rel, NoLock);
236 : 4 : }
237 : :
238 : : /*
239 : : * Convert text array to list of strings.
240 : : *
241 : : * Note: the resulting list of strings is pallocated here.
242 : : */
243 : : static List *
3203 peter_e@gmx.net 244 : 806 : textarray_to_stringlist(ArrayType *textarray)
245 : : {
246 : : Datum *elems;
247 : : int nelems,
248 : : i;
3085 bruce@momjian.us 249 : 806 : List *res = NIL;
250 : :
1214 peter@eisentraut.org 251 : 806 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
252 : :
3203 peter_e@gmx.net 253 [ - + ]: 806 : if (nelems == 0)
3203 peter_e@gmx.net 254 :UBC 0 : return NIL;
255 : :
3203 peter_e@gmx.net 256 [ + + ]:CBC 1971 : for (i = 0; i < nelems; i++)
3118 257 : 1165 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
258 : :
3203 259 : 806 : return res;
260 : : }
261 : :
262 : : /*
263 : : * Add new state record for a subscription table.
264 : : *
265 : : * If retain_lock is true, then don't release the locks taken in this function.
266 : : * We normally release the locks at the end of transaction but in binary-upgrade
267 : : * mode, we expect to release those immediately.
268 : : */
269 : : void
2761 270 : 204 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
271 : : XLogRecPtr sublsn, bool retain_lock)
272 : : {
273 : : Relation rel;
274 : : HeapTuple tup;
275 : : bool nulls[Natts_pg_subscription_rel];
276 : : Datum values[Natts_pg_subscription_rel];
277 : :
3038 278 : 204 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
279 : :
2471 andres@anarazel.de 280 : 204 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
281 : :
282 : : /* Try finding existing mapping. */
3140 peter_e@gmx.net 283 : 204 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
284 : : ObjectIdGetDatum(relid),
285 : : ObjectIdGetDatum(subid));
2761 286 [ - + ]: 204 : if (HeapTupleIsValid(tup))
4 akapila@postgresql.o 287 [ # # ]:UNC 0 : elog(ERROR, "subscription relation %u in subscription %u already exists",
288 : : relid, subid);
289 : :
290 : : /* Form the tuple. */
2761 peter_e@gmx.net 291 :CBC 204 : memset(values, 0, sizeof(values));
292 : 204 : memset(nulls, false, sizeof(nulls));
293 : 204 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
294 : 204 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
295 : 204 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
296 [ + + ]: 204 : if (sublsn != InvalidXLogRecPtr)
297 : 1 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
298 : : else
299 : 203 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
300 : :
301 : 204 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
302 : :
303 : : /* Insert tuple into catalog. */
2533 andres@anarazel.de 304 : 204 : CatalogTupleInsert(rel, tup);
305 : :
2761 peter_e@gmx.net 306 : 204 : heap_freetuple(tup);
307 : :
308 : : /* Cleanup. */
664 akapila@postgresql.o 309 [ + + ]: 204 : if (retain_lock)
310 : : {
311 : 202 : table_close(rel, NoLock);
312 : : }
313 : : else
314 : : {
315 : 2 : table_close(rel, RowExclusiveLock);
316 : 2 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
317 : : }
2761 peter_e@gmx.net 318 : 204 : }
319 : :
320 : : /*
321 : : * Update the state of a subscription table.
322 : : */
323 : : void
324 : 740 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
325 : : XLogRecPtr sublsn, bool already_locked)
326 : : {
327 : : Relation rel;
328 : : HeapTuple tup;
329 : : bool nulls[Natts_pg_subscription_rel];
330 : : Datum values[Natts_pg_subscription_rel];
331 : : bool replaces[Natts_pg_subscription_rel];
332 : :
87 akapila@postgresql.o 333 [ + + ]: 740 : if (already_locked)
334 : : {
335 : : #ifdef USE_ASSERT_CHECKING
336 : : LOCKTAG tag;
337 : :
338 [ - + ]: 179 : Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
339 : : RowExclusiveLock, true));
340 : 179 : SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
341 [ - + ]: 179 : Assert(LockHeldByMe(&tag, AccessShareLock, true));
342 : : #endif
343 : :
344 : 179 : rel = table_open(SubscriptionRelRelationId, NoLock);
345 : : }
346 : : else
347 : : {
348 : 561 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
349 : 561 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
350 : : }
351 : :
352 : : /* Try finding existing mapping. */
2761 peter_e@gmx.net 353 : 740 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
354 : : ObjectIdGetDatum(relid),
355 : : ObjectIdGetDatum(subid));
356 [ - + ]: 740 : if (!HeapTupleIsValid(tup))
2761 peter_e@gmx.net 357 [ # # ]:UBC 0 : elog(ERROR, "subscription table %u in subscription %u does not exist",
358 : : relid, subid);
359 : :
360 : : /* Update the tuple. */
2761 peter_e@gmx.net 361 :CBC 740 : memset(values, 0, sizeof(values));
362 : 740 : memset(nulls, false, sizeof(nulls));
363 : 740 : memset(replaces, false, sizeof(replaces));
364 : :
365 : 740 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
366 : 740 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
367 : :
368 : 740 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
369 [ + + ]: 740 : if (sublsn != InvalidXLogRecPtr)
370 : 362 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
371 : : else
372 : 378 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
373 : :
374 : 740 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
375 : : replaces);
376 : :
377 : : /* Update the catalog. */
378 : 740 : CatalogTupleUpdate(rel, &tup->t_self, tup);
379 : :
380 : : /* Cleanup. */
2471 andres@anarazel.de 381 : 740 : table_close(rel, NoLock);
3140 peter_e@gmx.net 382 : 740 : }
383 : :
384 : : /*
385 : : * Get state of subscription table.
386 : : *
387 : : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
388 : : */
389 : : char
1838 alvherre@alvh.no-ip. 390 : 1212 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
391 : : {
392 : : HeapTuple tup;
393 : : char substate;
394 : : bool isnull;
395 : : Datum d;
396 : : Relation rel;
397 : :
398 : : /*
399 : : * This is to avoid the race condition with AlterSubscription which tries
400 : : * to remove this relstate.
401 : : */
1718 akapila@postgresql.o 402 : 1212 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
403 : :
404 : : /* Try finding the mapping. */
3140 peter_e@gmx.net 405 : 1212 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
406 : : ObjectIdGetDatum(relid),
407 : : ObjectIdGetDatum(subid));
408 : :
409 [ + + ]: 1212 : if (!HeapTupleIsValid(tup))
410 : : {
1705 akapila@postgresql.o 411 : 24 : table_close(rel, AccessShareLock);
1838 alvherre@alvh.no-ip. 412 : 24 : *sublsn = InvalidXLogRecPtr;
413 : 24 : return SUBREL_STATE_UNKNOWN;
414 : : }
415 : :
416 : : /* Get the state. */
417 : 1188 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
418 : :
419 : : /* Get the LSN */
3140 peter_e@gmx.net 420 : 1188 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
421 : : Anum_pg_subscription_rel_srsublsn, &isnull);
422 [ + + ]: 1188 : if (isnull)
423 : 645 : *sublsn = InvalidXLogRecPtr;
424 : : else
425 : 543 : *sublsn = DatumGetLSN(d);
426 : :
427 : : /* Cleanup */
428 : 1188 : ReleaseSysCache(tup);
429 : :
1718 akapila@postgresql.o 430 : 1188 : table_close(rel, AccessShareLock);
431 : :
3140 peter_e@gmx.net 432 : 1188 : return substate;
433 : : }
434 : :
435 : : /*
436 : : * Drop subscription relation mapping. These can be for a particular
437 : : * subscription, or for a particular relation, or both.
438 : : */
439 : : void
440 : 22607 : RemoveSubscriptionRel(Oid subid, Oid relid)
441 : : {
442 : : Relation rel;
443 : : TableScanDesc scan;
444 : : ScanKeyData skey[2];
445 : : HeapTuple tup;
446 : 22607 : int nkeys = 0;
447 : :
2471 andres@anarazel.de 448 : 22607 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
449 : :
3140 peter_e@gmx.net 450 [ + + ]: 22607 : if (OidIsValid(subid))
451 : : {
452 : 134 : ScanKeyInit(&skey[nkeys++],
453 : : Anum_pg_subscription_rel_srsubid,
454 : : BTEqualStrategyNumber,
455 : : F_OIDEQ,
456 : : ObjectIdGetDatum(subid));
457 : : }
458 : :
459 [ + + ]: 22607 : if (OidIsValid(relid))
460 : : {
461 : 22494 : ScanKeyInit(&skey[nkeys++],
462 : : Anum_pg_subscription_rel_srrelid,
463 : : BTEqualStrategyNumber,
464 : : F_OIDEQ,
465 : : ObjectIdGetDatum(relid));
466 : : }
467 : :
468 : : /* Do the search and delete what we found. */
2422 andres@anarazel.de 469 : 22607 : scan = table_beginscan_catalog(rel, nkeys, skey);
3140 peter_e@gmx.net 470 [ + + ]: 22720 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
471 : : {
472 : : Form_pg_subscription_rel subrel;
473 : :
1718 akapila@postgresql.o 474 : 113 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
475 : :
476 : : /*
477 : : * We don't allow to drop the relation mapping when the table
478 : : * synchronization is in progress unless the caller updates the
479 : : * corresponding subscription as well. This is to ensure that we don't
480 : : * leave tablesync slots or origins in the system when the
481 : : * corresponding table is dropped. For sequences, however, it's ok to
482 : : * drop them since no separate slots or origins are created during
483 : : * synchronization.
484 : : */
4 akapila@postgresql.o 485 [ + + ]:GNC 113 : if (!OidIsValid(subid) &&
486 [ - + - - ]: 16 : subrel->srsubstate != SUBREL_STATE_READY &&
4 akapila@postgresql.o 487 :UNC 0 : get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
488 : : {
1718 akapila@postgresql.o 489 [ # # ]:UBC 0 : ereport(ERROR,
490 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
491 : : errmsg("could not drop relation mapping for subscription \"%s\"",
492 : : get_subscription_name(subrel->srsubid, false)),
493 : : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
494 : : get_rel_name(relid), subrel->srsubstate),
495 : :
496 : : /*
497 : : * translator: first %s is a SQL ALTER command and second %s is a
498 : : * SQL DROP command
499 : : */
500 : : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
501 : : "ALTER SUBSCRIPTION ... ENABLE",
502 : : "DROP SUBSCRIPTION ...")));
503 : : }
504 : :
3057 tgl@sss.pgh.pa.us 505 :CBC 113 : CatalogTupleDelete(rel, &tup->t_self);
506 : : }
2422 andres@anarazel.de 507 : 22607 : table_endscan(scan);
508 : :
2471 509 : 22607 : table_close(rel, RowExclusiveLock);
3140 peter_e@gmx.net 510 : 22607 : }
511 : :
512 : : /*
513 : : * Does the subscription have any tables?
514 : : *
515 : : * Use this function only to know true/false, and when you have no need for the
516 : : * List returned by GetSubscriptionRelations.
517 : : */
518 : : bool
11 akapila@postgresql.o 519 :GNC 246 : HasSubscriptionTables(Oid subid)
520 : : {
521 : : Relation rel;
522 : : ScanKeyData skey[1];
523 : : SysScanDesc scan;
524 : : HeapTuple tup;
4 525 : 246 : bool has_subtables = false;
526 : :
1566 akapila@postgresql.o 527 :CBC 246 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
528 : :
529 : 246 : ScanKeyInit(&skey[0],
530 : : Anum_pg_subscription_rel_srsubid,
531 : : BTEqualStrategyNumber, F_OIDEQ,
532 : : ObjectIdGetDatum(subid));
533 : :
534 : 246 : scan = systable_beginscan(rel, InvalidOid, false,
535 : : NULL, 1, skey);
536 : :
4 akapila@postgresql.o 537 [ + + ]:GNC 247 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
538 : : {
539 : : Form_pg_subscription_rel subrel;
540 : : char relkind;
541 : :
542 : 237 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
543 : 237 : relkind = get_rel_relkind(subrel->srrelid);
544 : :
545 [ + + + + ]: 237 : if (relkind == RELKIND_RELATION ||
546 : : relkind == RELKIND_PARTITIONED_TABLE)
547 : : {
548 : 236 : has_subtables = true;
549 : 236 : break;
550 : : }
551 : : }
552 : :
553 : : /* Cleanup */
1566 akapila@postgresql.o 554 :CBC 246 : systable_endscan(scan);
555 : 246 : table_close(rel, AccessShareLock);
556 : :
4 akapila@postgresql.o 557 :GNC 246 : return has_subtables;
558 : : }
559 : :
560 : : /*
561 : : * Get the relations for the subscription.
562 : : *
563 : : * If not_ready is true, return only the relations that are not in a ready
564 : : * state, otherwise return all the relations of the subscription. The
565 : : * returned list is palloc'ed in the current memory context.
566 : : */
567 : : List *
568 : 1029 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
569 : : bool not_ready)
570 : : {
3140 peter_e@gmx.net 571 :CBC 1029 : List *res = NIL;
572 : : Relation rel;
573 : : HeapTuple tup;
574 : 1029 : int nkeys = 0;
575 : : ScanKeyData skey[2];
576 : : SysScanDesc scan;
577 : :
578 : : /* One or both of 'tables' and 'sequences' must be true. */
4 akapila@postgresql.o 579 [ - + - - ]:GNC 1029 : Assert(tables || sequences);
580 : :
2471 andres@anarazel.de 581 :CBC 1029 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
582 : :
3140 peter_e@gmx.net 583 : 1029 : ScanKeyInit(&skey[nkeys++],
584 : : Anum_pg_subscription_rel_srsubid,
585 : : BTEqualStrategyNumber, F_OIDEQ,
586 : : ObjectIdGetDatum(subid));
587 : :
1188 michael@paquier.xyz 588 [ + + ]: 1029 : if (not_ready)
589 : 995 : ScanKeyInit(&skey[nkeys++],
590 : : Anum_pg_subscription_rel_srsubstate,
591 : : BTEqualStrategyNumber, F_CHARNE,
592 : : CharGetDatum(SUBREL_STATE_READY));
593 : :
3140 peter_e@gmx.net 594 : 1029 : scan = systable_beginscan(rel, InvalidOid, false,
595 : : NULL, nkeys, skey);
596 : :
597 [ + + ]: 2621 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
598 : : {
599 : : Form_pg_subscription_rel subrel;
600 : : SubscriptionRelState *relstate;
601 : : Datum d;
602 : : bool isnull;
603 : : char relkind;
604 : :
605 : 1592 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
606 : :
607 : : /* Relation is either a sequence or a table */
4 akapila@postgresql.o 608 :GNC 1592 : relkind = get_rel_relkind(subrel->srrelid);
609 [ + + + + : 1592 : Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
- + ]
610 : : relkind == RELKIND_PARTITIONED_TABLE);
611 : :
612 : : /* Skip sequences if they were not requested */
613 [ + + + - ]: 1592 : if ((relkind == RELKIND_SEQUENCE) && !sequences)
614 : 1 : continue;
615 : :
616 : : /* Skip tables if they were not requested */
617 [ + + + - ]: 1591 : if ((relkind == RELKIND_RELATION ||
618 [ - + ]: 1591 : relkind == RELKIND_PARTITIONED_TABLE) && !tables)
4 akapila@postgresql.o 619 :UNC 0 : continue;
620 : :
3085 bruce@momjian.us 621 :CBC 1591 : relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
3140 peter_e@gmx.net 622 : 1591 : relstate->relid = subrel->srrelid;
623 : 1591 : relstate->state = subrel->srsubstate;
1925 tgl@sss.pgh.pa.us 624 : 1591 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
625 : : Anum_pg_subscription_rel_srsublsn, &isnull);
626 [ + + ]: 1591 : if (isnull)
627 : 1323 : relstate->lsn = InvalidXLogRecPtr;
628 : : else
629 : 268 : relstate->lsn = DatumGetLSN(d);
630 : :
3140 peter_e@gmx.net 631 : 1591 : res = lappend(res, relstate);
632 : : }
633 : :
634 : : /* Cleanup */
635 : 1029 : systable_endscan(scan);
2471 andres@anarazel.de 636 : 1029 : table_close(rel, AccessShareLock);
637 : :
3140 peter_e@gmx.net 638 : 1029 : return res;
639 : : }
640 : :
641 : : /*
642 : : * Update the dead tuple retention status for the given subscription.
643 : : */
644 : : void
55 akapila@postgresql.o 645 :GNC 2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
646 : : {
647 : : Relation rel;
648 : : bool nulls[Natts_pg_subscription];
649 : : bool replaces[Natts_pg_subscription];
650 : : Datum values[Natts_pg_subscription];
651 : : HeapTuple tup;
652 : :
653 : : /* Look up the subscription in the catalog */
654 : 2 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
655 : 2 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
656 : :
657 [ - + ]: 2 : if (!HeapTupleIsValid(tup))
55 akapila@postgresql.o 658 [ # # ]:UNC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
659 : :
55 akapila@postgresql.o 660 :GNC 2 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
661 : :
662 : : /* Form a new tuple. */
663 : 2 : memset(values, 0, sizeof(values));
664 : 2 : memset(nulls, false, sizeof(nulls));
665 : 2 : memset(replaces, false, sizeof(replaces));
666 : :
667 : : /* Set the subscription to disabled. */
668 : 2 : values[Anum_pg_subscription_subretentionactive - 1] = active;
669 : 2 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
670 : :
671 : : /* Update the catalog */
672 : 2 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
673 : : replaces);
674 : 2 : CatalogTupleUpdate(rel, &tup->t_self, tup);
675 : 2 : heap_freetuple(tup);
676 : :
677 : 2 : table_close(rel, NoLock);
678 : 2 : }
|