Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * subscriptioncmds.c
4 : : * subscription catalog manipulation functions
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/commands/subscriptioncmds.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/commit_ts.h"
18 : : #include "access/htup_details.h"
19 : : #include "access/table.h"
20 : : #include "access/twophase.h"
21 : : #include "access/xact.h"
22 : : #include "catalog/catalog.h"
23 : : #include "catalog/dependency.h"
24 : : #include "catalog/indexing.h"
25 : : #include "catalog/namespace.h"
26 : : #include "catalog/objectaccess.h"
27 : : #include "catalog/objectaddress.h"
28 : : #include "catalog/pg_authid_d.h"
29 : : #include "catalog/pg_database_d.h"
30 : : #include "catalog/pg_subscription.h"
31 : : #include "catalog/pg_subscription_rel.h"
32 : : #include "catalog/pg_type.h"
33 : : #include "commands/defrem.h"
34 : : #include "commands/event_trigger.h"
35 : : #include "commands/subscriptioncmds.h"
36 : : #include "executor/executor.h"
37 : : #include "miscadmin.h"
38 : : #include "nodes/makefuncs.h"
39 : : #include "pgstat.h"
40 : : #include "replication/logicallauncher.h"
41 : : #include "replication/logicalworker.h"
42 : : #include "replication/origin.h"
43 : : #include "replication/slot.h"
44 : : #include "replication/walreceiver.h"
45 : : #include "replication/walsender.h"
46 : : #include "replication/worker_internal.h"
47 : : #include "storage/lmgr.h"
48 : : #include "utils/acl.h"
49 : : #include "utils/builtins.h"
50 : : #include "utils/guc.h"
51 : : #include "utils/lsyscache.h"
52 : : #include "utils/memutils.h"
53 : : #include "utils/pg_lsn.h"
54 : : #include "utils/syscache.h"
55 : :
/*
 * Bit flags, one per option that a user may write in a CREATE/ALTER
 * SUBSCRIPTION command.  They are OR'ed together both to describe which
 * options a given command accepts and to remember which ones were actually
 * specified.
 */
#define SUBOPT_CONNECT					(1 << 0)
#define SUBOPT_ENABLED					(1 << 1)
#define SUBOPT_CREATE_SLOT				(1 << 2)
#define SUBOPT_SLOT_NAME				(1 << 3)
#define SUBOPT_COPY_DATA				(1 << 4)
#define SUBOPT_SYNCHRONOUS_COMMIT		(1 << 5)
#define SUBOPT_REFRESH					(1 << 6)
#define SUBOPT_BINARY					(1 << 7)
#define SUBOPT_STREAMING				(1 << 8)
#define SUBOPT_TWOPHASE_COMMIT			(1 << 9)
#define SUBOPT_DISABLE_ON_ERR			(1 << 10)
#define SUBOPT_PASSWORD_REQUIRED		(1 << 11)
#define SUBOPT_RUN_AS_OWNER				(1 << 12)
#define SUBOPT_FAILOVER					(1 << 13)
#define SUBOPT_RETAIN_DEAD_TUPLES		(1 << 14)
#define SUBOPT_MAX_RETENTION_DURATION	(1 << 15)
#define SUBOPT_LSN						(1 << 16)
#define SUBOPT_ORIGIN					(1 << 17)

