Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_subscription.c
4 : : * replication subscriptions
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/catalog/pg_subscription.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/genam.h"
18 : : #include "access/heapam.h"
19 : : #include "access/htup_details.h"
20 : : #include "access/tableam.h"
21 : : #include "catalog/indexing.h"
22 : : #include "catalog/pg_subscription.h"
23 : : #include "catalog/pg_subscription_rel.h"
24 : : #include "catalog/pg_type.h"
25 : : #include "miscadmin.h"
26 : : #include "storage/lmgr.h"
27 : : #include "utils/array.h"
28 : : #include "utils/builtins.h"
29 : : #include "utils/fmgroids.h"
30 : : #include "utils/lsyscache.h"
31 : : #include "utils/pg_lsn.h"
32 : : #include "utils/rel.h"
33 : : #include "utils/syscache.h"
34 : :
35 : : static List *textarray_to_stringlist(ArrayType *textarray);
36 : :
37 : : /*
38 : : * Add a comma-separated list of publication names to the 'dest' string.
39 : : */
40 : : void
316 michael@paquier.xyz 41 :CBC 491 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
42 : : {
43 : : ListCell *lc;
44 : 491 : bool first = true;
45 : :
46 [ - + ]: 491 : Assert(publications != NIL);
47 : :
48 [ + - + + : 1275 : foreach(lc, publications)
+ + ]
49 : : {
50 : 784 : char *pubname = strVal(lfirst(lc));
51 : :
52 [ + + ]: 784 : if (first)
53 : 491 : first = false;
54 : : else
55 : 293 : appendStringInfoString(dest, ", ");
56 : :
57 [ + + ]: 784 : if (quote_literal)
58 : 774 : appendStringInfoString(dest, quote_literal_cstr(pubname));
59 : : else
60 : : {
61 : 10 : appendStringInfoChar(dest, '"');
62 : 10 : appendStringInfoString(dest, pubname);
63 : 10 : appendStringInfoChar(dest, '"');
64 : : }
65 : : }
66 : 491 : }
67 : :
68 : : /*
69 : : * Fetch the subscription from the syscache.
70 : : */
71 : : Subscription *
3152 peter_e@gmx.net 72 : 857 : GetSubscription(Oid subid, bool missing_ok)
73 : : {
74 : : HeapTuple tup;
75 : : Subscription *sub;
76 : : Form_pg_subscription subform;
77 : : Datum datum;
78 : : bool isnull;
79 : :
80 : 857 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
81 : :
82 [ + + ]: 857 : if (!HeapTupleIsValid(tup))
83 : : {
84 [ + - ]: 59 : if (missing_ok)
85 : 59 : return NULL;
86 : :
3152 peter_e@gmx.net 87 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
88 : : }
89 : :
3152 peter_e@gmx.net 90 :CBC 798 : subform = (Form_pg_subscription) GETSTRUCT(tup);
91 : :
92 : 798 : sub = (Subscription *) palloc(sizeof(Subscription));
93 : 798 : sub->oid = subid;
94 : 798 : sub->dbid = subform->subdbid;
1248 akapila@postgresql.o 95 : 798 : sub->skiplsn = subform->subskiplsn;
3152 peter_e@gmx.net 96 : 798 : sub->name = pstrdup(NameStr(subform->subname));
97 : 798 : sub->owner = subform->subowner;
98 : 798 : sub->enabled = subform->subenabled;
1876 tgl@sss.pgh.pa.us 99 : 798 : sub->binary = subform->subbinary;
1829 akapila@postgresql.o 100 : 798 : sub->stream = subform->substream;
1515 101 : 798 : sub->twophasestate = subform->subtwophasestate;
1272 102 : 798 : sub->disableonerr = subform->subdisableonerr;
891 rhaas@postgresql.org 103 : 798 : sub->passwordrequired = subform->subpasswordrequired;
886 104 : 798 : sub->runasowner = subform->subrunasowner;
585 akapila@postgresql.o 105 : 798 : sub->failover = subform->subfailover;
45 akapila@postgresql.o 106 :GNC 798 : sub->retaindeadtuples = subform->subretaindeadtuples;
4 107 : 798 : sub->maxretention = subform->submaxretention;
108 : 798 : sub->retentionactive = subform->subretentionactive;
109 : :
110 : : /* Get conninfo */
896 dgustafsson@postgres 111 :CBC 798 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
112 : : tup,
113 : : Anum_pg_subscription_subconninfo);
3067 peter_e@gmx.net 114 : 798 : sub->conninfo = TextDatumGetCString(datum);
115 : :
116 : : /* Get slotname */
3152 117 : 798 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
118 : : tup,
119 : : Anum_pg_subscription_subslotname,
120 : : &isnull);
3042 121 [ + + ]: 798 : if (!isnull)
122 : 765 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
123 : : else
124 : 33 : sub->slotname = NULL;
125 : :
126 : : /* Get synccommit */
896 dgustafsson@postgres 127 : 798 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
128 : : tup,
129 : : Anum_pg_subscription_subsynccommit);
3067 peter_e@gmx.net 130 : 798 : sub->synccommit = TextDatumGetCString(datum);
131 : :
132 : : /* Get publications */
896 dgustafsson@postgres 133 : 798 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
134 : : tup,
135 : : Anum_pg_subscription_subpublications);
3152 peter_e@gmx.net 136 : 798 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
137 : :
138 : : /* Get origin */
896 dgustafsson@postgres 139 : 798 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
140 : : tup,
141 : : Anum_pg_subscription_suborigin);
1143 akapila@postgresql.o 142 : 798 : sub->origin = TextDatumGetCString(datum);
143 : :
144 : : /* Is the subscription owner a superuser? */
690 145 : 798 : sub->ownersuperuser = superuser_arg(sub->owner);
146 : :
3152 peter_e@gmx.net 147 : 798 : ReleaseSysCache(tup);
148 : :
149 : 798 : return sub;
150 : : }
151 : :
152 : : /*
153 : : * Return number of subscriptions defined in given database.
154 : : * Used by dropdb() to check if database can indeed be dropped.
155 : : */
156 : : int
157 : 44 : CountDBSubscriptions(Oid dbid)
158 : : {
3034 bruce@momjian.us 159 : 44 : int nsubs = 0;
160 : : Relation rel;
161 : : ScanKeyData scankey;
162 : : SysScanDesc scan;
163 : : HeapTuple tup;
164 : :
2420 andres@anarazel.de 165 : 44 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
166 : :
3152 peter_e@gmx.net 167 : 44 : ScanKeyInit(&scankey,
168 : : Anum_pg_subscription_subdbid,
169 : : BTEqualStrategyNumber, F_OIDEQ,
170 : : ObjectIdGetDatum(dbid));
171 : :
172 : 44 : scan = systable_beginscan(rel, InvalidOid, false,
173 : : NULL, 1, &scankey);
174 : :
175 [ - + ]: 44 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
3152 peter_e@gmx.net 176 :UBC 0 : nsubs++;
177 : :
3152 peter_e@gmx.net 178 :CBC 44 : systable_endscan(scan);
179 : :
2420 andres@anarazel.de 180 : 44 : table_close(rel, NoLock);
181 : :
3152 peter_e@gmx.net 182 : 44 : return nsubs;
183 : : }
184 : :
185 : : /*
186 : : * Free memory allocated by subscription struct.
187 : : */
188 : : void
189 : 43 : FreeSubscription(Subscription *sub)
190 : : {
191 : 43 : pfree(sub->name);
192 : 43 : pfree(sub->conninfo);
3042 193 [ + - ]: 43 : if (sub->slotname)
194 : 43 : pfree(sub->slotname);
3152 195 : 43 : list_free_deep(sub->publications);
196 : 43 : pfree(sub);
197 : 43 : }
198 : :
199 : : /*
200 : : * Disable the given subscription.
201 : : */
202 : : void
1272 akapila@postgresql.o 203 : 4 : DisableSubscription(Oid subid)
204 : : {
205 : : Relation rel;
206 : : bool nulls[Natts_pg_subscription];
207 : : bool replaces[Natts_pg_subscription];
208 : : Datum values[Natts_pg_subscription];
209 : : HeapTuple tup;
210 : :
211 : : /* Look up the subscription in the catalog */
212 : 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
213 : 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
214 : :
215 [ - + ]: 4 : if (!HeapTupleIsValid(tup))
1272 akapila@postgresql.o 216 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
217 : :
1272 akapila@postgresql.o 218 :CBC 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
219 : :
220 : : /* Form a new tuple. */
221 : 4 : memset(values, 0, sizeof(values));
222 : 4 : memset(nulls, false, sizeof(nulls));
223 : 4 : memset(replaces, false, sizeof(replaces));
224 : :
225 : : /* Set the subscription to disabled. */
226 : 4 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
227 : 4 : replaces[Anum_pg_subscription_subenabled - 1] = true;
228 : :
229 : : /* Update the catalog */
230 : 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
231 : : replaces);
232 : 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
233 : 4 : heap_freetuple(tup);
234 : :
235 : 4 : table_close(rel, NoLock);
236 : 4 : }
237 : :
238 : : /*
239 : : * Convert text array to list of strings.
240 : : *
241 : : * Note: the resulting list of strings is pallocated here.
242 : : */
243 : : static List *
3152 peter_e@gmx.net 244 : 798 : textarray_to_stringlist(ArrayType *textarray)
245 : : {
246 : : Datum *elems;
247 : : int nelems,
248 : : i;
3034 bruce@momjian.us 249 : 798 : List *res = NIL;
250 : :
1163 peter@eisentraut.org 251 : 798 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
252 : :
3152 peter_e@gmx.net 253 [ - + ]: 798 : if (nelems == 0)
3152 peter_e@gmx.net 254 :UBC 0 : return NIL;
255 : :
3152 peter_e@gmx.net 256 [ + + ]:CBC 1955 : for (i = 0; i < nelems; i++)
3067 257 : 1157 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
258 : :
3152 259 : 798 : return res;
260 : : }
261 : :
262 : : /*
263 : : * Add new state record for a subscription table.
264 : : *
265 : : * If retain_lock is true, then don't release the locks taken in this function.
266 : : * We normally release the locks at the end of transaction but in binary-upgrade
267 : : * mode, we expect to release those immediately.
268 : : */
269 : : void
2710 270 : 202 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
271 : : XLogRecPtr sublsn, bool retain_lock)
272 : : {
273 : : Relation rel;
274 : : HeapTuple tup;
275 : : bool nulls[Natts_pg_subscription_rel];
276 : : Datum values[Natts_pg_subscription_rel];
277 : :
2987 278 : 202 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
279 : :
2420 andres@anarazel.de 280 : 202 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
281 : :
282 : : /* Try finding existing mapping. */
3089 peter_e@gmx.net 283 : 202 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
284 : : ObjectIdGetDatum(relid),
285 : : ObjectIdGetDatum(subid));
2710 286 [ - + ]: 202 : if (HeapTupleIsValid(tup))
2710 peter_e@gmx.net 287 [ # # ]:UBC 0 : elog(ERROR, "subscription table %u in subscription %u already exists",
288 : : relid, subid);
289 : :
290 : : /* Form the tuple. */
2710 peter_e@gmx.net 291 :CBC 202 : memset(values, 0, sizeof(values));
292 : 202 : memset(nulls, false, sizeof(nulls));
293 : 202 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
294 : 202 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
295 : 202 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
296 [ + + ]: 202 : if (sublsn != InvalidXLogRecPtr)
297 : 1 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
298 : : else
299 : 201 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
300 : :
301 : 202 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
302 : :
303 : : /* Insert tuple into catalog. */
2482 andres@anarazel.de 304 : 202 : CatalogTupleInsert(rel, tup);
305 : :
2710 peter_e@gmx.net 306 : 202 : heap_freetuple(tup);
307 : :
308 : : /* Cleanup. */
613 akapila@postgresql.o 309 [ + + ]: 202 : if (retain_lock)
310 : : {
311 : 200 : table_close(rel, NoLock);
312 : : }
313 : : else
314 : : {
315 : 2 : table_close(rel, RowExclusiveLock);
316 : 2 : UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
317 : : }
2710 peter_e@gmx.net 318 : 202 : }
319 : :
320 : : /*
321 : : * Update the state of a subscription table.
322 : : */
323 : : void
324 : 741 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
325 : : XLogRecPtr sublsn, bool already_locked)
326 : : {
327 : : Relation rel;
328 : : HeapTuple tup;
329 : : bool nulls[Natts_pg_subscription_rel];
330 : : Datum values[Natts_pg_subscription_rel];
331 : : bool replaces[Natts_pg_subscription_rel];
332 : :
36 akapila@postgresql.o 333 [ + + ]: 741 : if (already_locked)
334 : : {
335 : : #ifdef USE_ASSERT_CHECKING
336 : : LOCKTAG tag;
337 : :
338 [ - + ]: 181 : Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
339 : : RowExclusiveLock, true));
340 : 181 : SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
341 [ - + ]: 181 : Assert(LockHeldByMe(&tag, AccessShareLock, true));
342 : : #endif
343 : :
344 : 181 : rel = table_open(SubscriptionRelRelationId, NoLock);
345 : : }
346 : : else
347 : : {
348 : 560 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
349 : 560 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
350 : : }
351 : :
352 : : /* Try finding existing mapping. */
2710 peter_e@gmx.net 353 : 741 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
354 : : ObjectIdGetDatum(relid),
355 : : ObjectIdGetDatum(subid));
356 [ - + ]: 741 : if (!HeapTupleIsValid(tup))
2710 peter_e@gmx.net 357 [ # # ]:UBC 0 : elog(ERROR, "subscription table %u in subscription %u does not exist",
358 : : relid, subid);
359 : :
360 : : /* Update the tuple. */
2710 peter_e@gmx.net 361 :CBC 741 : memset(values, 0, sizeof(values));
362 : 741 : memset(nulls, false, sizeof(nulls));
363 : 741 : memset(replaces, false, sizeof(replaces));
364 : :
365 : 741 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
366 : 741 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
367 : :
368 : 741 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
369 [ + + ]: 741 : if (sublsn != InvalidXLogRecPtr)
370 : 364 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
371 : : else
372 : 377 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
373 : :
374 : 741 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
375 : : replaces);
376 : :
377 : : /* Update the catalog. */
378 : 741 : CatalogTupleUpdate(rel, &tup->t_self, tup);
379 : :
380 : : /* Cleanup. */
2420 andres@anarazel.de 381 : 741 : table_close(rel, NoLock);
3089 peter_e@gmx.net 382 : 741 : }
383 : :
384 : : /*
385 : : * Get state of subscription table.
386 : : *
387 : : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
388 : : */
389 : : char
1787 alvherre@alvh.no-ip. 390 : 1186 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
391 : : {
392 : : HeapTuple tup;
393 : : char substate;
394 : : bool isnull;
395 : : Datum d;
396 : : Relation rel;
397 : :
398 : : /*
399 : : * This is to avoid the race condition with AlterSubscription which tries
400 : : * to remove this relstate.
401 : : */
1667 akapila@postgresql.o 402 : 1186 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
403 : :
404 : : /* Try finding the mapping. */
3089 peter_e@gmx.net 405 : 1186 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
406 : : ObjectIdGetDatum(relid),
407 : : ObjectIdGetDatum(subid));
408 : :
409 [ + + ]: 1186 : if (!HeapTupleIsValid(tup))
410 : : {
1654 akapila@postgresql.o 411 : 24 : table_close(rel, AccessShareLock);
1787 alvherre@alvh.no-ip. 412 : 24 : *sublsn = InvalidXLogRecPtr;
413 : 24 : return SUBREL_STATE_UNKNOWN;
414 : : }
415 : :
416 : : /* Get the state. */
417 : 1162 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
418 : :
419 : : /* Get the LSN */
3089 peter_e@gmx.net 420 : 1162 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
421 : : Anum_pg_subscription_rel_srsublsn, &isnull);
422 [ + + ]: 1162 : if (isnull)
423 : 628 : *sublsn = InvalidXLogRecPtr;
424 : : else
425 : 534 : *sublsn = DatumGetLSN(d);
426 : :
427 : : /* Cleanup */
428 : 1162 : ReleaseSysCache(tup);
429 : :
1667 akapila@postgresql.o 430 : 1162 : table_close(rel, AccessShareLock);
431 : :
3089 peter_e@gmx.net 432 : 1162 : return substate;
433 : : }
434 : :
435 : : /*
436 : : * Drop subscription relation mapping. These can be for a particular
437 : : * subscription, or for a particular relation, or both.
438 : : */
439 : : void
440 : 22271 : RemoveSubscriptionRel(Oid subid, Oid relid)
441 : : {
442 : : Relation rel;
443 : : TableScanDesc scan;
444 : : ScanKeyData skey[2];
445 : : HeapTuple tup;
446 : 22271 : int nkeys = 0;
447 : :
2420 andres@anarazel.de 448 : 22271 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
449 : :
3089 peter_e@gmx.net 450 [ + + ]: 22271 : if (OidIsValid(subid))
451 : : {
452 : 133 : ScanKeyInit(&skey[nkeys++],
453 : : Anum_pg_subscription_rel_srsubid,
454 : : BTEqualStrategyNumber,
455 : : F_OIDEQ,
456 : : ObjectIdGetDatum(subid));
457 : : }
458 : :
459 [ + + ]: 22271 : if (OidIsValid(relid))
460 : : {
461 : 22158 : ScanKeyInit(&skey[nkeys++],
462 : : Anum_pg_subscription_rel_srrelid,
463 : : BTEqualStrategyNumber,
464 : : F_OIDEQ,
465 : : ObjectIdGetDatum(relid));
466 : : }
467 : :
468 : : /* Do the search and delete what we found. */
2371 andres@anarazel.de 469 : 22271 : scan = table_beginscan_catalog(rel, nkeys, skey);
3089 peter_e@gmx.net 470 [ + + ]: 22383 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
471 : : {
472 : : Form_pg_subscription_rel subrel;
473 : :
1667 akapila@postgresql.o 474 : 112 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
475 : :
476 : : /*
477 : : * We don't allow to drop the relation mapping when the table
478 : : * synchronization is in progress unless the caller updates the
479 : : * corresponding subscription as well. This is to ensure that we don't
480 : : * leave tablesync slots or origins in the system when the
481 : : * corresponding table is dropped.
482 : : */
483 [ + + - + ]: 112 : if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
484 : : {
1667 akapila@postgresql.o 485 [ # # ]:UBC 0 : ereport(ERROR,
486 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
487 : : errmsg("could not drop relation mapping for subscription \"%s\"",
488 : : get_subscription_name(subrel->srsubid, false)),
489 : : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
490 : : get_rel_name(relid), subrel->srsubstate),
491 : :
492 : : /*
493 : : * translator: first %s is a SQL ALTER command and second %s is a
494 : : * SQL DROP command
495 : : */
496 : : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
497 : : "ALTER SUBSCRIPTION ... ENABLE",
498 : : "DROP SUBSCRIPTION ...")));
499 : : }
500 : :
3006 tgl@sss.pgh.pa.us 501 :CBC 112 : CatalogTupleDelete(rel, &tup->t_self);
502 : : }
2371 andres@anarazel.de 503 : 22271 : table_endscan(scan);
504 : :
2420 505 : 22271 : table_close(rel, RowExclusiveLock);
3089 peter_e@gmx.net 506 : 22271 : }
507 : :
508 : : /*
509 : : * Does the subscription have any relations?
510 : : *
511 : : * Use this function only to know true/false, and when you have no need for the
512 : : * List returned by GetSubscriptionRelations.
513 : : */
514 : : bool
1515 akapila@postgresql.o 515 : 240 : HasSubscriptionRelations(Oid subid)
516 : : {
517 : : Relation rel;
518 : : ScanKeyData skey[1];
519 : : SysScanDesc scan;
520 : : bool has_subrels;
521 : :
522 : 240 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
523 : :
524 : 240 : ScanKeyInit(&skey[0],
525 : : Anum_pg_subscription_rel_srsubid,
526 : : BTEqualStrategyNumber, F_OIDEQ,
527 : : ObjectIdGetDatum(subid));
528 : :
529 : 240 : scan = systable_beginscan(rel, InvalidOid, false,
530 : : NULL, 1, skey);
531 : :
532 : : /* If even a single tuple exists then the subscription has tables. */
533 : 240 : has_subrels = HeapTupleIsValid(systable_getnext(scan));
534 : :
535 : : /* Cleanup */
536 : 240 : systable_endscan(scan);
537 : 240 : table_close(rel, AccessShareLock);
538 : :
539 : 240 : return has_subrels;
540 : : }
541 : :
542 : : /*
543 : : * Get the relations for the subscription.
544 : : *
545 : : * If not_ready is true, return only the relations that are not in a ready
546 : : * state, otherwise return all the relations of the subscription. The
547 : : * returned list is palloc'ed in the current memory context.
548 : : */
549 : : List *
1137 michael@paquier.xyz 550 : 1031 : GetSubscriptionRelations(Oid subid, bool not_ready)
551 : : {
3089 peter_e@gmx.net 552 : 1031 : List *res = NIL;
553 : : Relation rel;
554 : : HeapTuple tup;
555 : 1031 : int nkeys = 0;
556 : : ScanKeyData skey[2];
557 : : SysScanDesc scan;
558 : :
2420 andres@anarazel.de 559 : 1031 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
560 : :
3089 peter_e@gmx.net 561 : 1031 : ScanKeyInit(&skey[nkeys++],
562 : : Anum_pg_subscription_rel_srsubid,
563 : : BTEqualStrategyNumber, F_OIDEQ,
564 : : ObjectIdGetDatum(subid));
565 : :
1137 michael@paquier.xyz 566 [ + + ]: 1031 : if (not_ready)
567 : 999 : ScanKeyInit(&skey[nkeys++],
568 : : Anum_pg_subscription_rel_srsubstate,
569 : : BTEqualStrategyNumber, F_CHARNE,
570 : : CharGetDatum(SUBREL_STATE_READY));
571 : :
3089 peter_e@gmx.net 572 : 1031 : scan = systable_beginscan(rel, InvalidOid, false,
573 : : NULL, nkeys, skey);
574 : :
575 [ + + ]: 2664 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
576 : : {
577 : : Form_pg_subscription_rel subrel;
578 : : SubscriptionRelState *relstate;
579 : : Datum d;
580 : : bool isnull;
581 : :
582 : 1633 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
583 : :
3034 bruce@momjian.us 584 : 1633 : relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
3089 peter_e@gmx.net 585 : 1633 : relstate->relid = subrel->srrelid;
586 : 1633 : relstate->state = subrel->srsubstate;
1874 tgl@sss.pgh.pa.us 587 : 1633 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
588 : : Anum_pg_subscription_rel_srsublsn, &isnull);
589 [ + + ]: 1633 : if (isnull)
590 : 1363 : relstate->lsn = InvalidXLogRecPtr;
591 : : else
592 : 270 : relstate->lsn = DatumGetLSN(d);
593 : :
3089 peter_e@gmx.net 594 : 1633 : res = lappend(res, relstate);
595 : : }
596 : :
597 : : /* Cleanup */
598 : 1031 : systable_endscan(scan);
2420 andres@anarazel.de 599 : 1031 : table_close(rel, AccessShareLock);
600 : :
3089 peter_e@gmx.net 601 : 1031 : return res;
602 : : }
603 : :
604 : : /*
605 : : * Update the dead tuple retention status for the given subscription.
606 : : */
607 : : void
4 akapila@postgresql.o 608 :UNC 0 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
609 : : {
610 : : Relation rel;
611 : : bool nulls[Natts_pg_subscription];
612 : : bool replaces[Natts_pg_subscription];
613 : : Datum values[Natts_pg_subscription];
614 : : HeapTuple tup;
615 : :
616 : : /* Look up the subscription in the catalog */
617 : 0 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
618 : 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
619 : :
620 [ # # ]: 0 : if (!HeapTupleIsValid(tup))
621 [ # # ]: 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
622 : :
623 : 0 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
624 : :
625 : : /* Form a new tuple. */
626 : 0 : memset(values, 0, sizeof(values));
627 : 0 : memset(nulls, false, sizeof(nulls));
628 : 0 : memset(replaces, false, sizeof(replaces));
629 : :
630 : : /* Set the subscription to disabled. */
631 : 0 : values[Anum_pg_subscription_subretentionactive - 1] = active;
632 : 0 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
633 : :
634 : : /* Update the catalog */
635 : 0 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
636 : : replaces);
637 : 0 : CatalogTupleUpdate(rel, &tup->t_self, tup);
638 : 0 : heap_freetuple(tup);
639 : :
640 : 0 : table_close(rel, NoLock);
641 : 0 : }
|