/* true when every bit of 'bits' is also set in 'val' */
#define IsSet(val, bits)  (((bits) & (val)) == (bits))
81 : :
82 : : /*
83 : : * Structure to hold a bitmap representing the user-provided CREATE/ALTER
84 : : * SUBSCRIPTION command options and the parsed/default values of each of them.
 *
 * parse_subscription_options() zeroes the whole structure, fills in the
 * default of every option the calling command supports, and then overwrites
 * the fields for which the user supplied a value.
85 : : */
86 : : typedef struct SubOpts
87 : : {
88 : : bits32 specified_opts; /* SUBOPT_* bits of the options the user explicitly gave */
89 : : char *slot_name; /* "slot_name" value; NULL if unspecified or given as "none" */
90 : : char *synchronous_commit; /* "synchronous_commit" value; NULL if unspecified */
91 : : bool connect; /* "connect"; defaults to true when supported */
92 : : bool enabled; /* "enabled"; defaults to true, forced to false by connect = false */
93 : : bool create_slot; /* "create_slot"; defaults to true, forced to false by connect = false */
94 : : bool copy_data; /* "copy_data"; defaults to true, forced to false by connect = false */
95 : : bool refresh; /* "refresh"; defaults to true when supported */
96 : : bool binary; /* "binary"; defaults to false */
97 : : char streaming; /* "streaming" mode from defGetStreamingMode(); defaults to LOGICALREP_STREAM_PARALLEL */
98 : : bool twophase; /* "two_phase"; defaults to false */
99 : : bool disableonerr; /* "disable_on_error"; defaults to false */
100 : : bool passwordrequired; /* "password_required"; defaults to true when supported */
101 : : bool runasowner; /* "run_as_owner"; defaults to false */
102 : : bool failover; /* "failover"; defaults to false */
103 : : bool retaindeadtuples; /* "retain_dead_tuples"; defaults to false */
104 : : int32 maxretention; /* "max_retention_duration"; defaults to 0 */
105 : : char *origin; /* "origin"; defaults to a copy of LOGICALREP_ORIGIN_ANY when supported */
106 : : XLogRecPtr lsn; /* "lsn"; InvalidXLogRecPtr when given as "none" */
107 : : } SubOpts;
108 : :
/* Prototypes for file-local (static) helper functions. */
109 : : static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
110 : : static void check_publications_origin(WalReceiverConn *wrconn,
111 : : List *publications, bool copydata,
112 : : bool retain_dead_tuples, char *origin,
113 : : Oid *subrel_local_oids, int subrel_count,
114 : : char *subname);
115 : : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
116 : : static void check_duplicates_in_publist(List *publist, Datum *datums);
117 : : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
118 : : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
119 : : static void CheckAlterSubOption(Subscription *sub, const char *option,
120 : : bool slot_needs_update, bool isTopLevel);
121 : :
122 : :
123 : : /*
124 : : * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
125 : : *
126 : : * Since not all options can be specified in both commands, this function
127 : : * will report an error if mutually exclusive options are specified.
128 : : */
129 : : static void
1514 dean.a.rasheed@gmail 130 :CBC 478 : parse_subscription_options(ParseState *pstate, List *stmt_options,
131 : : bits32 supported_opts, SubOpts *opts)
132 : : {
133 : : ListCell *lc;
134 : :
135 : : /* Start out with cleared opts. */
1368 michael@paquier.xyz 136 : 478 : memset(opts, 0, sizeof(SubOpts));
137 : :
138 : : /* caller must expect some option */
1523 akapila@postgresql.o 139 [ - + ]: 478 : Assert(supported_opts != 0);
140 : :
141 : : /* If connect option is supported, these others also need to be. */
142 [ + + - + ]: 478 : Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
143 : : IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
144 : : SUBOPT_COPY_DATA));
145 : :
146 : : /* Set default values for the supported options. */
147 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_CONNECT))
148 : 239 : opts->connect = true;
149 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_ENABLED))
150 : 289 : opts->enabled = true;
151 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
152 : 239 : opts->create_slot = true;
153 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_COPY_DATA))
154 : 308 : opts->copy_data = true;
155 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_REFRESH))
156 : 43 : opts->refresh = true;
157 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_BINARY))
158 : 347 : opts->binary = false;
159 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_STREAMING))
313 160 : 347 : opts->streaming = LOGICALREP_STREAM_PARALLEL;
1515 161 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
162 : 347 : opts->twophase = false;
1272 163 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
164 : 347 : opts->disableonerr = false;
891 rhaas@postgresql.org 165 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
166 : 347 : opts->passwordrequired = true;
886 167 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
168 : 347 : opts->runasowner = false;
585 akapila@postgresql.o 169 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_FAILOVER))
170 : 347 : opts->failover = false;
45 akapila@postgresql.o 171 [ + + ]:GNC 478 : if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
172 : 347 : opts->retaindeadtuples = false;
4 173 [ + + ]: 478 : if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
174 : 347 : opts->maxretention = 0;
1143 akapila@postgresql.o 175 [ + + ]:CBC 478 : if (IsSet(supported_opts, SUBOPT_ORIGIN))
176 : 347 : opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
177 : :
178 : : /* Parse options */
1523 179 [ + + + + : 948 : foreach(lc, stmt_options)
+ + ]
180 : : {
3152 peter_e@gmx.net 181 : 503 : DefElem *defel = (DefElem *) lfirst(lc);
182 : :
1523 akapila@postgresql.o 183 [ + + ]: 503 : if (IsSet(supported_opts, SUBOPT_CONNECT) &&
184 [ + + ]: 295 : strcmp(defel->defname, "connect") == 0)
185 : : {
186 [ - + ]: 95 : if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
1514 dean.a.rasheed@gmail 187 :UBC 0 : errorConflictingDefElem(defel, pstate);
188 : :
1523 akapila@postgresql.o 189 :CBC 95 : opts->specified_opts |= SUBOPT_CONNECT;
190 : 95 : opts->connect = defGetBoolean(defel);
191 : : }
192 [ + + ]: 408 : else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
193 [ + + ]: 250 : strcmp(defel->defname, "enabled") == 0)
194 : : {
195 [ - + ]: 69 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
1514 dean.a.rasheed@gmail 196 :UBC 0 : errorConflictingDefElem(defel, pstate);
197 : :
1523 akapila@postgresql.o 198 :CBC 69 : opts->specified_opts |= SUBOPT_ENABLED;
199 : 69 : opts->enabled = defGetBoolean(defel);
200 : : }
201 [ + + ]: 339 : else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
202 [ + + ]: 181 : strcmp(defel->defname, "create_slot") == 0)
203 : : {
204 [ - + ]: 20 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
1514 dean.a.rasheed@gmail 205 :UBC 0 : errorConflictingDefElem(defel, pstate);
206 : :
1523 akapila@postgresql.o 207 :CBC 20 : opts->specified_opts |= SUBOPT_CREATE_SLOT;
208 : 20 : opts->create_slot = defGetBoolean(defel);
209 : : }
210 [ + + ]: 319 : else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
211 [ + + ]: 271 : strcmp(defel->defname, "slot_name") == 0)
212 : : {
213 [ - + ]: 78 : if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
1514 dean.a.rasheed@gmail 214 :UBC 0 : errorConflictingDefElem(defel, pstate);
215 : :
1523 akapila@postgresql.o 216 :CBC 78 : opts->specified_opts |= SUBOPT_SLOT_NAME;
217 : 78 : opts->slot_name = defGetString(defel);
218 : :
219 : : /* Setting slot_name = NONE is treated as no slot name. */
220 [ + + ]: 153 : if (strcmp(opts->slot_name, "none") == 0)
221 : 61 : opts->slot_name = NULL;
222 : : else
45 akapila@postgresql.o 223 :GNC 17 : ReplicationSlotValidateName(opts->slot_name, false, ERROR);
224 : : }
1523 akapila@postgresql.o 225 [ + + ]:CBC 241 : else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
226 [ + + ]: 158 : strcmp(defel->defname, "copy_data") == 0)
227 : : {
228 [ - + ]: 26 : if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
1514 dean.a.rasheed@gmail 229 :UBC 0 : errorConflictingDefElem(defel, pstate);
230 : :
1523 akapila@postgresql.o 231 :CBC 26 : opts->specified_opts |= SUBOPT_COPY_DATA;
232 : 26 : opts->copy_data = defGetBoolean(defel);
233 : : }
234 [ + + ]: 215 : else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
235 [ + + ]: 170 : strcmp(defel->defname, "synchronous_commit") == 0)
236 : : {
237 [ - + ]: 6 : if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
1514 dean.a.rasheed@gmail 238 :UBC 0 : errorConflictingDefElem(defel, pstate);
239 : :
1523 akapila@postgresql.o 240 :CBC 6 : opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
241 : 6 : opts->synchronous_commit = defGetString(defel);
242 : :
243 : : /* Test if the given value is valid for synchronous_commit GUC. */
244 : 6 : (void) set_config_option("synchronous_commit", opts->synchronous_commit,
245 : : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
246 : : false, 0, false);
247 : : }
248 [ + + ]: 209 : else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
249 [ + - ]: 33 : strcmp(defel->defname, "refresh") == 0)
250 : : {
251 [ - + ]: 33 : if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
1514 dean.a.rasheed@gmail 252 :UBC 0 : errorConflictingDefElem(defel, pstate);
253 : :
1523 akapila@postgresql.o 254 :CBC 33 : opts->specified_opts |= SUBOPT_REFRESH;
255 : 33 : opts->refresh = defGetBoolean(defel);
256 : : }
257 [ + + ]: 176 : else if (IsSet(supported_opts, SUBOPT_BINARY) &&
258 [ + + ]: 164 : strcmp(defel->defname, "binary") == 0)
259 : : {
260 [ - + ]: 16 : if (IsSet(opts->specified_opts, SUBOPT_BINARY))
1514 dean.a.rasheed@gmail 261 :UBC 0 : errorConflictingDefElem(defel, pstate);
262 : :
1523 akapila@postgresql.o 263 :CBC 16 : opts->specified_opts |= SUBOPT_BINARY;
264 : 16 : opts->binary = defGetBoolean(defel);
265 : : }
266 [ + + ]: 160 : else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
267 [ + + ]: 148 : strcmp(defel->defname, "streaming") == 0)
268 : : {
269 [ - + ]: 37 : if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
1514 dean.a.rasheed@gmail 270 :UBC 0 : errorConflictingDefElem(defel, pstate);
271 : :
1523 akapila@postgresql.o 272 :CBC 37 : opts->specified_opts |= SUBOPT_STREAMING;
971 273 : 37 : opts->streaming = defGetStreamingMode(defel);
274 : : }
409 275 [ + + ]: 123 : else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
276 [ + + ]: 111 : strcmp(defel->defname, "two_phase") == 0)
277 : : {
1515 278 [ - + ]: 21 : if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
1514 dean.a.rasheed@gmail 279 :UBC 0 : errorConflictingDefElem(defel, pstate);
280 : :
1515 akapila@postgresql.o 281 :CBC 21 : opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
282 : 21 : opts->twophase = defGetBoolean(defel);
283 : : }
1272 284 [ + + ]: 102 : else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
285 [ + + ]: 90 : strcmp(defel->defname, "disable_on_error") == 0)
286 : : {
287 [ - + ]: 10 : if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
1272 akapila@postgresql.o 288 :UBC 0 : errorConflictingDefElem(defel, pstate);
289 : :
1272 akapila@postgresql.o 290 :CBC 10 : opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
291 : 10 : opts->disableonerr = defGetBoolean(defel);
292 : : }
891 rhaas@postgresql.org 293 [ + + ]: 92 : else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
294 [ + + ]: 80 : strcmp(defel->defname, "password_required") == 0)
295 : : {
296 [ - + ]: 12 : if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
891 rhaas@postgresql.org 297 :UBC 0 : errorConflictingDefElem(defel, pstate);
298 : :
891 rhaas@postgresql.org 299 :CBC 12 : opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
300 : 12 : opts->passwordrequired = defGetBoolean(defel);
301 : : }
886 302 [ + + ]: 80 : else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
303 [ + + ]: 68 : strcmp(defel->defname, "run_as_owner") == 0)
304 : : {
305 [ - + ]: 9 : if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
886 rhaas@postgresql.org 306 :UBC 0 : errorConflictingDefElem(defel, pstate);
307 : :
886 rhaas@postgresql.org 308 :CBC 9 : opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
309 : 9 : opts->runasowner = defGetBoolean(defel);
310 : : }
585 akapila@postgresql.o 311 [ + + ]: 71 : else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
312 [ + + ]: 59 : strcmp(defel->defname, "failover") == 0)
313 : : {
314 [ - + ]: 13 : if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
585 akapila@postgresql.o 315 :UBC 0 : errorConflictingDefElem(defel, pstate);
316 : :
585 akapila@postgresql.o 317 :CBC 13 : opts->specified_opts |= SUBOPT_FAILOVER;
318 : 13 : opts->failover = defGetBoolean(defel);
319 : : }
45 akapila@postgresql.o 320 [ + + ]:GNC 58 : else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
321 [ + + ]: 46 : strcmp(defel->defname, "retain_dead_tuples") == 0)
322 : : {
323 [ - + ]: 12 : if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
45 akapila@postgresql.o 324 :UNC 0 : errorConflictingDefElem(defel, pstate);
325 : :
45 akapila@postgresql.o 326 :GNC 12 : opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
327 : 12 : opts->retaindeadtuples = defGetBoolean(defel);
328 : : }
4 329 [ + + ]: 46 : else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
330 [ + + ]: 34 : strcmp(defel->defname, "max_retention_duration") == 0)
331 : : {
332 [ - + ]: 10 : if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
4 akapila@postgresql.o 333 :UNC 0 : errorConflictingDefElem(defel, pstate);
334 : :
4 akapila@postgresql.o 335 :GNC 10 : opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
336 : 10 : opts->maxretention = defGetInt32(defel);
337 : : }
1143 akapila@postgresql.o 338 [ + + ]:CBC 36 : else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
339 [ + + ]: 24 : strcmp(defel->defname, "origin") == 0)
340 : : {
341 [ - + ]: 21 : if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
1143 akapila@postgresql.o 342 :UBC 0 : errorConflictingDefElem(defel, pstate);
343 : :
1143 akapila@postgresql.o 344 :CBC 21 : opts->specified_opts |= SUBOPT_ORIGIN;
345 : 21 : pfree(opts->origin);
346 : :
347 : : /*
348 : : * Even though the "origin" parameter allows only "none" and "any"
349 : : * values, it is implemented as a string type so that the
350 : : * parameter can be extended in future versions to support
351 : : * filtering using origin names specified by the user.
352 : : */
353 : 21 : opts->origin = defGetString(defel);
354 : :
355 [ + + + + ]: 29 : if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
356 : 8 : (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
357 [ + - ]: 3 : ereport(ERROR,
358 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
359 : : errmsg("unrecognized origin value: \"%s\"", opts->origin));
360 : : }
1264 361 [ + + ]: 15 : else if (IsSet(supported_opts, SUBOPT_LSN) &&
362 [ + - ]: 12 : strcmp(defel->defname, "lsn") == 0)
363 : 9 : {
364 : 12 : char *lsn_str = defGetString(defel);
365 : : XLogRecPtr lsn;
366 : :
367 [ - + ]: 12 : if (IsSet(opts->specified_opts, SUBOPT_LSN))
1264 akapila@postgresql.o 368 :UBC 0 : errorConflictingDefElem(defel, pstate);
369 : :
370 : : /* Setting lsn = NONE is treated as resetting LSN */
1264 akapila@postgresql.o 371 [ + + ]:CBC 12 : if (strcmp(lsn_str, "none") == 0)
372 : 3 : lsn = InvalidXLogRecPtr;
373 : : else
374 : : {
375 : : /* Parse the argument as LSN */
376 : 9 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
377 : : CStringGetDatum(lsn_str)));
378 : :
379 [ + + ]: 9 : if (XLogRecPtrIsInvalid(lsn))
380 [ + - ]: 3 : ereport(ERROR,
381 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
382 : : errmsg("invalid WAL location (LSN): %s", lsn_str)));
383 : : }
384 : :
385 : 9 : opts->specified_opts |= SUBOPT_LSN;
386 : 9 : opts->lsn = lsn;
387 : : }
388 : : else
3034 peter_e@gmx.net 389 [ + - ]: 3 : ereport(ERROR,
390 : : (errcode(ERRCODE_SYNTAX_ERROR),
391 : : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
392 : : }
393 : :
394 : : /*
395 : : * We've been explicitly asked to not connect, that requires some
396 : : * additional processing.
397 : : */
1523 akapila@postgresql.o 398 [ + + + + ]: 445 : if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
399 : : {
400 : : /* Check for incompatible options from the user. */
401 [ + - ]: 74 : if (opts->enabled &&
402 [ + + ]: 74 : IsSet(opts->specified_opts, SUBOPT_ENABLED))
3089 peter_e@gmx.net 403 [ + - ]: 3 : ereport(ERROR,
404 : : (errcode(ERRCODE_SYNTAX_ERROR),
405 : : /*- translator: both %s are strings of the form "option = value" */
406 : : errmsg("%s and %s are mutually exclusive options",
407 : : "connect = false", "enabled = true")));
408 : :
1523 akapila@postgresql.o 409 [ + + ]: 71 : if (opts->create_slot &&
410 [ + + ]: 68 : IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
3089 peter_e@gmx.net 411 [ + - ]: 3 : ereport(ERROR,
412 : : (errcode(ERRCODE_SYNTAX_ERROR),
413 : : errmsg("%s and %s are mutually exclusive options",
414 : : "connect = false", "create_slot = true")));
415 : :
1523 akapila@postgresql.o 416 [ + + ]: 68 : if (opts->copy_data &&
417 [ + + ]: 65 : IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
3089 peter_e@gmx.net 418 [ + - ]: 3 : ereport(ERROR,
419 : : (errcode(ERRCODE_SYNTAX_ERROR),
420 : : errmsg("%s and %s are mutually exclusive options",
421 : : "connect = false", "copy_data = true")));
422 : :
423 : : /* Change the defaults of other options. */
1523 akapila@postgresql.o 424 : 65 : opts->enabled = false;
425 : 65 : opts->create_slot = false;
426 : 65 : opts->copy_data = false;
427 : : }
428 : :
429 : : /*
430 : : * Do additional checking for disallowed combination when slot_name = NONE
431 : : * was used.
432 : : */
433 [ + + ]: 436 : if (!opts->slot_name &&
434 [ + + ]: 422 : IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
435 : : {
1368 michael@paquier.xyz 436 [ + + ]: 58 : if (opts->enabled)
437 : : {
438 [ + + ]: 9 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
439 [ + - ]: 3 : ereport(ERROR,
440 : : (errcode(ERRCODE_SYNTAX_ERROR),
441 : : /*- translator: both %s are strings of the form "option = value" */
442 : : errmsg("%s and %s are mutually exclusive options",
443 : : "slot_name = NONE", "enabled = true")));
444 : : else
445 [ + - ]: 6 : ereport(ERROR,
446 : : (errcode(ERRCODE_SYNTAX_ERROR),
447 : : /*- translator: both %s are strings of the form "option = value" */
448 : : errmsg("subscription with %s must also set %s",
449 : : "slot_name = NONE", "enabled = false")));
450 : : }
451 : :
452 [ + + ]: 49 : if (opts->create_slot)
453 : : {
454 [ + + ]: 6 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
455 [ + - ]: 3 : ereport(ERROR,
456 : : (errcode(ERRCODE_SYNTAX_ERROR),
457 : : /*- translator: both %s are strings of the form "option = value" */
458 : : errmsg("%s and %s are mutually exclusive options",
459 : : "slot_name = NONE", "create_slot = true")));
460 : : else
461 [ + - ]: 3 : ereport(ERROR,
462 : : (errcode(ERRCODE_SYNTAX_ERROR),
463 : : /*- translator: both %s are strings of the form "option = value" */
464 : : errmsg("subscription with %s must also set %s",
465 : : "slot_name = NONE", "create_slot = false")));
466 : : }
467 : : }
3152 peter_e@gmx.net 468 : 421 : }
469 : :
470 : : /*
471 : : * Check that the specified publications are present on the publisher.
472 : : */
473 : : static void
1255 akapila@postgresql.o 474 : 126 : check_publications(WalReceiverConn *wrconn, List *publications)
475 : : {
476 : : WalRcvExecResult *res;
477 : : StringInfo cmd;
478 : : TupleTableSlot *slot;
479 : 126 : List *publicationsCopy = NIL;
480 : 126 : Oid tableRow[1] = {TEXTOID};
481 : :
482 : 126 : cmd = makeStringInfo();
483 : 126 : appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
484 : : " pg_catalog.pg_publication t WHERE\n"
485 : : " t.pubname IN (");
316 michael@paquier.xyz 486 : 126 : GetPublicationsStr(publications, cmd, true);
1255 akapila@postgresql.o 487 : 126 : appendStringInfoChar(cmd, ')');
488 : :
489 : 126 : res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
539 dgustafsson@postgres 490 : 126 : destroyStringInfo(cmd);
491 : :
1255 akapila@postgresql.o 492 [ - + ]: 126 : if (res->status != WALRCV_OK_TUPLES)
1255 akapila@postgresql.o 493 [ # # ]:UBC 0 : ereport(ERROR,
494 : : errmsg("could not receive list of publications from the publisher: %s",
495 : : res->err));
496 : :
1255 akapila@postgresql.o 497 :CBC 126 : publicationsCopy = list_copy(publications);
498 : :
499 : : /* Process publication(s). */
500 : 126 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
501 [ + + ]: 278 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
502 : : {
503 : : char *pubname;
504 : : bool isnull;
505 : :
506 : 152 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
507 [ - + ]: 152 : Assert(!isnull);
508 : :
509 : : /* Delete the publication present in publisher from the list. */
510 : 152 : publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
511 : 152 : ExecClearTuple(slot);
512 : : }
513 : :
514 : 126 : ExecDropSingleTupleTableSlot(slot);
515 : :
516 : 126 : walrcv_clear_result(res);
517 : :
518 [ + + ]: 126 : if (list_length(publicationsCopy))
519 : : {
520 : : /* Prepare the list of non-existent publication(s) for error message. */
521 : 4 : StringInfo pubnames = makeStringInfo();
522 : :
316 michael@paquier.xyz 523 : 4 : GetPublicationsStr(publicationsCopy, pubnames, false);
1255 akapila@postgresql.o 524 [ + - ]: 4 : ereport(WARNING,
525 : : errcode(ERRCODE_UNDEFINED_OBJECT),
526 : : errmsg_plural("publication %s does not exist on the publisher",
527 : : "publications %s do not exist on the publisher",
528 : : list_length(publicationsCopy),
529 : : pubnames->data));
530 : : }
531 : 126 : }
532 : :
533 : : /*
534 : : * Auxiliary function to build a text array out of a list of String nodes.
535 : : */
536 : : static Datum
3152 peter_e@gmx.net 537 : 195 : publicationListToArray(List *publist)
538 : : {
539 : : ArrayType *arr;
540 : : Datum *datums;
541 : : MemoryContext memcxt;
542 : : MemoryContext oldcxt;
543 : :
544 : : /* Create memory context for temporary allocations. */
545 : 195 : memcxt = AllocSetContextCreate(CurrentMemoryContext,
546 : : "publicationListToArray to array",
547 : : ALLOCSET_DEFAULT_SIZES);
548 : 195 : oldcxt = MemoryContextSwitchTo(memcxt);
549 : :
2905 tgl@sss.pgh.pa.us 550 : 195 : datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
551 : :
1614 peter@eisentraut.org 552 : 195 : check_duplicates_in_publist(publist, datums);
553 : :
3152 peter_e@gmx.net 554 : 192 : MemoryContextSwitchTo(oldcxt);
555 : :
1163 peter@eisentraut.org 556 : 192 : arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
557 : :
3152 peter_e@gmx.net 558 : 192 : MemoryContextDelete(memcxt);
559 : :
560 : 192 : return PointerGetDatum(arr);
561 : : }
562 : :
563 : : /*
564 : : * Create new subscription.
565 : : */
566 : : ObjectAddress
1514 dean.a.rasheed@gmail 567 : 239 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
568 : : bool isTopLevel)
569 : : {
570 : : Relation rel;
571 : : ObjectAddress myself;
572 : : Oid subid;
573 : : bool nulls[Natts_pg_subscription];
574 : : Datum values[Natts_pg_subscription];
3151 alvherre@alvh.no-ip. 575 : 239 : Oid owner = GetUserId();
576 : : HeapTuple tup;
577 : : char *conninfo;
578 : : char originname[NAMEDATALEN];
579 : : List *publications;
580 : : bits32 supported_opts;
1523 akapila@postgresql.o 581 : 239 : SubOpts opts = {0};
582 : : AclResult aclresult;
583 : :
584 : : /*
585 : : * Parse and check options.
586 : : *
587 : : * Connection and publication should not be specified here.
588 : : */
589 : 239 : supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
590 : : SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
591 : : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
592 : : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
593 : : SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
594 : : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
595 : : SUBOPT_RETAIN_DEAD_TUPLES |
596 : : SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
1514 dean.a.rasheed@gmail 597 : 239 : parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
598 : :
599 : : /*
600 : : * Since creating a replication slot is not transactional, rolling back
601 : : * the transaction leaves the created replication slot. So we cannot run
602 : : * CREATE SUBSCRIPTION inside a transaction block if creating a
603 : : * replication slot.
604 : : */
1523 akapila@postgresql.o 605 [ + + ]: 194 : if (opts.create_slot)
2759 peter_e@gmx.net 606 : 124 : PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
607 : :
608 : : /*
609 : : * We don't want to allow unprivileged users to be able to trigger
610 : : * attempts to access arbitrary network destinations, so require the user
611 : : * to have been specifically authorized to create subscriptions.
612 : : */
891 rhaas@postgresql.org 613 [ + + ]: 191 : if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
3152 peter_e@gmx.net 614 [ + - ]: 3 : ereport(ERROR,
615 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
616 : : errmsg("permission denied to create subscription"),
617 : : errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
618 : : "pg_create_subscription")));
619 : :
620 : : /*
621 : : * Since a subscription is a database object, we also check for CREATE
622 : : * permission on the database.
623 : : */
891 rhaas@postgresql.org 624 : 188 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
625 : : owner, ACL_CREATE);
626 [ + + ]: 188 : if (aclresult != ACLCHECK_OK)
627 : 6 : aclcheck_error(aclresult, OBJECT_DATABASE,
628 : 3 : get_database_name(MyDatabaseId));
629 : :
630 : : /*
631 : : * Non-superusers are required to set a password for authentication, and
632 : : * that password must be used by the target server, but the superuser can
633 : : * exempt a subscription from this requirement.
634 : : */
635 [ + + + + ]: 185 : if (!opts.passwordrequired && !superuser_arg(owner))
841 tgl@sss.pgh.pa.us 636 [ + - ]: 3 : ereport(ERROR,
637 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
638 : : errmsg("password_required=false is superuser-only"),
639 : : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
640 : :
641 : : /*
642 : : * If built with appropriate switch, whine when regression-testing
643 : : * conventions for subscription names are violated.
644 : : */
645 : : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
646 : : if (strncmp(stmt->subname, "regress_", 8) != 0)
647 : : elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
648 : : #endif
649 : :
2420 andres@anarazel.de 650 : 182 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
651 : :
652 : : /* Check if name is used */
2482 653 : 182 : subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
654 : : ObjectIdGetDatum(MyDatabaseId), CStringGetDatum(stmt->subname));
3152 peter_e@gmx.net 655 [ + + ]: 182 : if (OidIsValid(subid))
656 : : {
657 [ + - ]: 3 : ereport(ERROR,
658 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
659 : : errmsg("subscription \"%s\" already exists",
660 : : stmt->subname)));
661 : : }
662 : :
663 : : /*
664 : : * Ensure that system configuration parameters are set appropriately to
665 : : * support retain_dead_tuples and max_retention_duration.
666 : : */
4 akapila@postgresql.o 667 :GNC 179 : CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
668 : 179 : opts.retaindeadtuples, opts.retaindeadtuples,
669 : 179 : (opts.maxretention > 0));
670 : :
1523 akapila@postgresql.o 671 [ + + ]:CBC 179 : if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
672 [ + - ]: 158 : opts.slot_name == NULL)
673 : 158 : opts.slot_name = stmt->subname;
674 : :
675 : : /* The default for synchronous_commit of subscriptions is off. */
676 [ + - ]: 179 : if (opts.synchronous_commit == NULL)
677 : 179 : opts.synchronous_commit = "off";
678 : :
3152 peter_e@gmx.net 679 : 179 : conninfo = stmt->conninfo;
680 : 179 : publications = stmt->publication;
681 : :
682 : : /* Load the library providing us libpq calls. */
683 : 179 : load_file("libpqwalreceiver", false);
684 : :
685 : : /* Check the connection info string. */
891 rhaas@postgresql.org 686 [ + + + + ]: 179 : walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
687 : :
688 : : /* Everything ok, form a new tuple. */
3152 peter_e@gmx.net 689 : 170 : memset(values, 0, sizeof(values));
690 : 170 : memset(nulls, false, sizeof(nulls));
691 : :
2482 andres@anarazel.de 692 : 170 : subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
693 : : Anum_pg_subscription_oid);
694 : 170 : values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
3152 peter_e@gmx.net 695 : 170 : values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
1248 akapila@postgresql.o 696 : 170 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
3152 peter_e@gmx.net 697 : 170 : values[Anum_pg_subscription_subname - 1] =
698 : 170 : DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
3151 alvherre@alvh.no-ip. 699 : 170 : values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
1523 akapila@postgresql.o 700 : 170 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
701 : 170 : values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
971 702 : 170 : values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
1515 703 : 170 : values[Anum_pg_subscription_subtwophasestate - 1] =
704 [ + + ]: 170 : CharGetDatum(opts.twophase ?
705 : : LOGICALREP_TWOPHASE_STATE_PENDING :
706 : : LOGICALREP_TWOPHASE_STATE_DISABLED);
1272 707 : 170 : values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
891 rhaas@postgresql.org 708 : 170 : values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
886 709 : 170 : values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
585 akapila@postgresql.o 710 : 170 : values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
45 akapila@postgresql.o 711 :GNC 170 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
712 : 170 : BoolGetDatum(opts.retaindeadtuples);
4 713 : 170 : values[Anum_pg_subscription_submaxretention - 1] =
714 : 170 : Int32GetDatum(opts.maxretention);
715 : 170 : values[Anum_pg_subscription_subretentionactive - 1] =
716 : 170 : Int32GetDatum(opts.retaindeadtuples);
3152 peter_e@gmx.net 717 :CBC 170 : values[Anum_pg_subscription_subconninfo - 1] =
718 : 170 : CStringGetTextDatum(conninfo);
1523 akapila@postgresql.o 719 [ + + ]: 170 : if (opts.slot_name)
3042 peter_e@gmx.net 720 : 160 : values[Anum_pg_subscription_subslotname - 1] =
1523 akapila@postgresql.o 721 : 160 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
722 : : else
3042 peter_e@gmx.net 723 : 10 : nulls[Anum_pg_subscription_subslotname - 1] = true;
3067 724 : 170 : values[Anum_pg_subscription_subsynccommit - 1] =
1523 akapila@postgresql.o 725 : 170 : CStringGetTextDatum(opts.synchronous_commit);
3152 peter_e@gmx.net 726 : 167 : values[Anum_pg_subscription_subpublications - 1] =
3034 bruce@momjian.us 727 : 170 : publicationListToArray(publications);
1143 akapila@postgresql.o 728 : 167 : values[Anum_pg_subscription_suborigin - 1] =
729 : 167 : CStringGetTextDatum(opts.origin);
730 : :
3152 peter_e@gmx.net 731 : 167 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
732 : :
733 : : /* Insert tuple into catalog. */
2482 andres@anarazel.de 734 : 167 : CatalogTupleInsert(rel, tup);
3152 peter_e@gmx.net 735 : 167 : heap_freetuple(tup);
736 : :
3151 alvherre@alvh.no-ip. 737 : 167 : recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
738 : :
1061 akapila@postgresql.o 739 : 167 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
3152 peter_e@gmx.net 740 : 167 : replorigin_create(originname);
741 : :
742 : : /*
743 : : * Connect to remote side to execute requested commands and fetch table
744 : : * info.
745 : : */
1523 akapila@postgresql.o 746 [ + + ]: 167 : if (opts.connect)
747 : : {
748 : : char *err;
749 : : WalReceiverConn *wrconn;
750 : : List *tables;
751 : : ListCell *lc;
752 : : char table_state;
753 : : bool must_use_password;
754 : :
755 : : /* Try to connect to the publisher. */
891 rhaas@postgresql.org 756 [ - + - - ]: 120 : must_use_password = !superuser_arg(owner) && opts.passwordrequired;
579 akapila@postgresql.o 757 : 120 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
758 : : stmt->subname, &err);
3152 peter_e@gmx.net 759 [ + + ]: 120 : if (!wrconn)
760 [ + - ]: 3 : ereport(ERROR,
761 : : (errcode(ERRCODE_CONNECTION_FAILURE),
762 : : errmsg("subscription \"%s\" could not connect to the publisher: %s",
763 : : stmt->subname, err)));
764 : :
3146 765 [ + + ]: 117 : PG_TRY();
766 : : {
1255 akapila@postgresql.o 767 : 117 : check_publications(wrconn, publications);
1094 768 : 117 : check_publications_origin(wrconn, publications, opts.copy_data,
45 akapila@postgresql.o 769 :GNC 117 : opts.retaindeadtuples, opts.origin,
770 : : NULL, 0, stmt->subname);
771 : :
772 [ + + ]: 117 : if (opts.retaindeadtuples)
773 : 3 : check_pub_dead_tuple_retention(wrconn);
774 : :
775 : : /*
776 : : * Set sync state based on if we were asked to do data copy or
777 : : * not.
778 : : */
1248 tomas.vondra@postgre 779 [ + + ]:CBC 117 : table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
780 : :
781 : : /*
782 : : * Get the table list from publisher and build local table status
783 : : * info.
784 : : */
785 : 117 : tables = fetch_table_list(wrconn, publications);
786 [ + + + + : 294 : foreach(lc, tables)
+ + ]
787 : : {
3089 peter_e@gmx.net 788 : 178 : RangeVar *rv = (RangeVar *) lfirst(lc);
789 : : Oid relid;
790 : :
3088 791 : 178 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
792 : :
793 : : /* Check for supported relkind. */
3035 794 : 178 : CheckSubscriptionRelkind(get_rel_relkind(relid),
795 : 178 : rv->schemaname, rv->relname);
796 : :
1248 tomas.vondra@postgre 797 : 178 : AddSubscriptionRelState(subid, relid, table_state,
798 : : InvalidXLogRecPtr, true);
799 : : }
800 : :
801 : : /*
802 : : * If requested, create permanent slot for the subscription. We
803 : : * won't use the initial snapshot for anything, so no need to
804 : : * export it.
805 : : */
1523 akapila@postgresql.o 806 [ + + ]: 116 : if (opts.create_slot)
807 : : {
1515 808 : 111 : bool twophase_enabled = false;
809 : :
1523 810 [ - + ]: 111 : Assert(opts.slot_name);
811 : :
812 : : /*
813 : : * Even if two_phase is set, don't create the slot with
814 : : * two-phase enabled. Will enable it once all the tables are
815 : : * synced and ready. This avoids race-conditions like prepared
816 : : * transactions being skipped due to changes not being applied
817 : : * due to checks in should_apply_changes_for_rel() when
818 : : * tablesync for the corresponding tables are in progress. See
819 : : * comments atop worker.c.
820 : : *
821 : : * Note that if tables were specified but copy_data is false
822 : : * then it is safe to enable two_phase up-front because those
823 : : * tables are already initially in READY state. When the
824 : : * subscription has no tables, we leave the twophase state as
825 : : * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
826 : : * PUBLICATION to work.
827 : : */
1248 tomas.vondra@postgre 828 [ + + + + : 111 : if (opts.twophase && !opts.copy_data && tables != NIL)
+ - ]
1515 akapila@postgresql.o 829 : 1 : twophase_enabled = true;
830 : :
831 : 111 : walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
832 : : opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
833 : :
834 [ + + ]: 111 : if (twophase_enabled)
835 : 1 : UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
836 : :
3060 peter_e@gmx.net 837 [ + - ]: 111 : ereport(NOTICE,
838 : : (errmsg("created replication slot \"%s\" on publisher",
839 : : opts.slot_name)));
840 : : }
841 : : }
2136 peter@eisentraut.org 842 : 1 : PG_FINALLY();
843 : : {
3146 peter_e@gmx.net 844 : 117 : walrcv_disconnect(wrconn);
845 : : }
846 [ + + ]: 117 : PG_END_TRY();
847 : : }
848 : : else
3089 849 [ + - ]: 47 : ereport(WARNING,
850 : : (errmsg("subscription was created, but is not connected"),
851 : : errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
852 : :
2420 andres@anarazel.de 853 : 163 : table_close(rel, RowExclusiveLock);
854 : :
1249 855 : 163 : pgstat_create_subscription(subid);
856 : :
1523 akapila@postgresql.o 857 [ + + ]: 163 : if (opts.enabled)
3050 peter_e@gmx.net 858 : 109 : ApplyLauncherWakeupAtCommit();
859 : :
3152 860 : 163 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
861 : :
862 [ - + ]: 163 : InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
863 : :
864 : 163 : return myself;
865 : : }
866 : :
867 : : static void
1255 akapila@postgresql.o 868 : 33 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
869 : : List *validate_publications)
870 : : {
871 : : char *err;
872 : : List *pubrel_names;
873 : : List *subrel_states;
874 : : Oid *subrel_local_oids;
875 : : Oid *pubrel_local_oids;
876 : : ListCell *lc;
877 : : int off;
878 : : int remove_rel_len;
879 : : int subrel_count;
1667 880 : 33 : Relation rel = NULL;
881 : : typedef struct SubRemoveRels
882 : : {
883 : : Oid relid;
884 : : char state;
885 : : } SubRemoveRels;
886 : : SubRemoveRels *sub_remove_rels;
887 : : WalReceiverConn *wrconn;
888 : : bool must_use_password;
889 : :
890 : : /* Load the library providing us libpq calls. */
3089 peter_e@gmx.net 891 : 33 : load_file("libpqwalreceiver", false);
892 : :
893 : : /* Try to connect to the publisher. */
690 akapila@postgresql.o 894 [ + - + + ]: 33 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
579 895 : 33 : wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
896 : : sub->name, &err);
1583 alvherre@alvh.no-ip. 897 [ - + ]: 32 : if (!wrconn)
1583 alvherre@alvh.no-ip. 898 [ # # ]:UBC 0 : ereport(ERROR,
899 : : (errcode(ERRCODE_CONNECTION_FAILURE),
900 : : errmsg("subscription \"%s\" could not connect to the publisher: %s",
901 : : sub->name, err)));
902 : :
1667 akapila@postgresql.o 903 [ + - ]:CBC 32 : PG_TRY();
904 : : {
1255 905 [ + + ]: 32 : if (validate_publications)
906 : 9 : check_publications(wrconn, validate_publications);
907 : :
908 : : /* Get the table list from publisher. */
1667 909 : 32 : pubrel_names = fetch_table_list(wrconn, sub->publications);
910 : :
911 : : /* Get local table list. */
1137 michael@paquier.xyz 912 : 32 : subrel_states = GetSubscriptionRelations(sub->oid, false);
1094 akapila@postgresql.o 913 : 32 : subrel_count = list_length(subrel_states);
914 : :
915 : : /*
916 : : * Build qsorted array of local table oids for faster lookup. This can
917 : : * potentially contain all tables in the database so speed of lookup
918 : : * is important.
919 : : */
920 : 32 : subrel_local_oids = palloc(subrel_count * sizeof(Oid));
1667 921 : 32 : off = 0;
922 [ + + + + : 119 : foreach(lc, subrel_states)
+ + ]
923 : : {
924 : 87 : SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
925 : :
926 : 87 : subrel_local_oids[off++] = relstate->relid;
927 : : }
1094 928 : 32 : qsort(subrel_local_oids, subrel_count,
929 : : sizeof(Oid), oid_cmp);
930 : :
931 : 32 : check_publications_origin(wrconn, sub->publications, copy_data,
45 akapila@postgresql.o 932 :GNC 32 : sub->retaindeadtuples, sub->origin,
933 : : subrel_local_oids, subrel_count, sub->name);
934 : :
935 : : /*
936 : : * Rels that we want to remove from subscription and drop any slots
937 : : * and origins corresponding to them.
938 : : */
1094 akapila@postgresql.o 939 :CBC 32 : sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
940 : :
941 : : /*
942 : : * Walk over the remote tables and try to match them to locally known
943 : : * tables. If the table is not known locally create a new state for
944 : : * it.
945 : : *
946 : : * Also builds array of local oids of remote tables for the next step.
947 : : */
1667 948 : 32 : off = 0;
949 : 32 : pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
950 : :
951 [ + + + + : 121 : foreach(lc, pubrel_names)
+ + ]
952 : : {
953 : 89 : RangeVar *rv = (RangeVar *) lfirst(lc);
954 : : Oid relid;
955 : :
956 : 89 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
957 : :
958 : : /* Check for supported relkind. */
959 : 89 : CheckSubscriptionRelkind(get_rel_relkind(relid),
960 : 89 : rv->schemaname, rv->relname);
961 : :
962 : 89 : pubrel_local_oids[off++] = relid;
963 : :
964 [ + + ]: 89 : if (!bsearch(&relid, subrel_local_oids,
965 : : subrel_count, sizeof(Oid), oid_cmp))
966 : : {
967 [ + + ]: 22 : AddSubscriptionRelState(sub->oid, relid,
968 : : copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
969 : : InvalidXLogRecPtr, true);
970 [ - + ]: 22 : ereport(DEBUG1,
971 : : (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
972 : : rv->schemaname, rv->relname, sub->name)));
973 : : }
974 : : }
975 : :
976 : : /*
977 : : * Next remove state for tables we should not care about anymore using
978 : : * the data we collected above
979 : : */
980 : 32 : qsort(pubrel_local_oids, list_length(pubrel_names),
981 : : sizeof(Oid), oid_cmp);
982 : :
983 : 32 : remove_rel_len = 0;
1094 984 [ + + ]: 119 : for (off = 0; off < subrel_count; off++)
985 : : {
1667 986 : 87 : Oid relid = subrel_local_oids[off];
987 : :
988 [ + + ]: 87 : if (!bsearch(&relid, pubrel_local_oids,
989 : 87 : list_length(pubrel_names), sizeof(Oid), oid_cmp))
990 : : {
991 : : char state;
992 : : XLogRecPtr statelsn;
993 : :
994 : : /*
995 : : * Lock pg_subscription_rel with AccessExclusiveLock to
996 : : * prevent any race conditions with the apply worker
997 : : * re-launching workers at the same time this code is trying
998 : : * to remove those tables.
999 : : *
1000 : : * Even if new worker for this particular rel is restarted it
1001 : : * won't be able to make any progress as we hold exclusive
1002 : : * lock on pg_subscription_rel till the transaction end. It
1003 : : * will simply exit as there is no corresponding rel entry.
1004 : : *
1005 : : * This locking also ensures that the state of rels won't
1006 : : * change till we are done with this refresh operation.
1007 : : */
1008 [ + + ]: 20 : if (!rel)
1009 : 8 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
1010 : :
1011 : : /* Last known rel state. */
1012 : 20 : state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
1013 : :
1014 : 20 : sub_remove_rels[remove_rel_len].relid = relid;
1015 : 20 : sub_remove_rels[remove_rel_len++].state = state;
1016 : :
1017 : 20 : RemoveSubscriptionRel(sub->oid, relid);
1018 : :
1019 : 20 : logicalrep_worker_stop(sub->oid, relid);
1020 : :
1021 : : /*
1022 : : * For READY state, we would have already dropped the
1023 : : * tablesync origin.
1024 : : */
1090 1025 [ - + ]: 20 : if (state != SUBREL_STATE_READY)
1026 : : {
1027 : : char originname[NAMEDATALEN];
1028 : :
1029 : : /*
1030 : : * Drop the tablesync's origin tracking if exists.
1031 : : *
1032 : : * It is possible that the origin is not yet created for
1033 : : * tablesync worker, this can happen for the states before
1034 : : * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
1035 : : * apply worker can also concurrently try to drop the
1036 : : * origin and by this time the origin might be already
1037 : : * removed. For these reasons, passing missing_ok = true.
1038 : : */
1061 akapila@postgresql.o 1039 :UBC 0 : ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
1040 : : sizeof(originname));
1667 1041 : 0 : replorigin_drop_by_name(originname, true, false);
1042 : : }
1043 : :
1667 akapila@postgresql.o 1044 [ - + ]:CBC 20 : ereport(DEBUG1,
1045 : : (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1046 : : get_namespace_name(get_rel_namespace(relid)),
1047 : : get_rel_name(relid),
1048 : : sub->name)));
1049 : : }
1050 : : }
1051 : :
1052 : : /*
1053 : : * Drop the tablesync slots associated with removed tables. This has
1054 : : * to be at the end because otherwise if there is an error while doing
1055 : : * the database operations we won't be able to rollback dropped slots.
1056 : : */
1057 [ + + ]: 52 : for (off = 0; off < remove_rel_len; off++)
1058 : : {
1059 [ - + ]: 20 : if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
1667 akapila@postgresql.o 1060 [ # # ]:UBC 0 : sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
1061 : : {
1062 : 0 : char syncslotname[NAMEDATALEN] = {0};
1063 : :
1064 : : /*
1065 : : * For READY/SYNCDONE states we know the tablesync slot has
1066 : : * already been dropped by the tablesync worker.
1067 : : *
1068 : : * For other states, there is no certainty, maybe the slot
1069 : : * does not exist yet. Also, if we fail after removing some of
1070 : : * the slots, next time, it will again try to drop already
1071 : : * dropped slots and fail. For these reasons, we allow
1072 : : * missing_ok = true for the drop.
1073 : : */
1664 1074 : 0 : ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
1075 : : syncslotname, sizeof(syncslotname));
1667 1076 : 0 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1077 : : }
1078 : : }
1079 : : }
1080 : 0 : PG_FINALLY();
1081 : : {
1583 alvherre@alvh.no-ip. 1082 :CBC 32 : walrcv_disconnect(wrconn);
1083 : : }
1667 akapila@postgresql.o 1084 [ - + ]: 32 : PG_END_TRY();
1085 : :
1086 [ + + ]: 32 : if (rel)
1087 : 8 : table_close(rel, NoLock);
3089 peter_e@gmx.net 1088 : 32 : }
1089 : :
1090 : : /*
1091 : : * Common checks for altering failover, two_phase, and retain_dead_tuples
1092 : : * options.
1093 : : */
1094 : : static void
409 akapila@postgresql.o 1095 : 13 : CheckAlterSubOption(Subscription *sub, const char *option,
1096 : : bool slot_needs_update, bool isTopLevel)
1097 : : {
45 akapila@postgresql.o 1098 [ + + + + :GNC 13 : Assert(strcmp(option, "failover") == 0 ||
- + ]
1099 : : strcmp(option, "two_phase") == 0 ||
1100 : : strcmp(option, "retain_dead_tuples") == 0);
1101 : :
1102 : : /*
1103 : : * Altering the retain_dead_tuples option does not update the slot on the
1104 : : * publisher.
1105 : : */
1106 [ + + - + ]: 13 : Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1107 : :
1108 : : /*
1109 : : * Do not allow changing the option if the subscription is enabled. This
1110 : : * is because both failover and two_phase options of the slot on the
1111 : : * publisher cannot be modified if the slot is currently acquired by the
1112 : : * existing walsender.
1113 : : *
1114 : : * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1115 : : * the publisher by the existing walsender, so we could have allowed that
1116 : : * even when the subscription is enabled. But we kept this restriction for
1117 : : * the sake of consistency and simplicity.
1118 : : *
1119 : : * Additionally, do not allow changing the retain_dead_tuples option when
1120 : : * the subscription is enabled to prevent race conditions arising from the
1121 : : * new option value being acknowledged asynchronously by the launcher and
1122 : : * apply workers.
1123 : : *
1124 : : * Without the restriction, a race condition may arise when a user
1125 : : * disables and immediately re-enables the retain_dead_tuples option. In
1126 : : * this case, the launcher might drop the slot upon noticing the disabled
1127 : : * action, while the apply worker may keep maintaining
1128 : : * oldest_nonremovable_xid without noticing the option change. During this
1129 : : * period, a transaction ID wraparound could falsely make this ID appear
1130 : : * as if it originates from the future w.r.t the transaction ID stored in
1131 : : * the slot maintained by launcher.
1132 : : *
1133 : : * Similarly, if the user enables retain_dead_tuples concurrently with the
1134 : : * launcher starting the worker, the apply worker may start calculating
1135 : : * oldest_nonremovable_xid before the launcher notices the enable action.
1136 : : * Consequently, the launcher may update slot.xmin to a newer value than
1137 : : * that maintained by the worker. In subsequent cycles, upon integrating
1138 : : * the worker's oldest_nonremovable_xid, the launcher might detect a
1139 : : * retreat in the calculated xmin, necessitating additional handling.
1140 : : *
1141 : : * XXX To address the above race conditions, we can define
1142 : : * oldest_nonremovable_xid as FullTransactionID and adds the check to
1143 : : * disallow retreating the conflict slot's xmin. For now, we kept the
1144 : : * implementation simple by disallowing change to the retain_dead_tuples,
1145 : : * but in the future we can change this after some more analysis.
1146 : : *
1147 : : * Note that we could restrict only the enabling of retain_dead_tuples to
1148 : : * avoid the race conditions described above, but we maintain the
1149 : : * restriction for both enable and disable operations for the sake of
1150 : : * consistency.
1151 : : */
409 akapila@postgresql.o 1152 [ + + ]:CBC 13 : if (sub->enabled)
1153 [ + - ]: 2 : ereport(ERROR,
1154 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1155 : : errmsg("cannot set option \"%s\" for enabled subscription",
1156 : : option)));
1157 : :
1158 [ + + ]: 11 : if (slot_needs_update)
1159 : : {
1160 : : StringInfoData cmd;
1161 : :
1162 : : /*
1163 : : * A valid slot must be associated with the subscription for us to
1164 : : * modify any of the slot's properties.
1165 : : */
1166 [ - + ]: 8 : if (!sub->slotname)
409 akapila@postgresql.o 1167 [ # # ]:UBC 0 : ereport(ERROR,
1168 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1169 : : errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1170 : : option)));
1171 : :
1172 : : /* The changed option of the slot can't be rolled back. */
409 akapila@postgresql.o 1173 :CBC 8 : initStringInfo(&cmd);
1174 : 8 : appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1175 : :
1176 : 8 : PreventInTransactionBlock(isTopLevel, cmd.data);
1177 : 5 : pfree(cmd.data);
1178 : : }
1179 : 8 : }
1180 : :
1181 : : /*
1182 : : * Alter the existing subscription.
1183 : : */
1184 : : ObjectAddress
1514 dean.a.rasheed@gmail 1185 : 255 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1186 : : bool isTopLevel)
1187 : : {
1188 : : Relation rel;
1189 : : ObjectAddress myself;
1190 : : bool nulls[Natts_pg_subscription];
1191 : : bool replaces[Natts_pg_subscription];
1192 : : Datum values[Natts_pg_subscription];
1193 : : HeapTuple tup;
1194 : : Oid subid;
3089 peter_e@gmx.net 1195 : 255 : bool update_tuple = false;
409 akapila@postgresql.o 1196 : 255 : bool update_failover = false;
1197 : 255 : bool update_two_phase = false;
45 akapila@postgresql.o 1198 :GNC 255 : bool check_pub_rdt = false;
1199 : : bool retain_dead_tuples;
1200 : : int max_retention;
1201 : : bool retention_active;
1202 : : char *origin;
1203 : : Subscription *sub;
1204 : : Form_pg_subscription form;
1205 : : bits32 supported_opts;
1523 akapila@postgresql.o 1206 :CBC 255 : SubOpts opts = {0};
1207 : :
2420 andres@anarazel.de 1208 : 255 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1209 : :
1210 : : /* Fetch the existing tuple. */
29 peter@eisentraut.org 1211 :GNC 255 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
1212 : : CStringGetDatum(stmt->subname));
1213 : :
3152 peter_e@gmx.net 1214 [ + + ]:CBC 255 : if (!HeapTupleIsValid(tup))
1215 [ + - ]: 3 : ereport(ERROR,
1216 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1217 : : errmsg("subscription \"%s\" does not exist",
1218 : : stmt->subname)));
1219 : :
2482 andres@anarazel.de 1220 : 252 : form = (Form_pg_subscription) GETSTRUCT(tup);
1221 : 252 : subid = form->oid;
1222 : :
1223 : : /* must be owner */
1028 peter@eisentraut.org 1224 [ - + ]: 252 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
2835 peter_e@gmx.net 1225 :UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
3152 1226 : 0 : stmt->subname);
1227 : :
3042 peter_e@gmx.net 1228 :CBC 252 : sub = GetSubscription(subid, false);
1229 : :
45 akapila@postgresql.o 1230 :GNC 252 : retain_dead_tuples = sub->retaindeadtuples;
1231 : 252 : origin = sub->origin;
4 1232 : 252 : max_retention = sub->maxretention;
1233 : 252 : retention_active = sub->retentionactive;
1234 : :
1235 : : /*
1236 : : * Don't allow non-superuser modification of a subscription with
1237 : : * password_required=false.
1238 : : */
891 rhaas@postgresql.org 1239 [ + + - + ]:CBC 252 : if (!sub->passwordrequired && !superuser())
891 rhaas@postgresql.org 1240 [ # # ]:UBC 0 : ereport(ERROR,
1241 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1242 : : errmsg("password_required=false is superuser-only"),
1243 : : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1244 : :
1245 : : /* Lock the subscription so nobody else can do anything with it. */
2987 peter_e@gmx.net 1246 :CBC 252 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1247 : :
1248 : : /* Form a new tuple. */
3152 1249 : 252 : memset(values, 0, sizeof(values));
1250 : 252 : memset(nulls, false, sizeof(nulls));
1251 : 252 : memset(replaces, false, sizeof(replaces));
1252 : :
3089 1253 [ + + + + : 252 : switch (stmt->kind)
+ + + - ]
1254 : : {
1255 : 108 : case ALTER_SUBSCRIPTION_OPTIONS:
1256 : : {
1523 akapila@postgresql.o 1257 : 108 : supported_opts = (SUBOPT_SLOT_NAME |
1258 : : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1259 : : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
1260 : : SUBOPT_DISABLE_ON_ERR |
1261 : : SUBOPT_PASSWORD_REQUIRED |
1262 : : SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
1263 : : SUBOPT_RETAIN_DEAD_TUPLES |
1264 : : SUBOPT_MAX_RETENTION_DURATION |
1265 : : SUBOPT_ORIGIN);
1266 : :
1514 dean.a.rasheed@gmail 1267 : 108 : parse_subscription_options(pstate, stmt->options,
1268 : : supported_opts, &opts);
1269 : :
1523 akapila@postgresql.o 1270 [ + + ]: 99 : if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1271 : : {
1272 : : /*
1273 : : * The subscription must be disabled to allow slot_name as
1274 : : * 'none', otherwise, the apply worker will repeatedly try
1275 : : * to stream the data using that slot_name which neither
1276 : : * exists on the publisher nor the user will be allowed to
1277 : : * create it.
1278 : : */
1279 [ - + - - ]: 36 : if (sub->enabled && !opts.slot_name)
3042 peter_e@gmx.net 1280 [ # # ]:UBC 0 : ereport(ERROR,
1281 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1282 : : errmsg("cannot set %s for enabled subscription",
1283 : : "slot_name = NONE")));
1284 : :
1523 akapila@postgresql.o 1285 [ + + ]:CBC 36 : if (opts.slot_name)
3042 peter_e@gmx.net 1286 : 3 : values[Anum_pg_subscription_subslotname - 1] =
1523 akapila@postgresql.o 1287 : 3 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
1288 : : else
3042 peter_e@gmx.net 1289 : 33 : nulls[Anum_pg_subscription_subslotname - 1] = true;
3067 1290 : 36 : replaces[Anum_pg_subscription_subslotname - 1] = true;
1291 : : }
1292 : :
1523 akapila@postgresql.o 1293 [ + + ]: 99 : if (opts.synchronous_commit)
1294 : : {
3067 peter_e@gmx.net 1295 : 3 : values[Anum_pg_subscription_subsynccommit - 1] =
1523 akapila@postgresql.o 1296 : 3 : CStringGetTextDatum(opts.synchronous_commit);
3067 peter_e@gmx.net 1297 : 3 : replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1298 : : }
1299 : :
1523 akapila@postgresql.o 1300 [ + + ]: 99 : if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1301 : : {
1876 tgl@sss.pgh.pa.us 1302 : 9 : values[Anum_pg_subscription_subbinary - 1] =
1523 akapila@postgresql.o 1303 : 9 : BoolGetDatum(opts.binary);
1876 tgl@sss.pgh.pa.us 1304 : 9 : replaces[Anum_pg_subscription_subbinary - 1] = true;
1305 : : }
1306 : :
1523 akapila@postgresql.o 1307 [ + + ]: 99 : if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1308 : : {
1829 1309 : 15 : values[Anum_pg_subscription_substream - 1] =
971 1310 : 15 : CharGetDatum(opts.streaming);
1829 1311 : 15 : replaces[Anum_pg_subscription_substream - 1] = true;
1312 : : }
1313 : :
1272 1314 [ + + ]: 99 : if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1315 : : {
1316 : : values[Anum_pg_subscription_subdisableonerr - 1]
1317 : 3 : = BoolGetDatum(opts.disableonerr);
1318 : : replaces[Anum_pg_subscription_subdisableonerr - 1]
1319 : 3 : = true;
1320 : : }
1321 : :
891 rhaas@postgresql.org 1322 [ + + ]: 99 : if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1323 : : {
1324 : : /* Non-superuser may not disable password_required. */
1325 [ + + - + ]: 6 : if (!opts.passwordrequired && !superuser())
891 rhaas@postgresql.org 1326 [ # # ]:UBC 0 : ereport(ERROR,
1327 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1328 : : errmsg("password_required=false is superuser-only"),
1329 : : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1330 : :
1331 : : values[Anum_pg_subscription_subpasswordrequired - 1]
891 rhaas@postgresql.org 1332 :CBC 6 : = BoolGetDatum(opts.passwordrequired);
1333 : : replaces[Anum_pg_subscription_subpasswordrequired - 1]
1334 : 6 : = true;
1335 : : }
1336 : :
724 akapila@postgresql.o 1337 [ + + ]: 99 : if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1338 : : {
1339 : 7 : values[Anum_pg_subscription_subrunasowner - 1] =
1340 : 7 : BoolGetDatum(opts.runasowner);
1341 : 7 : replaces[Anum_pg_subscription_subrunasowner - 1] = true;
1342 : : }
1343 : :
409 1344 [ + + ]: 99 : if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1345 : : {
1346 : : /*
1347 : : * We need to update both the slot and the subscription
1348 : : * for the two_phase option. We can enable the two_phase
1349 : : * option for a slot only once the initial data
1350 : : * synchronization is done. This is to avoid missing some
1351 : : * data as explained in comments atop worker.c.
1352 : : */
1353 : 3 : update_two_phase = !opts.twophase;
1354 : :
1355 : 3 : CheckAlterSubOption(sub, "two_phase", update_two_phase,
1356 : : isTopLevel);
1357 : :
1358 : : /*
1359 : : * Modifying the two_phase slot option requires a slot
1360 : : * lookup by slot name, so changing the slot name at the
1361 : : * same time is not allowed.
1362 : : */
1363 [ + + ]: 3 : if (update_two_phase &&
1364 [ - + ]: 1 : IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
409 akapila@postgresql.o 1365 [ # # ]:UBC 0 : ereport(ERROR,
1366 : : (errcode(ERRCODE_SYNTAX_ERROR),
1367 : : errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1368 : :
1369 : : /*
1370 : : * Note that workers may still survive even if the
1371 : : * subscription has been disabled.
1372 : : *
1373 : : * Ensure workers have already been exited to avoid
1374 : : * getting prepared transactions while we are disabling
1375 : : * the two_phase option. Otherwise, the changes of an
1376 : : * already prepared transaction can be replicated again
1377 : : * along with its corresponding commit, leading to
1378 : : * duplicate data or errors.
1379 : : */
409 akapila@postgresql.o 1380 [ - + ]:CBC 3 : if (logicalrep_workers_find(subid, true, true))
585 akapila@postgresql.o 1381 [ # # ]:UBC 0 : ereport(ERROR,
1382 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1383 : : errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1384 : : errhint("Try again after some time.")));
1385 : :
1386 : : /*
1387 : : * two_phase cannot be disabled if there are any
1388 : : * uncommitted prepared transactions present otherwise it
1389 : : * can lead to duplicate data or errors as explained in
1390 : : * the comment above.
1391 : : */
409 akapila@postgresql.o 1392 [ + + ]:CBC 3 : if (update_two_phase &&
1393 [ + - ]: 1 : sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
1394 [ - + ]: 1 : LookupGXactBySubid(subid))
585 akapila@postgresql.o 1395 [ # # ]:UBC 0 : ereport(ERROR,
1396 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1397 : : errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1398 : : errhint("Resolve these transactions and try again.")));
1399 : :
1400 : : /* Change system catalog accordingly */
409 akapila@postgresql.o 1401 :CBC 3 : values[Anum_pg_subscription_subtwophasestate - 1] =
1402 [ + + ]: 3 : CharGetDatum(opts.twophase ?
1403 : : LOGICALREP_TWOPHASE_STATE_PENDING :
1404 : : LOGICALREP_TWOPHASE_STATE_DISABLED);
1405 : 3 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1406 : : }
1407 : :
1408 [ + + ]: 99 : if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1409 : : {
1410 : : /*
1411 : : * Similar to the two_phase case above, we need to update
1412 : : * the failover option for both the slot and the
1413 : : * subscription.
1414 : : */
1415 : 8 : update_failover = true;
1416 : :
1417 : 8 : CheckAlterSubOption(sub, "failover", update_failover,
1418 : : isTopLevel);
1419 : :
585 1420 : 4 : values[Anum_pg_subscription_subfailover - 1] =
1421 : 4 : BoolGetDatum(opts.failover);
1422 : 4 : replaces[Anum_pg_subscription_subfailover - 1] = true;
1423 : : }
1424 : :
45 akapila@postgresql.o 1425 [ + + ]:GNC 95 : if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1426 : : {
1427 : 2 : values[Anum_pg_subscription_subretaindeadtuples - 1] =
1428 : 2 : BoolGetDatum(opts.retaindeadtuples);
1429 : 2 : replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
1430 : :
1431 : : /*
1432 : : * Update the retention status only if there's a change in
1433 : : * the retain_dead_tuples option value.
1434 : : *
1435 : : * Automatically marking retention as active when
1436 : : * retain_dead_tuples is enabled may not always be ideal,
1437 : : * especially if retention was previously stopped and the
1438 : : * user toggles retain_dead_tuples without adjusting the
1439 : : * publisher workload. However, this behavior provides a
1440 : : * convenient way for users to manually refresh the
1441 : : * retention status. Since retention will be stopped again
1442 : : * unless the publisher workload is reduced, this approach
1443 : : * is acceptable for now.
1444 : : */
4 1445 [ + - ]: 2 : if (opts.retaindeadtuples != sub->retaindeadtuples)
1446 : : {
1447 : 2 : values[Anum_pg_subscription_subretentionactive - 1] =
1448 : 2 : BoolGetDatum(opts.retaindeadtuples);
1449 : 2 : replaces[Anum_pg_subscription_subretentionactive - 1] = true;
1450 : :
1451 : 2 : retention_active = opts.retaindeadtuples;
1452 : : }
1453 : :
45 1454 : 2 : CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1455 : :
1456 : : /*
1457 : : * Workers may continue running even after the
1458 : : * subscription has been disabled.
1459 : : *
1460 : : * To prevent race conditions (as described in
1461 : : * CheckAlterSubOption()), ensure that all worker
1462 : : * processes have already exited before proceeding.
1463 : : */
1464 [ - + ]: 1 : if (logicalrep_workers_find(subid, true, true))
45 akapila@postgresql.o 1465 [ # # ]:UNC 0 : ereport(ERROR,
1466 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1467 : : errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1468 : : errhint("Try again after some time.")));
1469 : :
1470 : : /*
1471 : : * Notify the launcher to manage the replication slot for
1472 : : * conflict detection. This ensures that replication slot
1473 : : * is efficiently handled (created, updated, or dropped)
1474 : : * in response to any configuration changes.
1475 : : */
45 akapila@postgresql.o 1476 :GNC 1 : ApplyLauncherWakeupAtCommit();
1477 : :
1478 : 1 : check_pub_rdt = opts.retaindeadtuples;
1479 : 1 : retain_dead_tuples = opts.retaindeadtuples;
1480 : : }
1481 : :
4 1482 [ + + ]: 94 : if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1483 : : {
1484 : 4 : values[Anum_pg_subscription_submaxretention - 1] =
1485 : 4 : Int32GetDatum(opts.maxretention);
1486 : 4 : replaces[Anum_pg_subscription_submaxretention - 1] = true;
1487 : :
1488 : 4 : max_retention = opts.maxretention;
1489 : : }
1490 : :
1491 : : /*
1492 : : * Ensure that system configuration parameters are set
1493 : : * appropriately to support retain_dead_tuples and
1494 : : * max_retention_duration.
1495 : : */
1496 [ + + ]: 94 : if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1497 [ + + ]: 93 : IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1498 : 5 : CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
1499 : : retain_dead_tuples,
1500 : : retention_active,
1501 : 5 : (max_retention > 0));
1502 : :
1143 akapila@postgresql.o 1503 [ + + ]:CBC 94 : if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1504 : : {
1505 : 5 : values[Anum_pg_subscription_suborigin - 1] =
1506 : 5 : CStringGetTextDatum(opts.origin);
1507 : 5 : replaces[Anum_pg_subscription_suborigin - 1] = true;
1508 : :
1509 : : /*
1510 : : * Check if changes from different origins may be received
1511 : : * from the publisher when the origin is changed to ANY
1512 : : * and retain_dead_tuples is enabled.
1513 : : */
45 akapila@postgresql.o 1514 [ + + ]:GNC 7 : check_pub_rdt = retain_dead_tuples &&
1515 [ + + ]: 2 : pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
1516 : :
1517 : 5 : origin = opts.origin;
1518 : : }
1519 : :
3089 peter_e@gmx.net 1520 :CBC 94 : update_tuple = true;
1521 : 94 : break;
1522 : : }
1523 : :
1524 : 50 : case ALTER_SUBSCRIPTION_ENABLED:
1525 : : {
1514 dean.a.rasheed@gmail 1526 : 50 : parse_subscription_options(pstate, stmt->options,
1527 : : SUBOPT_ENABLED, &opts);
1523 akapila@postgresql.o 1528 [ - + ]: 50 : Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1529 : :
1530 [ + + + - ]: 50 : if (!sub->slotname && opts.enabled)
3042 peter_e@gmx.net 1531 [ + - ]: 3 : ereport(ERROR,
1532 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1533 : : errmsg("cannot enable subscription that does not have a slot name")));
1534 : :
1535 : : /*
1536 : : * Check track_commit_timestamp only when enabling the
1537 : : * subscription in case it was disabled after creation. See
1538 : : * comments atop CheckSubDeadTupleRetention() for details.
1539 : : */
4 akapila@postgresql.o 1540 :GNC 47 : CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1541 : 47 : WARNING, sub->retaindeadtuples,
1542 : 47 : sub->retentionactive, false);
1543 : :
3089 peter_e@gmx.net 1544 :CBC 47 : values[Anum_pg_subscription_subenabled - 1] =
1523 akapila@postgresql.o 1545 : 47 : BoolGetDatum(opts.enabled);
3089 peter_e@gmx.net 1546 : 47 : replaces[Anum_pg_subscription_subenabled - 1] = true;
1547 : :
1523 akapila@postgresql.o 1548 [ + + ]: 47 : if (opts.enabled)
3056 peter_e@gmx.net 1549 : 26 : ApplyLauncherWakeupAtCommit();
1550 : :
3089 1551 : 47 : update_tuple = true;
1552 : :
1553 : : /*
1554 : : * The subscription might be initially created with
1555 : : * connect=false and retain_dead_tuples=true, meaning the
1556 : : * remote server's status may not be checked. Ensure this
1557 : : * check is conducted now.
1558 : : */
45 akapila@postgresql.o 1559 [ + + + + ]:GNC 47 : check_pub_rdt = sub->retaindeadtuples && opts.enabled;
3089 peter_e@gmx.net 1560 :CBC 47 : break;
1561 : : }
1562 : :
1563 : 10 : case ALTER_SUBSCRIPTION_CONNECTION:
1564 : : /* Load the library providing us libpq calls. */
3043 1565 : 10 : load_file("libpqwalreceiver", false);
1566 : : /* Check the connection info string. */
891 rhaas@postgresql.org 1567 [ + - + + ]: 10 : walrcv_check_conninfo(stmt->conninfo,
1568 : : sub->passwordrequired && !sub->ownersuperuser);
1569 : :
3089 peter_e@gmx.net 1570 : 7 : values[Anum_pg_subscription_subconninfo - 1] =
1571 : 7 : CStringGetTextDatum(stmt->conninfo);
1572 : 7 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1573 : 7 : update_tuple = true;
1574 : :
1575 : : /*
1576 : : * Since the remote server configuration might have changed,
1577 : : * perform a check to ensure it permits enabling
1578 : : * retain_dead_tuples.
1579 : : */
45 akapila@postgresql.o 1580 :GNC 7 : check_pub_rdt = sub->retaindeadtuples;
3089 peter_e@gmx.net 1581 :CBC 7 : break;
1582 : :
1614 peter@eisentraut.org 1583 : 16 : case ALTER_SUBSCRIPTION_SET_PUBLICATION:
1584 : : {
1523 akapila@postgresql.o 1585 : 16 : supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1514 dean.a.rasheed@gmail 1586 : 16 : parse_subscription_options(pstate, stmt->options,
1587 : : supported_opts, &opts);
1588 : :
3089 peter_e@gmx.net 1589 : 16 : values[Anum_pg_subscription_subpublications - 1] =
3034 bruce@momjian.us 1590 : 16 : publicationListToArray(stmt->publication);
3089 peter_e@gmx.net 1591 : 16 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1592 : :
1593 : 16 : update_tuple = true;
1594 : :
1595 : : /* Refresh if user asked us to. */
1523 akapila@postgresql.o 1596 [ + + ]: 16 : if (opts.refresh)
1597 : : {
3042 peter_e@gmx.net 1598 [ - + ]: 13 : if (!sub->enabled)
3042 peter_e@gmx.net 1599 [ # # ]:UBC 0 : ereport(ERROR,
1600 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1601 : : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1602 : : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1603 : :
1604 : : /*
1605 : : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1606 : : * not allowed.
1607 : : */
1515 akapila@postgresql.o 1608 [ - + - - ]:CBC 13 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1515 akapila@postgresql.o 1609 [ # # ]:UBC 0 : ereport(ERROR,
1610 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1611 : : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1612 : : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1613 : :
1667 akapila@postgresql.o 1614 :CBC 13 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1615 : :
1616 : : /* Make sure refresh sees the new list of publications. */
3089 peter_e@gmx.net 1617 : 7 : sub->publications = stmt->publication;
1618 : :
1255 akapila@postgresql.o 1619 : 7 : AlterSubscription_refresh(sub, opts.copy_data,
1620 : : stmt->publication);
1621 : : }
1622 : :
3089 peter_e@gmx.net 1623 : 10 : break;
1624 : : }
1625 : :
1614 peter@eisentraut.org 1626 : 27 : case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
1627 : : case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
1628 : : {
1629 : : List *publist;
1523 akapila@postgresql.o 1630 : 27 : bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1631 : :
1474 1632 : 27 : supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1514 dean.a.rasheed@gmail 1633 : 27 : parse_subscription_options(pstate, stmt->options,
1634 : : supported_opts, &opts);
1635 : :
1523 akapila@postgresql.o 1636 : 27 : publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1614 peter@eisentraut.org 1637 : 9 : values[Anum_pg_subscription_subpublications - 1] =
1638 : 9 : publicationListToArray(publist);
1639 : 9 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1640 : :
1641 : 9 : update_tuple = true;
1642 : :
1643 : : /* Refresh if user asked us to. */
1523 akapila@postgresql.o 1644 [ + + ]: 9 : if (opts.refresh)
1645 : : {
1646 : : /* We only need to validate user specified publications. */
1255 1647 [ + + ]: 3 : List *validate_publications = (isadd) ? stmt->publication : NULL;
1648 : :
1614 peter@eisentraut.org 1649 [ - + ]: 3 : if (!sub->enabled)
1614 peter@eisentraut.org 1650 [ # # # # ]:UBC 0 : ereport(ERROR,
1651 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1652 : : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1653 : : /* translator: %s is an SQL ALTER command */
1654 : : errhint("Use %s instead.",
1655 : : isadd ?
1656 : : "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1657 : : "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1658 : :
1659 : : /*
1660 : : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1661 : : * not allowed.
1662 : : */
1515 akapila@postgresql.o 1663 [ - + - - ]:CBC 3 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1515 akapila@postgresql.o 1664 [ # # # # ]:UBC 0 : ereport(ERROR,
1665 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1666 : : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1667 : : /* translator: %s is an SQL ALTER command */
1668 : : errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1669 : : isadd ?
1670 : : "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1671 : : "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1672 : :
1614 peter@eisentraut.org 1673 :CBC 3 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1674 : :
1675 : : /* Refresh the new list of publications. */
1474 akapila@postgresql.o 1676 : 3 : sub->publications = publist;
1677 : :
1255 1678 : 3 : AlterSubscription_refresh(sub, opts.copy_data,
1679 : : validate_publications);
1680 : : }
1681 : :
1614 peter@eisentraut.org 1682 : 9 : break;
1683 : : }
1684 : :
3089 peter_e@gmx.net 1685 : 29 : case ALTER_SUBSCRIPTION_REFRESH:
1686 : : {
3042 1687 [ + + ]: 29 : if (!sub->enabled)
1688 [ + - ]: 3 : ereport(ERROR,
1689 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1690 : : errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1691 : :
1514 dean.a.rasheed@gmail 1692 : 26 : parse_subscription_options(pstate, stmt->options,
1693 : : SUBOPT_COPY_DATA, &opts);
1694 : :
1695 : : /*
1696 : : * The subscription option "two_phase" requires that
1697 : : * replication has passed the initial table synchronization
1698 : : * phase before the two_phase becomes properly enabled.
1699 : : *
1700 : : * But, having reached this two-phase commit "enabled" state
1701 : : * we must not allow any subsequent table initialization to
1702 : : * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
1703 : : * when the user had requested two_phase = on mode.
1704 : : *
1705 : : * The exception to this restriction is when copy_data =
1706 : : * false, because when copy_data is false the tablesync will
1707 : : * start already in READY state and will exit directly without
1708 : : * doing anything.
1709 : : *
1710 : : * For more details see comments atop worker.c.
1711 : : */
1515 akapila@postgresql.o 1712 [ - + - - ]: 26 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1515 akapila@postgresql.o 1713 [ # # ]:UBC 0 : ereport(ERROR,
1714 : : (errcode(ERRCODE_SYNTAX_ERROR),
1715 : : errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1716 : : errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1717 : :
1667 akapila@postgresql.o 1718 :CBC 26 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
1719 : :
1255 1720 : 23 : AlterSubscription_refresh(sub, opts.copy_data, NULL);
1721 : :
3089 peter_e@gmx.net 1722 : 22 : break;
1723 : : }
1724 : :
1264 akapila@postgresql.o 1725 : 12 : case ALTER_SUBSCRIPTION_SKIP:
1726 : : {
1727 : 12 : parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1728 : :
1729 : : /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1730 [ - + ]: 9 : Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1731 : :
1732 : : /*
1733 : : * If the user sets subskiplsn, we do a sanity check to make
1734 : : * sure that the specified LSN is a probable value.
1735 : : */
1736 [ + + ]: 9 : if (!XLogRecPtrIsInvalid(opts.lsn))
1737 : : {
1738 : : RepOriginId originid;
1739 : : char originname[NAMEDATALEN];
1740 : : XLogRecPtr remote_lsn;
1741 : :
1061 1742 : 6 : ReplicationOriginNameForLogicalRep(subid, InvalidOid,
1743 : : originname, sizeof(originname));
1264 1744 : 6 : originid = replorigin_by_name(originname, false);
1745 : 6 : remote_lsn = replorigin_get_progress(originid, false);
1746 : :
1747 : : /* Check the given LSN is at least a future LSN */
1748 [ + + - + ]: 6 : if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
1264 akapila@postgresql.o 1749 [ # # ]:UBC 0 : ereport(ERROR,
1750 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1751 : : errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
1752 : : LSN_FORMAT_ARGS(opts.lsn),
1753 : : LSN_FORMAT_ARGS(remote_lsn))));
1754 : : }
1755 : :
1264 akapila@postgresql.o 1756 :CBC 9 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1757 : 9 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1758 : :
1759 : 9 : update_tuple = true;
1760 : 9 : break;
1761 : : }
1762 : :
3089 peter_e@gmx.net 1763 :UBC 0 : default:
1764 [ # # ]: 0 : elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1765 : : stmt->kind);
1766 : : }
1767 : :
1768 : : /* Update the catalog if needed. */
3089 peter_e@gmx.net 1769 [ + + ]:CBC 198 : if (update_tuple)
1770 : : {
1771 : 176 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1772 : : replaces);
1773 : :
1774 : 176 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1775 : :
1776 : 176 : heap_freetuple(tup);
1777 : : }
1778 : :
1779 : : /*
1780 : : * Try to acquire the connection necessary either for modifying the slot
1781 : : * or for checking if the remote server permits enabling
1782 : : * retain_dead_tuples.
1783 : : *
1784 : : * This has to be at the end because otherwise, if there is an error while
1785 : : * doing the database operations, we won't be able to roll back the altered
1786 : : * slot.
1787 : : */
45 akapila@postgresql.o 1788 [ + + + + :GNC 198 : if (update_failover || update_two_phase || check_pub_rdt)
+ + ]
1789 : : {
1790 : : bool must_use_password;
1791 : : char *err;
1792 : : WalReceiverConn *wrconn;
1793 : :
1794 : : /* Load the library providing us libpq calls. */
585 akapila@postgresql.o 1795 :CBC 12 : load_file("libpqwalreceiver", false);
1796 : :
1797 : : /*
1798 : : * Try to connect to the publisher, using the new connection string if
1799 : : * available.
1800 : : */
1801 [ + - - + ]: 12 : must_use_password = sub->passwordrequired && !sub->ownersuperuser;
45 akapila@postgresql.o 1802 [ - + ]:GNC 12 : wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
1803 : : true, true, must_use_password, sub->name,
1804 : : &err);
585 akapila@postgresql.o 1805 [ - + ]:CBC 12 : if (!wrconn)
585 akapila@postgresql.o 1806 [ # # ]:UBC 0 : ereport(ERROR,
1807 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1808 : : errmsg("subscription \"%s\" could not connect to the publisher: %s",
1809 : : sub->name, err)));
1810 : :
585 akapila@postgresql.o 1811 [ + - ]:CBC 12 : PG_TRY();
1812 : : {
45 akapila@postgresql.o 1813 [ + + ]:GNC 12 : if (retain_dead_tuples)
1814 : 8 : check_pub_dead_tuple_retention(wrconn);
1815 : :
1816 : 12 : check_publications_origin(wrconn, sub->publications, false,
1817 : : retain_dead_tuples, origin, NULL, 0,
1818 : : sub->name);
1819 : :
1820 [ + + + + ]: 12 : if (update_failover || update_two_phase)
1821 [ + + + + ]: 5 : walrcv_alter_slot(wrconn, sub->slotname,
1822 : : update_failover ? &opts.failover : NULL,
1823 : : update_two_phase ? &opts.twophase : NULL);
1824 : : }
585 akapila@postgresql.o 1825 :UBC 0 : PG_FINALLY();
1826 : : {
585 akapila@postgresql.o 1827 :CBC 12 : walrcv_disconnect(wrconn);
1828 : : }
1829 [ - + ]: 12 : PG_END_TRY();
1830 : : }
1831 : :
2420 andres@anarazel.de 1832 : 198 : table_close(rel, RowExclusiveLock);
1833 : :
3089 peter_e@gmx.net 1834 : 198 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1835 : :
3152 1836 [ - + ]: 198 : InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
1837 : :
1838 : : /* Wake up related replication workers to handle this change quickly. */
974 tgl@sss.pgh.pa.us 1839 : 198 : LogicalRepWorkersWakeupAtCommit(subid);
1840 : :
3152 peter_e@gmx.net 1841 : 198 : return myself;
1842 : : }
1843 : :
1844 : : /*
1845 : : * Drop a subscription
: : *
: : * Looks up the subscription named stmt->subname in the current database
: : * (MyDatabaseId). If it does not exist, an ERROR is raised, or only a
: : * NOTICE when stmt->missing_ok is set, in which case we return without
: : * doing anything. The current user must own the subscription.
: : *
: : * All local cleanup is performed first: the pg_subscription tuple is
: : * deleted, the subscription's workers are stopped, replication origins,
: : * shared dependency records and relation synchronization states are
: : * removed, and the cumulative stats system is told about the drop. The
: : * connection to the publisher, needed to drop the remote slot(s), is made
: : * last, and only if the subscription has a slot name or relation states
: : * were returned by GetSubscriptionRelations().
: : *
: : * isTopLevel is forwarded to PreventInTransactionBlock(), which is called
: : * only when the subscription has an associated slot name.
1846 : : */
1847 : : void
3109 1848 : 122 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
1849 : : {
1850 : : Relation rel;
1851 : : ObjectAddress myself;
1852 : : HeapTuple tup;
1853 : : Oid subid;
1854 : : Oid subowner;
1855 : : Datum datum;
1856 : : bool isnull;
1857 : : char *subname;
1858 : : char *conninfo;
1859 : : char *slotname;
1860 : : List *subworkers;
1861 : : ListCell *lc;
1862 : : char originname[NAMEDATALEN];
3152 1863 : 122 : char *err = NULL;
1864 : : WalReceiverConn *wrconn;
1865 : : Form_pg_subscription form;
1866 : : List *rstates;
1867 : : bool must_use_password;
1868 : :
1869 : : /*
1870 : : * The launcher may concurrently start a new worker for this subscription.
1871 : : * During initialization, the worker checks for subscription validity and
1872 : : * exits if the subscription has already been dropped. See
1873 : : * InitializeLogRepWorker.
1874 : : */
18 akapila@postgresql.o 1875 : 122 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1876 : :
29 peter@eisentraut.org 1877 :GNC 122 : tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
3152 peter_e@gmx.net 1878 :CBC 122 : CStringGetDatum(stmt->subname));
1879 : :
1880 [ + + ]: 122 : if (!HeapTupleIsValid(tup))
1881 : : {
2420 andres@anarazel.de 1882 : 6 : table_close(rel, NoLock);
1883 : :
3152 peter_e@gmx.net 1884 [ + + ]: 6 : if (!stmt->missing_ok)
1885 [ + - ]: 3 : ereport(ERROR,
1886 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1887 : : errmsg("subscription \"%s\" does not exist",
1888 : : stmt->subname)));
1889 : : else
1890 [ + - ]: 3 : ereport(NOTICE,
1891 : : (errmsg("subscription \"%s\" does not exist, skipping",
1892 : : stmt->subname)));
1893 : :
1894 : 46 : return;
1895 : : }
1896 : :
2482 andres@anarazel.de 1897 : 116 : form = (Form_pg_subscription) GETSTRUCT(tup);
1898 : 116 : subid = form->oid;
891 rhaas@postgresql.org 1899 : 116 : subowner = form->subowner;
1900 [ + + + + ]: 116 : must_use_password = !superuser_arg(subowner) && form->subpasswordrequired; /* only non-superuser owners are subject to subpasswordrequired */
1901 : :
1902 : : /* must be owner */
1028 peter@eisentraut.org 1903 [ - + ]: 116 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
2835 peter_e@gmx.net 1904 :UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
3152 1905 : 0 : stmt->subname);
1906 : :
1907 : : /* DROP hook for the subscription being removed */
3152 peter_e@gmx.net 1908 [ - + ]:CBC 116 : InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
1909 : :
1910 : : /*
1911 : : * Lock the subscription so nobody else can do anything with it (including
1912 : : * the replication workers).
1913 : : */
1914 : 116 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1915 : :
1916 : : /* Get subname */
896 dgustafsson@postgres 1917 : 116 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1918 : : Anum_pg_subscription_subname);
3152 peter_e@gmx.net 1919 : 116 : subname = pstrdup(NameStr(*DatumGetName(datum)));
1920 : :
1921 : : /* Get conninfo */
896 dgustafsson@postgres 1922 : 116 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1923 : : Anum_pg_subscription_subconninfo);
3067 peter_e@gmx.net 1924 : 116 : conninfo = TextDatumGetCString(datum);
1925 : :
1926 : : /* Get slotname; the column may be NULL, in which case slotname is NULL */
3152 1927 : 116 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
1928 : : Anum_pg_subscription_subslotname, &isnull);
3042 1929 [ + + ]: 116 : if (!isnull)
1930 : 73 : slotname = pstrdup(NameStr(*DatumGetName(datum)));
1931 : : else
1932 : 43 : slotname = NULL;
1933 : :
1934 : : /*
1935 : : * Since dropping a replication slot is not transactional, the replication
1936 : : * slot stays dropped even if the transaction rolls back. So we cannot
1937 : : * run DROP SUBSCRIPTION inside a transaction block if dropping the
1938 : : * replication slot. Also, in this case, we report a message for dropping
1939 : : * the subscription to the cumulative stats system.
1940 : : *
1941 : : * XXX The command name should really be something like "DROP SUBSCRIPTION
1942 : : * of a subscription that is associated with a replication slot", but we
1943 : : * don't have the proper facilities for that.
1944 : : */
1945 [ + + ]: 116 : if (slotname)
2759 1946 : 73 : PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
1947 : :
3152 1948 : 113 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1949 : 113 : EventTriggerSQLDropAddObject(&myself, true, true);
1950 : :
1951 : : /* Remove the tuple from catalog. */
3139 tgl@sss.pgh.pa.us 1952 : 113 : CatalogTupleDelete(rel, &tup->t_self);
1953 : :
3152 peter_e@gmx.net 1954 : 113 : ReleaseSysCache(tup);
1955 : :
1956 : : /*
1957 : : * Stop all the subscription workers immediately.
1958 : : *
1959 : : * This is necessary if we are dropping the replication slot, so that the
1960 : : * slot becomes accessible.
1961 : : *
1962 : : * It is also necessary if the subscription is disabled and was disabled
1963 : : * in the same transaction. Then the workers haven't seen the disabling
1964 : : * yet and will still be running, leading to hangs later when we want to
1965 : : * drop the replication origin. If the subscription was disabled before
1966 : : * this transaction, then there shouldn't be any workers left, so this
1967 : : * won't make a difference.
1968 : : *
1969 : : * New workers won't be started because we hold an exclusive lock on the
1970 : : * subscription till the end of the transaction.
1971 : : */
409 akapila@postgresql.o 1972 : 113 : subworkers = logicalrep_workers_find(subid, false, true);
2945 tgl@sss.pgh.pa.us 1973 [ + + + + : 187 : foreach(lc, subworkers)
+ + ]
1974 : : {
2955 peter_e@gmx.net 1975 : 74 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
1976 : :
2911 1977 : 74 : logicalrep_worker_stop(w->subid, w->relid);
1978 : : }
2955 1979 : 113 : list_free(subworkers);
1980 : :
1981 : : /*
1982 : : * Remove the no-longer-useful entry in the launcher's table of apply
1983 : : * worker start times.
1984 : : *
1985 : : * If this transaction rolls back, the launcher might restart a failed
1986 : : * apply worker before wal_retrieve_retry_interval milliseconds have
1987 : : * elapsed, but that's pretty harmless.
1988 : : */
958 tgl@sss.pgh.pa.us 1989 : 113 : ApplyLauncherForgetWorkerStartTime(subid);
1990 : :
1991 : : /*
1992 : : * Cleanup of tablesync replication origins.
1993 : : *
1994 : : * Any READY-state relations would already have dealt with clean-ups.
1995 : : *
1996 : : * Note that the state can't change because we have already stopped both
1997 : : * the apply and tablesync workers and they can't restart because of
1998 : : * exclusive lock on the subscription.
: : *
: : * The list fetched here is kept in rstates and reused further below when
: : * dropping the tablesync slots on the publisher.
1999 : : */
1137 michael@paquier.xyz 2000 : 113 : rstates = GetSubscriptionRelations(subid, true);
1667 akapila@postgresql.o 2001 [ + + + + : 118 : foreach(lc, rstates)
+ + ]
2002 : : {
2003 : 5 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2004 : 5 : Oid relid = rstate->relid;
2005 : :
2006 : : /* Only cleanup resources of tablesync workers */
2007 [ - + ]: 5 : if (!OidIsValid(relid))
1667 akapila@postgresql.o 2008 :UBC 0 : continue;
2009 : :
2010 : : /*
2011 : : * Drop the tablesync's origin tracking if exists.
2012 : : *
2013 : : * It is possible that the origin is not yet created for tablesync
2014 : : * worker so passing missing_ok = true. This can happen for the states
2015 : : * before SUBREL_STATE_FINISHEDCOPY.
2016 : : */
1061 akapila@postgresql.o 2017 :CBC 5 : ReplicationOriginNameForLogicalRep(subid, relid, originname,
2018 : : sizeof(originname));
1090 2019 : 5 : replorigin_drop_by_name(originname, true, false);
2020 : : }
2021 : :
2022 : : /* Clean up dependencies */
3151 alvherre@alvh.no-ip. 2023 : 113 : deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
2024 : :
2025 : : /* Remove any associated relation synchronization states. */
3089 peter_e@gmx.net 2026 : 113 : RemoveSubscriptionRel(subid, InvalidOid);
2027 : :
2028 : : /* Remove the origin tracking if exists. */
1061 akapila@postgresql.o 2029 : 113 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
1669 2030 : 113 : replorigin_drop_by_name(originname, true, false);
2031 : :
2032 : : /*
2033 : : * Tell the cumulative stats system that the subscription is getting
2034 : : * dropped.
2035 : : */
794 msawada@postgresql.o 2036 : 113 : pgstat_drop_subscription(subid);
2037 : :
2038 : : /*
2039 : : * If there is no slot associated with the subscription and no relation
2040 : : * states were collected above, there are no remote slots left to drop,
: : * so we can finish here without connecting to the publisher.
2041 : : */
1667 akapila@postgresql.o 2042 [ + + + - ]: 113 : if (!slotname && rstates == NIL)
2043 : : {
2420 andres@anarazel.de 2044 : 43 : table_close(rel, NoLock);
3152 peter_e@gmx.net 2045 : 43 : return;
2046 : : }
2047 : :
2048 : : /*
2049 : : * Try to acquire the connection necessary for dropping slots.
2050 : : *
2051 : : * Note: If the slotname is NONE/NULL then we allow the command to finish
2052 : : * and users need to manually cleanup the apply and tablesync worker slots
2053 : : * later.
2054 : : *
2055 : : * This has to be at the end because otherwise, if there is an error while
2056 : : * doing the database operations, we won't be able to roll back the
2057 : : * dropped slot.
2058 : : */
2059 : 70 : load_file("libpqwalreceiver", false);
2060 : :
579 akapila@postgresql.o 2061 : 70 : wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2062 : : subname, &err);
3152 peter_e@gmx.net 2063 [ - + ]: 70 : if (wrconn == NULL)
2064 : : {
1667 akapila@postgresql.o 2065 [ # # ]:UBC 0 : if (!slotname)
2066 : : {
2067 : : /* No slot name, so the failed connection is not reported; be tidy */
2068 : 0 : list_free(rstates);
2069 : 0 : table_close(rel, NoLock);
2070 : 0 : return;
2071 : : }
2072 : : else
2073 : : {
2074 : 0 : ReportSlotConnectionError(rstates, subid, slotname, err);
2075 : : }
2076 : : }
2077 : :
1667 akapila@postgresql.o 2078 [ + + ]:CBC 70 : PG_TRY();
2079 : : {
2080 [ + + + + : 75 : foreach(lc, rstates)
+ + ]
2081 : : {
2082 : 5 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2083 : 5 : Oid relid = rstate->relid;
2084 : :
2085 : : /* Only cleanup resources of tablesync workers */
2086 [ - + ]: 5 : if (!OidIsValid(relid))
1667 akapila@postgresql.o 2087 :UBC 0 : continue;
2088 : :
2089 : : /*
2090 : : * Drop the tablesync slots associated with removed tables.
2091 : : *
2092 : : * For SYNCDONE/READY states, the tablesync slot is known to have
2093 : : * already been dropped by the tablesync worker.
2094 : : *
2095 : : * For other states, there is no certainty, maybe the slot does
2096 : : * not exist yet. Also, if we fail after removing some of the
2097 : : * slots, next time, it will again try to drop already dropped
2098 : : * slots and fail. For these reasons, we allow missing_ok = true
2099 : : * for the drop.
2100 : : */
1667 akapila@postgresql.o 2101 [ + - ]:CBC 5 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2102 : : {
2103 : 5 : char syncslotname[NAMEDATALEN] = {0};
2104 : :
1664 2105 : 5 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2106 : : sizeof(syncslotname));
1667 2107 : 5 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
2108 : : }
2109 : : }
2110 : :
2111 : 70 : list_free(rstates);
2112 : :
2113 : : /*
2114 : : * If there is a slot associated with the subscription, then drop the
2115 : : * replication slot at the publisher.
: : *
: : * Unlike the tablesync slots above, this drop passes missing_ok =
: : * false.
2116 : : */
2117 [ + - ]: 70 : if (slotname)
2118 : 70 : ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2119 : : }
2120 : 1 : PG_FINALLY();
2121 : : {
2122 : 70 : walrcv_disconnect(wrconn);
2123 : : }
2124 [ + + ]: 70 : PG_END_TRY();
2125 : :
2126 : 69 : table_close(rel, NoLock);
2127 : : }
2128 : :
2129 : : /*
2130 : : * Drop the replication slot at the publisher node using the replication
2131 : : * connection.
2132 : : *
: : * wrconn - connection to the publisher; asserted to be non-NULL.
: : *
: : * slotname - name of the slot to drop. It is passed through
: : * quote_identifier() when building the "DROP_REPLICATION_SLOT ... WAIT"
: : * command sent over wrconn.
: : *
2133 : : * missing_ok - if true then only issue a LOG message if the slot doesn't
2134 : : * exist.
: : *
: : * A NOTICE is emitted when the command succeeds. Any failure other than
: : * the missing_ok case above is raised as an ERROR with
: : * ERRCODE_CONNECTION_FAILURE. The command buffer is freed in the
: : * PG_FINALLY block.
2135 : : */
2136 : : void
2137 : 265 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
2138 : : {
2139 : : StringInfoData cmd;
2140 : :
2141 [ - + ]: 265 : Assert(wrconn);
2142 : :
2143 : 265 : load_file("libpqwalreceiver", false);
2144 : :
2145 : 265 : initStringInfo(&cmd);
2146 : 265 : appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2147 : :
3104 fujii@postgresql.org 2148 [ + + ]: 265 : PG_TRY();
2149 : : {
2150 : : WalRcvExecResult *res;
2151 : :
3089 peter_e@gmx.net 2152 : 265 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2153 : :
1667 akapila@postgresql.o 2154 [ + + ]: 265 : if (res->status == WALRCV_OK_COMMAND)
2155 : : {
2156 : : /* NOTICE. Success. */
2157 [ + + ]: 262 : ereport(NOTICE,
2158 : : (errmsg("dropped replication slot \"%s\" on publisher",
2159 : : slotname)));
2160 : : }
2161 [ + - + + ]: 3 : else if (res->status == WALRCV_ERROR &&
2162 : 2 : missing_ok &&
2163 [ + - ]: 2 : res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
2164 : : {
2165 : : /* LOG. Error is "undefined object", but missing_ok = true. */
2166 [ + - ]: 2 : ereport(LOG,
2167 : : (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2168 : : slotname, res->err)));
2169 : : }
2170 : : else
2171 : : {
2172 : : /* ERROR. Any other status, or missing_ok = false. */
2173 [ + - ]: 1 : ereport(ERROR,
2174 : : (errcode(ERRCODE_CONNECTION_FAILURE),
2175 : : errmsg("could not drop replication slot \"%s\" on publisher: %s",
2176 : : slotname, res->err)));
2177 : : }
2178 : :
3089 peter_e@gmx.net 2179 : 264 : walrcv_clear_result(res);
2180 : : }
2136 peter@eisentraut.org 2181 : 1 : PG_FINALLY();
2182 : : {
1667 akapila@postgresql.o 2183 : 265 : pfree(cmd.data);
2184 : : }
3104 fujii@postgresql.org 2185 [ + + ]: 265 : PG_END_TRY();
3152 peter_e@gmx.net 2186 : 264 : }
2187 : :
2188 : : /*
2189 : : * Internal workhorse for changing a subscription owner
2190 : : */
2191 : : static void
2192 : 9 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2193 : : {
2194 : : Form_pg_subscription form;
2195 : : AclResult aclresult;
2196 : :
2197 : 9 : form = (Form_pg_subscription) GETSTRUCT(tup);
2198 : :
2199 [ + + ]: 9 : if (form->subowner == newOwnerId)
2200 : 2 : return;
2201 : :
1028 peter@eisentraut.org 2202 [ - + ]: 7 : if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
2835 peter_e@gmx.net 2203 :UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
3152 2204 : 0 : NameStr(form->subname));
2205 : :
2206 : : /*
2207 : : * Don't allow non-superuser modification of a subscription with
2208 : : * password_required=false.
2209 : : */
891 rhaas@postgresql.org 2210 [ - + - - ]:CBC 7 : if (!form->subpasswordrequired && !superuser())
3152 peter_e@gmx.net 2211 [ # # ]:UBC 0 : ereport(ERROR,
2212 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2213 : : errmsg("password_required=false is superuser-only"),
2214 : : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2215 : :
2216 : : /* Must be able to become new owner */
891 rhaas@postgresql.org 2217 :CBC 7 : check_can_set_role(GetUserId(), newOwnerId);
2218 : :
2219 : : /*
2220 : : * current owner must have CREATE on database
2221 : : *
2222 : : * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2223 : : * other object types behave differently (e.g. you can't give a table to a
2224 : : * user who lacks CREATE privileges on a schema).
2225 : : */
2226 : 4 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
2227 : : GetUserId(), ACL_CREATE);
2228 [ - + ]: 4 : if (aclresult != ACLCHECK_OK)
891 rhaas@postgresql.org 2229 :UBC 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2230 : 0 : get_database_name(MyDatabaseId));
2231 : :
3152 peter_e@gmx.net 2232 :CBC 4 : form->subowner = newOwnerId;
3140 alvherre@alvh.no-ip. 2233 : 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2234 : :
2235 : : /* Update owner dependency reference */
3152 peter_e@gmx.net 2236 : 4 : changeDependencyOnOwner(SubscriptionRelationId,
2237 : : form->oid,
2238 : : newOwnerId);
2239 : :
2240 [ - + ]: 4 : InvokeObjectPostAlterHook(SubscriptionRelationId,
2241 : : form->oid, 0);
2242 : :
2243 : : /* Wake up related background processes to handle this change quickly. */
1338 jdavis@postgresql.or 2244 : 4 : ApplyLauncherWakeupAtCommit();
974 tgl@sss.pgh.pa.us 2245 : 4 : LogicalRepWorkersWakeupAtCommit(form->oid);
2246 : : }
2247 : :
2248 : : /*
2249 : : * Change subscription owner -- by name
2250 : : */
2251 : : ObjectAddress
3152 peter_e@gmx.net 2252 : 9 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
2253 : : {
2254 : : Oid subid;
2255 : : HeapTuple tup;
2256 : : Relation rel;
2257 : : ObjectAddress address;
2258 : : Form_pg_subscription form;
2259 : :
2420 andres@anarazel.de 2260 : 9 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2261 : :
29 peter@eisentraut.org 2262 :GNC 9 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
2263 : : CStringGetDatum(name));
2264 : :
3152 peter_e@gmx.net 2265 [ - + ]:CBC 9 : if (!HeapTupleIsValid(tup))
3152 peter_e@gmx.net 2266 [ # # ]:UBC 0 : ereport(ERROR,
2267 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2268 : : errmsg("subscription \"%s\" does not exist", name)));
2269 : :
2482 andres@anarazel.de 2270 :CBC 9 : form = (Form_pg_subscription) GETSTRUCT(tup);
2271 : 9 : subid = form->oid;
2272 : :
3152 peter_e@gmx.net 2273 : 9 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2274 : :
2275 : 6 : ObjectAddressSet(address, SubscriptionRelationId, subid);
2276 : :
2277 : 6 : heap_freetuple(tup);
2278 : :
2420 andres@anarazel.de 2279 : 6 : table_close(rel, RowExclusiveLock);
2280 : :
3152 peter_e@gmx.net 2281 : 6 : return address;
2282 : : }
2283 : :
2284 : : /*
2285 : : * Change subscription owner -- by OID
2286 : : */
2287 : : void
3152 peter_e@gmx.net 2288 :UBC 0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
2289 : : {
2290 : : HeapTuple tup;
2291 : : Relation rel;
2292 : :
2420 andres@anarazel.de 2293 : 0 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2294 : :
3152 peter_e@gmx.net 2295 : 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
2296 : :
2297 [ # # ]: 0 : if (!HeapTupleIsValid(tup))
2298 [ # # ]: 0 : ereport(ERROR,
2299 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2300 : : errmsg("subscription with OID %u does not exist", subid)));
2301 : :
2302 : 0 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2303 : :
2304 : 0 : heap_freetuple(tup);
2305 : :
2420 andres@anarazel.de 2306 : 0 : table_close(rel, RowExclusiveLock);
3152 peter_e@gmx.net 2307 : 0 : }
2308 : :
2309 : : /*
2310 : : * Check and log a warning if the publisher has subscribed to the same table,
2311 : : * its partition ancestors (if it's a partition), or its partition children (if
2312 : : * it's a partitioned table), from some other publishers. This check is
2313 : : * required in the following scenarios:
2314 : : *
2315 : : * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
2316 : : * with "copy_data = true" and "origin = none":
2317 : : * - Warn the user that data with an origin might have been copied.
2318 : : * - This check is skipped for tables already added, as incremental sync via
2319 : : * WAL allows origin tracking. The list of such tables is in
2320 : : * subrel_local_oids.
2321 : : *
2322 : : * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
2323 : : * with "retain_dead_tuples = true" and "origin = any", and for ALTER
2324 : : * SUBSCRIPTION statements that modify retain_dead_tuples or origin, or
2325 : : * when the publisher's status changes (e.g., due to a connection string
2326 : : * update):
2327 : : * - Warn the user that only conflict detection info for local changes on
2328 : : * the publisher is retained. Data from other origins may lack sufficient
2329 : : * details for reliable conflict detection.
2330 : : * - See comments atop worker.c for more details.
2331 : : */
2332 : : static void
1094 akapila@postgresql.o 2333 :CBC 161 : check_publications_origin(WalReceiverConn *wrconn, List *publications,
2334 : : bool copydata, bool retain_dead_tuples,
2335 : : char *origin, Oid *subrel_local_oids,
2336 : : int subrel_count, char *subname)
2337 : : {
2338 : : WalRcvExecResult *res;
2339 : : StringInfoData cmd;
2340 : : TupleTableSlot *slot;
2341 : 161 : Oid tableRow[1] = {TEXTOID};
2342 : 161 : List *publist = NIL;
2343 : : int i;
2344 : : bool check_rdt;
2345 : : bool check_table_sync;
45 akapila@postgresql.o 2346 [ + - + + ]:GNC 322 : bool origin_none = origin &&
2347 : 161 : pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
2348 : :
2349 : : /*
2350 : : * Enable retain_dead_tuples checks only when origin is set to 'any',
2351 : : * since with origin='none' only local changes are replicated to the
2352 : : * subscriber.
2353 : : */
2354 [ + + + + ]: 161 : check_rdt = retain_dead_tuples && !origin_none;
2355 : :
2356 : : /*
2357 : : * Enable table synchronization checks only when origin is 'none', to
2358 : : * ensure that data from other origins is not inadvertently copied.
2359 : : */
2360 [ + + + + ]: 161 : check_table_sync = copydata && origin_none;
2361 : :
2362 : : /* retain_dead_tuples and table sync checks occur separately */
2363 [ + + - + ]: 161 : Assert(!(check_rdt && check_table_sync));
2364 : :
2365 : : /* Return if no checks are required */
2366 [ + + + + ]: 161 : if (!check_rdt && !check_table_sync)
1094 akapila@postgresql.o 2367 :CBC 148 : return;
2368 : :
2369 : 13 : initStringInfo(&cmd);
2370 : 13 : appendStringInfoString(&cmd,
2371 : : "SELECT DISTINCT P.pubname AS pubname\n"
2372 : : "FROM pg_publication P,\n"
2373 : : " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2374 : : " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2375 : : " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2376 : : " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2377 : : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2378 : : "WHERE C.oid = GPT.relid AND P.pubname IN (");
316 michael@paquier.xyz 2379 : 13 : GetPublicationsStr(publications, &cmd, true);
1094 akapila@postgresql.o 2380 : 13 : appendStringInfoString(&cmd, ")\n");
2381 : :
2382 : : /*
2383 : : * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
2384 : : * the list of relation oids that are already present on the subscriber.
2385 : : * This check should be skipped for these tables if checking for table
2386 : : * sync scenario. However, when handling the retain_dead_tuples scenario,
2387 : : * ensure all tables are checked, as some existing tables may now include
2388 : : * changes from other origins due to newly created subscriptions on the
2389 : : * publisher.
2390 : : */
45 akapila@postgresql.o 2391 [ + + ]:GNC 13 : if (check_table_sync)
2392 : : {
2393 [ + + ]: 12 : for (i = 0; i < subrel_count; i++)
2394 : : {
2395 : 3 : Oid relid = subrel_local_oids[i];
2396 : 3 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
2397 : 3 : char *tablename = get_rel_name(relid);
2398 : :
2399 : 3 : appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2400 : : schemaname, tablename);
2401 : : }
2402 : : }
2403 : :
1094 akapila@postgresql.o 2404 :CBC 13 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2405 : 13 : pfree(cmd.data);
2406 : :
2407 [ - + ]: 13 : if (res->status != WALRCV_OK_TUPLES)
1094 akapila@postgresql.o 2408 [ # # ]:UBC 0 : ereport(ERROR,
2409 : : (errcode(ERRCODE_CONNECTION_FAILURE),
2410 : : errmsg("could not receive list of replicated tables from the publisher: %s",
2411 : : res->err)));
2412 : :
2413 : : /* Process tables. */
1094 akapila@postgresql.o 2414 :CBC 13 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2415 [ + + ]: 18 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2416 : : {
2417 : : char *pubname;
2418 : : bool isnull;
2419 : :
2420 : 5 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2421 [ - + ]: 5 : Assert(!isnull);
2422 : :
2423 : 5 : ExecClearTuple(slot);
2424 : 5 : publist = list_append_unique(publist, makeString(pubname));
2425 : : }
2426 : :
2427 : : /*
2428 : : * Log a warning if the publisher has subscribed to the same table from
2429 : : * some other publisher. We cannot know the origin of data during the
2430 : : * initial sync. Data origins can be found only from the WAL by looking at
2431 : : * the origin id.
2432 : : *
2433 : : * XXX: For simplicity, we don't check whether the table has any data or
2434 : : * not. If the table doesn't have any data then we don't need to
2435 : : * distinguish between data having origin and data not having origin so we
2436 : : * can avoid logging a warning for table sync scenario.
2437 : : */
2438 [ + + ]: 13 : if (publist)
2439 : : {
2440 : 5 : StringInfo pubnames = makeStringInfo();
45 akapila@postgresql.o 2441 :GNC 5 : StringInfo err_msg = makeStringInfo();
2442 : 5 : StringInfo err_hint = makeStringInfo();
2443 : :
2444 : : /* Prepare the list of publication(s) for warning message. */
316 michael@paquier.xyz 2445 :CBC 5 : GetPublicationsStr(publist, pubnames, false);
2446 : :
45 akapila@postgresql.o 2447 [ + + ]:GNC 5 : if (check_table_sync)
2448 : : {
2449 : 4 : appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
2450 : : subname);
2451 : 4 : appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins."));
2452 : : }
2453 : : else
2454 : : {
2455 : 1 : appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"),
2456 : : subname);
2457 : 1 : appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples."));
2458 : : }
2459 : :
1094 akapila@postgresql.o 2460 [ + - ]:CBC 5 : ereport(WARNING,
2461 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2462 : : errmsg_internal("%s", err_msg->data),
2463 : : errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2464 : : "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2465 : : list_length(publist), pubnames->data),
2466 : : errhint_internal("%s", err_hint->data));
2467 : : }
2468 : :
2469 : 13 : ExecDropSingleTupleTableSlot(slot);
2470 : :
2471 : 13 : walrcv_clear_result(res);
2472 : : }
2473 : :
2474 : : /*
2475 : : * Determine whether the retain_dead_tuples can be enabled based on the
2476 : : * publisher's status.
2477 : : *
2478 : : * This option is disallowed if the publisher is running a version earlier
2479 : : * than the PG19, or if the publisher is in recovery (i.e., it is a standby
2480 : : * server).
2481 : : *
2482 : : * See comments atop worker.c for a detailed explanation.
2483 : : */
2484 : : static void
45 akapila@postgresql.o 2485 :GNC 11 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
2486 : : {
2487 : : WalRcvExecResult *res;
2488 : 11 : Oid RecoveryRow[1] = {BOOLOID};
2489 : : TupleTableSlot *slot;
2490 : : bool isnull;
2491 : : bool remote_in_recovery;
2492 : :
2493 [ - + ]: 11 : if (walrcv_server_version(wrconn) < 19000)
45 akapila@postgresql.o 2494 [ # # ]:UNC 0 : ereport(ERROR,
2495 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2496 : : errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
2497 : :
45 akapila@postgresql.o 2498 :GNC 11 : res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
2499 : :
2500 [ - + ]: 11 : if (res->status != WALRCV_OK_TUPLES)
45 akapila@postgresql.o 2501 [ # # ]:UNC 0 : ereport(ERROR,
2502 : : (errcode(ERRCODE_CONNECTION_FAILURE),
2503 : : errmsg("could not obtain recovery progress from the publisher: %s",
2504 : : res->err)));
2505 : :
45 akapila@postgresql.o 2506 :GNC 11 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2507 [ - + ]: 11 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
45 akapila@postgresql.o 2508 [ # # ]:UNC 0 : elog(ERROR, "failed to fetch tuple for the recovery progress");
2509 : :
45 akapila@postgresql.o 2510 :GNC 11 : remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
2511 : :
2512 [ - + ]: 11 : if (remote_in_recovery)
45 akapila@postgresql.o 2513 [ # # ]:UNC 0 : ereport(ERROR,
2514 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2515 : : errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
2516 : :
45 akapila@postgresql.o 2517 :GNC 11 : ExecDropSingleTupleTableSlot(slot);
2518 : :
2519 : 11 : walrcv_clear_result(res);
2520 : 11 : }
2521 : :
2522 : : /*
2523 : : * Check if the subscriber's configuration is adequate to enable the
2524 : : * retain_dead_tuples option.
2525 : : *
2526 : : * Issue an ERROR if the wal_level does not support the use of replication
2527 : : * slots when check_guc is set to true.
2528 : : *
2529 : : * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
2530 : : * set to true. This is only to highlight the importance of enabling
2531 : : * track_commit_timestamp instead of catching all the misconfigurations, as
2532 : : * this setting can be adjusted after subscription creation. Without it, the
2533 : : * apply worker will simply skip conflict detection.
2534 : : *
2535 : : * Issue a WARNING or NOTICE if the subscription is disabled and the retention
2536 : : * is active. Do not raise an ERROR since users can only modify
2537 : : * retain_dead_tuples for disabled subscriptions. And as long as the
2538 : : * subscription is enabled promptly, it will not pose issues.
2539 : : *
2540 : : * Issue a NOTICE to inform users that max_retention_duration is
2541 : : * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
2542 : : * is not issued because setting max_retention_duration causes no harm,
2543 : : * even when it is ineffective.
2544 : : */
2545 : : void
2546 : 235 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
2547 : : int elevel_for_sub_disabled,
2548 : : bool retain_dead_tuples, bool retention_active,
2549 : : bool max_retention_set)
2550 : : {
2551 [ + + - + ]: 235 : Assert(elevel_for_sub_disabled == NOTICE ||
2552 : : elevel_for_sub_disabled == WARNING);
2553 : :
4 2554 [ + + ]: 235 : if (retain_dead_tuples)
2555 : : {
2556 [ + + - + ]: 14 : if (check_guc && wal_level < WAL_LEVEL_REPLICA)
4 akapila@postgresql.o 2557 [ # # ]:UNC 0 : ereport(ERROR,
2558 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2559 : : errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
2560 : : errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
2561 : :
4 akapila@postgresql.o 2562 [ + + + + ]:GNC 14 : if (check_guc && !track_commit_timestamp)
2563 [ + - ]: 4 : ereport(WARNING,
2564 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2565 : : errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
2566 : : errhint("Consider setting \"%s\" to true.",
2567 : : "track_commit_timestamp"));
2568 : :
2569 [ + + + - ]: 14 : if (sub_disabled && retention_active)
2570 [ + - + + ]: 6 : ereport(elevel_for_sub_disabled,
2571 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2572 : : errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
2573 : : (elevel_for_sub_disabled > NOTICE)
2574 : : ? errhint("Consider setting %s to false.",
2575 : : "retain_dead_tuples") : 0);
2576 : : }
2577 [ + + ]: 221 : else if (max_retention_set)
2578 : : {
2579 [ + - ]: 3 : ereport(NOTICE,
2580 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2581 : : errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
2582 : : }
45 2583 : 235 : }
2584 : :
2585 : : /*
2586 : : * Get the list of tables which belong to specified publications on the
2587 : : * publisher connection.
2588 : : *
2589 : : * Note that we don't support the case where the column list is different for
2590 : : * the same table in different publications to avoid sending unwanted column
2591 : : * information for some of the rows. This can happen when both the column
2592 : : * list and row filter are specified for different publications.
2593 : : */
2594 : : static List *
3089 peter_e@gmx.net 2595 :CBC 149 : fetch_table_list(WalReceiverConn *wrconn, List *publications)
2596 : : {
2597 : : WalRcvExecResult *res;
2598 : : StringInfoData cmd;
2599 : : TupleTableSlot *slot;
892 akapila@postgresql.o 2600 : 149 : Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
3034 bruce@momjian.us 2601 : 149 : List *tablelist = NIL;
892 akapila@postgresql.o 2602 : 149 : int server_version = walrcv_server_version(wrconn);
2603 : 149 : bool check_columnlist = (server_version >= 150000);
316 michael@paquier.xyz 2604 : 149 : StringInfo pub_names = makeStringInfo();
2605 : :
3089 peter_e@gmx.net 2606 : 149 : initStringInfo(&cmd);
2607 : :
2608 : : /* Build the pub_names comma-separated string. */
316 michael@paquier.xyz 2609 : 149 : GetPublicationsStr(publications, pub_names, true);
2610 : :
2611 : : /* Get the list of tables from the publisher. */
892 akapila@postgresql.o 2612 [ + - ]: 149 : if (server_version >= 160000)
2613 : : {
2614 : 149 : tableRow[2] = INT2VECTOROID;
2615 : :
2616 : : /*
2617 : : * From version 16, we allowed passing multiple publications to the
2618 : : * function pg_get_publication_tables. This helped to filter out the
2619 : : * partition table whose ancestor is also published in this
2620 : : * publication array.
2621 : : *
2622 : : * Join pg_get_publication_tables with pg_publication to exclude
2623 : : * non-existing publications.
2624 : : *
2625 : : * Note that attrs are always stored in sorted order so we don't need
2626 : : * to worry if different publications have specified them in a
2627 : : * different order. See pub_collist_validate.
2628 : : */
2629 : 149 : appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
2630 : : " FROM pg_class c\n"
2631 : : " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2632 : : " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2633 : : " FROM pg_publication\n"
2634 : : " WHERE pubname IN ( %s )) AS gpt\n"
2635 : : " ON gpt.relid = c.oid\n",
2636 : : pub_names->data);
2637 : : }
2638 : : else
2639 : : {
892 akapila@postgresql.o 2640 :UBC 0 : tableRow[2] = NAMEARRAYOID;
2641 : 0 : appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
2642 : :
2643 : : /* Get column lists for each relation if the publisher supports it */
2644 [ # # ]: 0 : if (check_columnlist)
2645 : 0 : appendStringInfoString(&cmd, ", t.attnames\n");
2646 : :
316 michael@paquier.xyz 2647 : 0 : appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
2648 : : " WHERE t.pubname IN ( %s )",
2649 : : pub_names->data);
2650 : : }
2651 : :
316 michael@paquier.xyz 2652 :CBC 149 : destroyStringInfo(pub_names);
2653 : :
1192 akapila@postgresql.o 2654 [ + - ]: 149 : res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
3089 peter_e@gmx.net 2655 : 149 : pfree(cmd.data);
2656 : :
2657 [ - + ]: 149 : if (res->status != WALRCV_OK_TUPLES)
3089 peter_e@gmx.net 2658 [ # # ]:UBC 0 : ereport(ERROR,
2659 : : (errcode(ERRCODE_CONNECTION_FAILURE),
2660 : : errmsg("could not receive list of replicated tables from the publisher: %s",
2661 : : res->err)));
2662 : :
2663 : : /* Process tables. */
2487 andres@anarazel.de 2664 :CBC 149 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
3089 peter_e@gmx.net 2665 [ + + ]: 417 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2666 : : {
2667 : : char *nspname;
2668 : : char *relname;
2669 : : bool isnull;
2670 : : RangeVar *rv;
2671 : :
2672 : 269 : nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2673 [ - + ]: 269 : Assert(!isnull);
2674 : 269 : relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2675 [ - + ]: 269 : Assert(!isnull);
2676 : :
1694 akapila@postgresql.o 2677 : 269 : rv = makeRangeVar(nspname, relname, -1);
2678 : :
1192 2679 [ + - + + ]: 269 : if (check_columnlist && list_member(tablelist, rv))
2680 [ + - ]: 1 : ereport(ERROR,
2681 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2682 : : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
2683 : : nspname, relname));
2684 : : else
2685 : 268 : tablelist = lappend(tablelist, rv);
2686 : :
3089 peter_e@gmx.net 2687 : 268 : ExecClearTuple(slot);
2688 : : }
2689 : 148 : ExecDropSingleTupleTableSlot(slot);
2690 : :
2691 : 148 : walrcv_clear_result(res);
2692 : :
2693 : 148 : return tablelist;
2694 : : }
2695 : :
2696 : : /*
2697 : : * This is to report the connection failure while dropping replication slots.
2698 : : * Here, we report the WARNING for all tablesync slots so that user can drop
2699 : : * them manually, if required.
2700 : : */
2701 : : static void
1667 akapila@postgresql.o 2702 :UBC 0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
2703 : : {
2704 : : ListCell *lc;
2705 : :
2706 [ # # # # : 0 : foreach(lc, rstates)
# # ]
2707 : : {
2708 : 0 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2709 : 0 : Oid relid = rstate->relid;
2710 : :
2711 : : /* Only cleanup resources of tablesync workers */
2712 [ # # ]: 0 : if (!OidIsValid(relid))
2713 : 0 : continue;
2714 : :
2715 : : /*
2716 : : * Caller needs to ensure that relstate doesn't change underneath us.
2717 : : * See DropSubscription where we get the relstates.
2718 : : */
2719 [ # # ]: 0 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2720 : : {
2721 : 0 : char syncslotname[NAMEDATALEN] = {0};
2722 : :
1664 2723 : 0 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2724 : : sizeof(syncslotname));
1667 2725 [ # # ]: 0 : elog(WARNING, "could not drop tablesync replication slot \"%s\"",
2726 : : syncslotname);
2727 : : }
2728 : : }
2729 : :
2730 [ # # ]: 0 : ereport(ERROR,
2731 : : (errcode(ERRCODE_CONNECTION_FAILURE),
2732 : : errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2733 : : slotname, err),
2734 : : /* translator: %s is an SQL ALTER command */
2735 : : errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
2736 : : "ALTER SUBSCRIPTION ... DISABLE",
2737 : : "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2738 : : }
2739 : :
2740 : : /*
2741 : : * Check for duplicates in the given list of publications and error out if
2742 : : * found one. Add publications to datums as text datums, if datums is not
2743 : : * NULL.
2744 : : */
2745 : : static void
1614 peter@eisentraut.org 2746 :CBC 222 : check_duplicates_in_publist(List *publist, Datum *datums)
2747 : : {
2748 : : ListCell *cell;
2749 : 222 : int j = 0;
2750 : :
2751 [ + - + + : 506 : foreach(cell, publist)
+ + ]
2752 : : {
2753 : 293 : char *name = strVal(lfirst(cell));
2754 : : ListCell *pcell;
2755 : :
2756 [ + - + - : 442 : foreach(pcell, publist)
+ - ]
2757 : : {
2758 : 442 : char *pname = strVal(lfirst(pcell));
2759 : :
2760 [ + + ]: 442 : if (pcell == cell)
2761 : 284 : break;
2762 : :
2763 [ + + ]: 158 : if (strcmp(name, pname) == 0)
2764 [ + - ]: 9 : ereport(ERROR,
2765 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
2766 : : errmsg("publication name \"%s\" used more than once",
2767 : : pname)));
2768 : : }
2769 : :
2770 [ + + ]: 284 : if (datums)
2771 : 241 : datums[j++] = CStringGetTextDatum(name);
2772 : : }
2773 : 213 : }
2774 : :
2775 : : /*
2776 : : * Merge current subscription's publications and user-specified publications
2777 : : * from ADD/DROP PUBLICATIONS.
2778 : : *
2779 : : * If addpub is true, we will add the list of publications into oldpublist.
2780 : : * Otherwise, we will delete the list of publications from oldpublist. The
2781 : : * returned list is a copy, oldpublist itself is not changed.
2782 : : *
2783 : : * subname is the subscription name, for error messages.
2784 : : */
2785 : : static List *
2786 : 27 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
2787 : : {
2788 : : ListCell *lc;
2789 : :
2790 : 27 : oldpublist = list_copy(oldpublist);
2791 : :
2792 : 27 : check_duplicates_in_publist(newpublist, NULL);
2793 : :
2794 [ + - + + : 46 : foreach(lc, newpublist)
+ + ]
2795 : : {
2796 : 34 : char *name = strVal(lfirst(lc));
2797 : : ListCell *lc2;
2798 : 34 : bool found = false;
2799 : :
2800 [ + - + + : 67 : foreach(lc2, oldpublist)
+ + ]
2801 : : {
2802 : 55 : char *pubname = strVal(lfirst(lc2));
2803 : :
2804 [ + + ]: 55 : if (strcmp(name, pubname) == 0)
2805 : : {
2806 : 22 : found = true;
2807 [ + + ]: 22 : if (addpub)
2808 [ + - ]: 6 : ereport(ERROR,
2809 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
2810 : : errmsg("publication \"%s\" is already in subscription \"%s\"",
2811 : : name, subname)));
2812 : : else
2813 : 16 : oldpublist = foreach_delete_current(oldpublist, lc2);
2814 : :
2815 : 16 : break;
2816 : : }
2817 : : }
2818 : :
2819 [ + + + - ]: 28 : if (addpub && !found)
2820 : 9 : oldpublist = lappend(oldpublist, makeString(name));
2821 [ + - + + ]: 19 : else if (!addpub && !found)
2822 [ + - ]: 3 : ereport(ERROR,
2823 : : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2824 : : errmsg("publication \"%s\" is not in subscription \"%s\"",
2825 : : name, subname)));
2826 : : }
2827 : :
2828 : : /*
2829 : : * XXX Probably no strong reason for this, but for now it's to make ALTER
2830 : : * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
2831 : : */
2832 [ + + ]: 12 : if (!oldpublist)
2833 [ + - ]: 3 : ereport(ERROR,
2834 : : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2835 : : errmsg("cannot drop all the publications from a subscription")));
2836 : :
2837 : 9 : return oldpublist;
2838 : : }
2839 : :
2840 : : /*
2841 : : * Extract the streaming mode value from a DefElem. This is like
2842 : : * defGetBoolean() but also accepts the special value of "parallel".
2843 : : */
2844 : : char
971 akapila@postgresql.o 2845 : 417 : defGetStreamingMode(DefElem *def)
2846 : : {
2847 : : /*
2848 : : * If no parameter value given, assume "true" is meant.
2849 : : */
2850 [ - + ]: 417 : if (!def->arg)
971 akapila@postgresql.o 2851 :UBC 0 : return LOGICALREP_STREAM_ON;
2852 : :
2853 : : /*
2854 : : * Allow 0, 1, "false", "true", "off", "on" or "parallel".
2855 : : */
971 akapila@postgresql.o 2856 [ - + ]:CBC 417 : switch (nodeTag(def->arg))
2857 : : {
971 akapila@postgresql.o 2858 :UBC 0 : case T_Integer:
2859 [ # # # ]: 0 : switch (intVal(def->arg))
2860 : : {
2861 : 0 : case 0:
2862 : 0 : return LOGICALREP_STREAM_OFF;
2863 : 0 : case 1:
2864 : 0 : return LOGICALREP_STREAM_ON;
2865 : 0 : default:
2866 : : /* otherwise, error out below */
2867 : 0 : break;
2868 : : }
2869 : 0 : break;
971 akapila@postgresql.o 2870 :CBC 417 : default:
2871 : : {
2872 : 417 : char *sval = defGetString(def);
2873 : :
2874 : : /*
2875 : : * The set of strings accepted here should match up with the
2876 : : * grammar's opt_boolean_or_string production.
2877 : : */
2878 [ + + + + ]: 831 : if (pg_strcasecmp(sval, "false") == 0 ||
2879 : 414 : pg_strcasecmp(sval, "off") == 0)
2880 : 6 : return LOGICALREP_STREAM_OFF;
2881 [ + + + + ]: 813 : if (pg_strcasecmp(sval, "true") == 0 ||
2882 : 402 : pg_strcasecmp(sval, "on") == 0)
2883 : 46 : return LOGICALREP_STREAM_ON;
2884 [ + + ]: 365 : if (pg_strcasecmp(sval, "parallel") == 0)
2885 : 362 : return LOGICALREP_STREAM_PARALLEL;
2886 : : }
2887 : 3 : break;
2888 : : }
2889 : :
2890 [ + - ]: 3 : ereport(ERROR,
2891 : : (errcode(ERRCODE_SYNTAX_ERROR),
2892 : : errmsg("%s requires a Boolean value or \"parallel\"",
2893 : : def->defname)));
2894 : : return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
2895 : : }
|