Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_createsubscriber.c
4 : : * Create a new logical replica from a standby server
5 : : *
6 : : * Copyright (c) 2024-2025, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/bin/pg_basebackup/pg_createsubscriber.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : :
14 : : #include "postgres_fe.h"
15 : :
16 : : #include <sys/stat.h>
17 : : #include <sys/time.h>
18 : : #include <sys/wait.h>
19 : : #include <time.h>
20 : :
21 : : #include "common/connect.h"
22 : : #include "common/controldata_utils.h"
23 : : #include "common/logging.h"
24 : : #include "common/pg_prng.h"
25 : : #include "common/restricted_token.h"
26 : : #include "fe_utils/recovery_gen.h"
27 : : #include "fe_utils/simple_list.h"
28 : : #include "fe_utils/string_utils.h"
29 : : #include "getopt_long.h"
30 : :
31 : : #define DEFAULT_SUB_PORT "50432"
32 : : #define OBJECTTYPE_PUBLICATIONS 0x0001
33 : :
34 : : /* Command-line options */
35 : : struct CreateSubscriberOptions
36 : : {
37 : : char *config_file; /* configuration file */
38 : : char *pub_conninfo_str; /* publisher connection string */
39 : : char *socket_dir; /* directory for Unix-domain socket, if any */
40 : : char *sub_port; /* subscriber port number */
41 : : const char *sub_username; /* subscriber username */
42 : : bool two_phase; /* enable-two-phase option */
43 : : SimpleStringList database_names; /* list of database names */
44 : : SimpleStringList pub_names; /* list of publication names */
45 : : SimpleStringList sub_names; /* list of subscription names */
46 : : SimpleStringList replslot_names; /* list of replication slot names */
47 : : int recovery_timeout; /* stop recovery after this time */
48 : : bool all_dbs; /* all option */
49 : : SimpleStringList objecttypes_to_clean; /* list of object types to cleanup */
50 : : };
51 : :
52 : : /* per-database publication/subscription info */
53 : : struct LogicalRepInfo
54 : : {
55 : : char *dbname; /* database name */
56 : : char *pubconninfo; /* publisher connection string */
57 : : char *subconninfo; /* subscriber connection string */
58 : : char *pubname; /* publication name */
59 : : char *subname; /* subscription name */
60 : : char *replslotname; /* replication slot name */
61 : :
62 : : bool made_replslot; /* replication slot was created */
63 : : bool made_publication; /* publication was created */
64 : : };
65 : :
66 : : /*
67 : : * Information shared across all the databases (or publications and
68 : : * subscriptions).
69 : : */
70 : : struct LogicalRepInfos
71 : : {
72 : : struct LogicalRepInfo *dbinfo;
73 : : bool two_phase; /* enable-two-phase option */
74 : : bits32 objecttypes_to_clean; /* flags indicating which object types
75 : : * to clean up on subscriber */
76 : : };
77 : :
78 : : static void cleanup_objects_atexit(void);
79 : : static void usage();
80 : : static char *get_base_conninfo(const char *conninfo, char **dbname);
81 : : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
82 : : static char *get_exec_path(const char *argv0, const char *progname);
83 : : static void check_data_directory(const char *datadir);
84 : : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
85 : : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
86 : : const char *pub_base_conninfo,
87 : : const char *sub_base_conninfo);
88 : : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
89 : : static void disconnect_database(PGconn *conn, bool exit_on_error);
90 : : static uint64 get_primary_sysid(const char *conninfo);
91 : : static uint64 get_standby_sysid(const char *datadir);
92 : : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
93 : : static bool server_is_in_recovery(PGconn *conn);
94 : : static char *generate_object_name(PGconn *conn);
95 : : static void check_publisher(const struct LogicalRepInfo *dbinfo);
96 : : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
97 : : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
98 : : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
99 : : const char *consistent_lsn);
100 : : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
101 : : const char *lsn);
102 : : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
103 : : const char *slotname);
104 : : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
105 : : static char *create_logical_replication_slot(PGconn *conn,
106 : : struct LogicalRepInfo *dbinfo);
107 : : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
108 : : const char *slot_name);
109 : : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
110 : : static void start_standby_server(const struct CreateSubscriberOptions *opt,
111 : : bool restricted_access,
112 : : bool restrict_logical_worker);
113 : : static void stop_standby_server(const char *datadir);
114 : : static void wait_for_end_recovery(const char *conninfo,
115 : : const struct CreateSubscriberOptions *opt);
116 : : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
117 : : static void drop_publication(PGconn *conn, const char *pubname,
118 : : const char *dbname, bool *made_publication);
119 : : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
120 : : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
121 : : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
122 : : const char *lsn);
123 : : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
124 : : static void check_and_drop_existing_subscriptions(PGconn *conn,
125 : : const struct LogicalRepInfo *dbinfo);
126 : : static void drop_existing_subscriptions(PGconn *conn, const char *subname,
127 : : const char *dbname);
128 : : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
129 : : bool dbnamespecified);
130 : :
131 : : #define USEC_PER_SEC 1000000
132 : : #define WAIT_INTERVAL 1 /* 1 second */
133 : :
134 : : static const char *progname;
135 : :
136 : : static char *primary_slot_name = NULL;
137 : : static bool dry_run = false;
138 : :
139 : : static bool success = false;
140 : :
141 : : static struct LogicalRepInfos dbinfos;
142 : : static int num_dbs = 0; /* number of specified databases */
143 : : static int num_pubs = 0; /* number of specified publications */
144 : : static int num_subs = 0; /* number of specified subscriptions */
145 : : static int num_replslots = 0; /* number of specified replication slots */
146 : :
147 : : static pg_prng_state prng_state;
148 : :
149 : : static char *pg_ctl_path = NULL;
150 : : static char *pg_resetwal_path = NULL;
151 : :
152 : : /* standby / subscriber data directory */
153 : : static char *subscriber_dir = NULL;
154 : :
155 : : static bool recovery_ended = false;
156 : : static bool standby_running = false;
157 : :
158 : : enum WaitPMResult
159 : : {
160 : : POSTMASTER_READY,
161 : : POSTMASTER_STILL_STARTING
162 : : };
163 : :
164 : :
165 : : /*
166 : : * Cleanup objects that were created by pg_createsubscriber if there is an
167 : : * error.
168 : : *
169 : : * Publications and replication slots are created on primary. Depending on the
170 : : * step it failed, it should remove the already created objects if it is
171 : : * possible (sometimes it won't work due to a connection issue).
172 : : * There is no cleanup on the target server. The steps on the target server are
173 : : * executed *after* promotion, hence, at this point, a failure means recreate
174 : : * the physical replica and start again.
175 : : */
176 : : static void
530 peter@eisentraut.org 177 :CBC 10 : cleanup_objects_atexit(void)
178 : : {
179 [ + + ]: 10 : if (success)
180 : 4 : return;
181 : :
182 : : /*
183 : : * If the server is promoted, there is no way to use the current setup
184 : : * again. Warn the user that a new replication setup should be done before
185 : : * trying again.
186 : : */
187 [ - + ]: 6 : if (recovery_ended)
188 : : {
530 peter@eisentraut.org 189 :UBC 0 : pg_log_warning("failed after the end of recovery");
190 : 0 : pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
191 : : "You must recreate the physical replica before continuing.");
192 : : }
193 : :
530 peter@eisentraut.org 194 [ + + ]:CBC 18 : for (int i = 0; i < num_dbs; i++)
195 : : {
174 michael@paquier.xyz 196 : 12 : struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
197 : :
198 [ + - - + ]: 12 : if (dbinfo->made_publication || dbinfo->made_replslot)
199 : : {
200 : : PGconn *conn;
201 : :
174 michael@paquier.xyz 202 :UBC 0 : conn = connect_database(dbinfo->pubconninfo, false);
530 peter@eisentraut.org 203 [ # # ]: 0 : if (conn != NULL)
204 : : {
174 michael@paquier.xyz 205 [ # # ]: 0 : if (dbinfo->made_publication)
170 akapila@postgresql.o 206 : 0 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
207 : : &dbinfo->made_publication);
174 michael@paquier.xyz 208 [ # # ]: 0 : if (dbinfo->made_replslot)
209 : 0 : drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
530 peter@eisentraut.org 210 : 0 : disconnect_database(conn, false);
211 : : }
212 : : else
213 : : {
214 : : /*
215 : : * If a connection could not be established, inform the user
216 : : * that some objects were left on primary and should be
217 : : * removed before trying again.
218 : : */
174 michael@paquier.xyz 219 [ # # ]: 0 : if (dbinfo->made_publication)
220 : : {
378 peter@eisentraut.org 221 : 0 : pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
222 : : dbinfo->pubname,
223 : : dbinfo->dbname);
224 : 0 : pg_log_warning_hint("Drop this publication before trying again.");
225 : : }
174 michael@paquier.xyz 226 [ # # ]: 0 : if (dbinfo->made_replslot)
227 : : {
378 peter@eisentraut.org 228 : 0 : pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
229 : : dbinfo->replslotname,
230 : : dbinfo->dbname);
530 231 : 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
232 : : }
233 : : }
234 : : }
235 : : }
236 : :
530 peter@eisentraut.org 237 [ + + ]:CBC 6 : if (standby_running)
238 : 4 : stop_standby_server(subscriber_dir);
239 : : }
240 : :
241 : : static void
242 : 1 : usage(void)
243 : : {
244 : 1 : printf(_("%s creates a new logical replica from a standby server.\n\n"),
245 : : progname);
246 : 1 : printf(_("Usage:\n"));
247 : 1 : printf(_(" %s [OPTION]...\n"), progname);
248 : 1 : printf(_("\nOptions:\n"));
162 akapila@postgresql.o 249 : 1 : printf(_(" -a, --all create subscriptions for all databases except template\n"
250 : : " databases and databases that don't allow connections\n"));
378 peter@eisentraut.org 251 : 1 : printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
443 252 : 1 : printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
253 : 1 : printf(_(" -n, --dry-run dry run, just show what would be done\n"));
254 : 1 : printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
255 : 1 : printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
401 256 : 1 : printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
443 257 : 1 : printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
192 akapila@postgresql.o 258 : 1 : printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
378 peter@eisentraut.org 259 : 1 : printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
443 260 : 1 : printf(_(" -v, --verbose output verbose messages\n"));
73 261 : 1 : printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
262 : : " databases on the subscriber; accepts: \"%s\"\n"), "publications");
443 263 : 1 : printf(_(" --config-file=FILENAME use specified main server configuration\n"
264 : : " file when running target cluster\n"));
265 : 1 : printf(_(" --publication=NAME publication name\n"));
266 : 1 : printf(_(" --replication-slot=NAME replication slot name\n"));
267 : 1 : printf(_(" --subscription=NAME subscription name\n"));
268 : 1 : printf(_(" -V, --version output version information, then exit\n"));
269 : 1 : printf(_(" -?, --help show this help, then exit\n"));
530 270 : 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
271 : 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
272 : 1 : }
273 : :
274 : : /*
275 : : * Subroutine to append "keyword=value" to a connection string,
276 : : * with proper quoting of the value. (We assume keywords don't need that.)
277 : : */
278 : : static void
433 tgl@sss.pgh.pa.us 279 : 107 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
280 : : {
281 [ + + ]: 107 : if (buf->len > 0)
282 : 79 : appendPQExpBufferChar(buf, ' ');
283 : 107 : appendPQExpBufferStr(buf, keyword);
284 : 107 : appendPQExpBufferChar(buf, '=');
285 : 107 : appendConnStrVal(buf, val);
286 : 107 : }
287 : :
288 : : /*
289 : : * Validate a connection string. Returns a base connection string that is a
290 : : * connection string without a database name.
291 : : *
292 : : * Since we might process multiple databases, each database name will be
293 : : * appended to this base connection string to provide a final connection
294 : : * string. If the second argument (dbname) is not null, returns dbname if the
295 : : * provided connection string contains it.
296 : : *
297 : : * It is the caller's responsibility to free the returned connection string and
298 : : * dbname.
299 : : */
300 : : static char *
530 peter@eisentraut.org 301 : 14 : get_base_conninfo(const char *conninfo, char **dbname)
302 : : {
303 : : PQExpBuffer buf;
304 : : PQconninfoOption *conn_opts;
305 : : PQconninfoOption *conn_opt;
306 : 14 : char *errmsg = NULL;
307 : : char *ret;
308 : :
309 : 14 : conn_opts = PQconninfoParse(conninfo, &errmsg);
310 [ - + ]: 14 : if (conn_opts == NULL)
311 : : {
530 peter@eisentraut.org 312 :UBC 0 : pg_log_error("could not parse connection string: %s", errmsg);
523 tgl@sss.pgh.pa.us 313 : 0 : PQfreemem(errmsg);
530 peter@eisentraut.org 314 : 0 : return NULL;
315 : : }
316 : :
523 tgl@sss.pgh.pa.us 317 :CBC 14 : buf = createPQExpBuffer();
530 peter@eisentraut.org 318 [ + + ]: 728 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
319 : : {
320 [ + + + - ]: 714 : if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
321 : : {
433 tgl@sss.pgh.pa.us 322 [ + + ]: 33 : if (strcmp(conn_opt->keyword, "dbname") == 0)
323 : : {
324 [ + - ]: 9 : if (dbname)
325 : 9 : *dbname = pg_strdup(conn_opt->val);
326 : 9 : continue;
327 : : }
328 : 24 : appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
329 : : }
330 : : }
331 : :
530 peter@eisentraut.org 332 : 14 : ret = pg_strdup(buf->data);
333 : :
334 : 14 : destroyPQExpBuffer(buf);
335 : 14 : PQconninfoFree(conn_opts);
336 : :
337 : 14 : return ret;
338 : : }
339 : :
340 : : /*
341 : : * Build a subscriber connection string. Only a few parameters are supported
342 : : * since it starts a server with restricted access.
343 : : */
344 : : static char *
345 : 14 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
346 : : {
347 : 14 : PQExpBuffer buf = createPQExpBuffer();
348 : : char *ret;
349 : :
433 tgl@sss.pgh.pa.us 350 : 14 : appendConnStrItem(buf, "port", opt->sub_port);
351 : : #if !defined(WIN32)
352 : 14 : appendConnStrItem(buf, "host", opt->socket_dir);
353 : : #endif
530 peter@eisentraut.org 354 [ - + ]: 14 : if (opt->sub_username != NULL)
433 tgl@sss.pgh.pa.us 355 :UBC 0 : appendConnStrItem(buf, "user", opt->sub_username);
433 tgl@sss.pgh.pa.us 356 :CBC 14 : appendConnStrItem(buf, "fallback_application_name", progname);
357 : :
530 peter@eisentraut.org 358 : 14 : ret = pg_strdup(buf->data);
359 : :
360 : 14 : destroyPQExpBuffer(buf);
361 : :
362 : 14 : return ret;
363 : : }
364 : :
365 : : /*
366 : : * Verify if a PostgreSQL binary (progname) is available in the same directory as
367 : : * pg_createsubscriber and it has the same version. It returns the absolute
368 : : * path of the progname.
369 : : */
370 : : static char *
371 : 20 : get_exec_path(const char *argv0, const char *progname)
372 : : {
373 : : char *versionstr;
374 : : char *exec_path;
375 : : int ret;
376 : :
377 : 20 : versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
378 : 20 : exec_path = pg_malloc(MAXPGPATH);
379 : 20 : ret = find_other_exec(argv0, progname, versionstr, exec_path);
380 : :
381 [ - + ]: 20 : if (ret < 0)
382 : : {
383 : : char full_path[MAXPGPATH];
384 : :
530 peter@eisentraut.org 385 [ # # ]:UBC 0 : if (find_my_exec(argv0, full_path) < 0)
386 : 0 : strlcpy(full_path, progname, sizeof(full_path));
387 : :
388 [ # # ]: 0 : if (ret == -1)
389 : 0 : pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
390 : : progname, "pg_createsubscriber", full_path);
391 : : else
392 : 0 : pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
393 : : progname, full_path, "pg_createsubscriber");
394 : : }
395 : :
530 peter@eisentraut.org 396 [ + + ]:CBC 20 : pg_log_debug("%s path is: %s", progname, exec_path);
397 : :
398 : 20 : return exec_path;
399 : : }
400 : :
401 : : /*
402 : : * Is it a cluster directory? These are preliminary checks. It is far from
403 : : * making an accurate check. If it is not a clone from the publisher, it will
404 : : * eventually fail in a future step.
405 : : */
406 : : static void
407 : 10 : check_data_directory(const char *datadir)
408 : : {
409 : : struct stat statbuf;
410 : : char versionfile[MAXPGPATH];
411 : :
412 : 10 : pg_log_info("checking if directory \"%s\" is a cluster data directory",
413 : : datadir);
414 : :
415 [ - + ]: 10 : if (stat(datadir, &statbuf) != 0)
416 : : {
530 peter@eisentraut.org 417 [ # # ]:UBC 0 : if (errno == ENOENT)
418 : 0 : pg_fatal("data directory \"%s\" does not exist", datadir);
419 : : else
529 420 : 0 : pg_fatal("could not access directory \"%s\": %m", datadir);
421 : : }
422 : :
530 peter@eisentraut.org 423 :CBC 10 : snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
424 [ - + - - ]: 10 : if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
425 : : {
530 peter@eisentraut.org 426 :UBC 0 : pg_fatal("directory \"%s\" is not a database cluster directory",
427 : : datadir);
428 : : }
530 peter@eisentraut.org 429 :CBC 10 : }
430 : :
431 : : /*
432 : : * Append database name into a base connection string.
433 : : *
434 : : * dbname is the only parameter that changes so it is not included in the base
435 : : * connection string. This function concatenates dbname to build a "real"
436 : : * connection string.
437 : : */
438 : : static char *
439 : 41 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
440 : : {
441 : 41 : PQExpBuffer buf = createPQExpBuffer();
442 : : char *ret;
443 : :
444 [ - + ]: 41 : Assert(conninfo != NULL);
445 : :
446 : 41 : appendPQExpBufferStr(buf, conninfo);
433 tgl@sss.pgh.pa.us 447 : 41 : appendConnStrItem(buf, "dbname", dbname);
448 : :
530 peter@eisentraut.org 449 : 41 : ret = pg_strdup(buf->data);
450 : 41 : destroyPQExpBuffer(buf);
451 : :
452 : 41 : return ret;
453 : : }
454 : :
455 : : /*
456 : : * Store publication and subscription information.
457 : : *
458 : : * If publication, replication slot and subscription names were specified,
459 : : * store it here. Otherwise, a generated name will be assigned to the object in
460 : : * setup_publisher().
461 : : */
462 : : static struct LogicalRepInfo *
463 : 10 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
464 : : const char *pub_base_conninfo,
465 : : const char *sub_base_conninfo)
466 : : {
467 : : struct LogicalRepInfo *dbinfo;
468 : 10 : SimpleStringListCell *pubcell = NULL;
469 : 10 : SimpleStringListCell *subcell = NULL;
470 : 10 : SimpleStringListCell *replslotcell = NULL;
471 : 10 : int i = 0;
472 : :
473 : 10 : dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
474 : :
475 [ + + ]: 10 : if (num_pubs > 0)
476 : 2 : pubcell = opt->pub_names.head;
477 [ + + ]: 10 : if (num_subs > 0)
478 : 1 : subcell = opt->sub_names.head;
479 [ + + ]: 10 : if (num_replslots > 0)
480 : 2 : replslotcell = opt->replslot_names.head;
481 : :
482 [ + + ]: 30 : for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
483 : : {
484 : : char *conninfo;
485 : :
486 : : /* Fill publisher attributes */
487 : 20 : conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
488 : 20 : dbinfo[i].pubconninfo = conninfo;
489 : 20 : dbinfo[i].dbname = cell->val;
490 [ + + ]: 20 : if (num_pubs > 0)
491 : 4 : dbinfo[i].pubname = pubcell->val;
492 : : else
493 : 16 : dbinfo[i].pubname = NULL;
494 [ + + ]: 20 : if (num_replslots > 0)
495 : 3 : dbinfo[i].replslotname = replslotcell->val;
496 : : else
497 : 17 : dbinfo[i].replslotname = NULL;
498 : 20 : dbinfo[i].made_replslot = false;
499 : 20 : dbinfo[i].made_publication = false;
500 : : /* Fill subscriber attributes */
501 : 20 : conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
502 : 20 : dbinfo[i].subconninfo = conninfo;
503 [ + + ]: 20 : if (num_subs > 0)
504 : 2 : dbinfo[i].subname = subcell->val;
505 : : else
506 : 18 : dbinfo[i].subname = NULL;
507 : : /* Other fields will be filled later */
508 : :
509 [ + + + - : 20 : pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
+ - ]
510 : : dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
511 : : dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
512 : : dbinfo[i].pubconninfo);
192 akapila@postgresql.o 513 [ + + + - : 20 : pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
- + ]
514 : : dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
515 : : dbinfo[i].subconninfo,
516 : : dbinfos.two_phase ? "true" : "false");
517 : :
530 peter@eisentraut.org 518 [ + + ]: 20 : if (num_pubs > 0)
519 : 4 : pubcell = pubcell->next;
520 [ + + ]: 20 : if (num_subs > 0)
521 : 2 : subcell = subcell->next;
522 [ + + ]: 20 : if (num_replslots > 0)
523 : 3 : replslotcell = replslotcell->next;
524 : :
525 : 20 : i++;
526 : : }
527 : :
528 : 10 : return dbinfo;
529 : : }
530 : :
531 : : /*
532 : : * Open a new connection. If exit_on_error is true, it has an undesired
533 : : * condition and it should exit immediately.
534 : : */
535 : : static PGconn *
536 : 57 : connect_database(const char *conninfo, bool exit_on_error)
537 : : {
538 : : PGconn *conn;
539 : : PGresult *res;
540 : :
541 : 57 : conn = PQconnectdb(conninfo);
542 [ - + ]: 57 : if (PQstatus(conn) != CONNECTION_OK)
543 : : {
530 peter@eisentraut.org 544 :UBC 0 : pg_log_error("connection to database failed: %s",
545 : : PQerrorMessage(conn));
523 tgl@sss.pgh.pa.us 546 : 0 : PQfinish(conn);
547 : :
530 peter@eisentraut.org 548 [ # # ]: 0 : if (exit_on_error)
549 : 0 : exit(1);
550 : 0 : return NULL;
551 : : }
552 : :
553 : : /* Secure search_path */
530 peter@eisentraut.org 554 :CBC 57 : res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
555 [ - + ]: 57 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
556 : : {
367 michael@paquier.xyz 557 :UBC 0 : pg_log_error("could not clear \"search_path\": %s",
558 : : PQresultErrorMessage(res));
523 tgl@sss.pgh.pa.us 559 : 0 : PQclear(res);
560 : 0 : PQfinish(conn);
561 : :
530 peter@eisentraut.org 562 [ # # ]: 0 : if (exit_on_error)
563 : 0 : exit(1);
564 : 0 : return NULL;
565 : : }
530 peter@eisentraut.org 566 :CBC 57 : PQclear(res);
567 : :
568 : 57 : return conn;
569 : : }
570 : :
571 : : /*
572 : : * Close the connection. If exit_on_error is true, it has an undesired
573 : : * condition and it should exit immediately.
574 : : */
575 : : static void
576 : 57 : disconnect_database(PGconn *conn, bool exit_on_error)
577 : : {
578 [ - + ]: 57 : Assert(conn != NULL);
579 : :
580 : 57 : PQfinish(conn);
581 : :
582 [ + + ]: 57 : if (exit_on_error)
583 : 2 : exit(1);
584 : 55 : }
585 : :
586 : : /*
587 : : * Obtain the system identifier using the provided connection. It will be used
588 : : * to compare if a data directory is a clone of another one.
589 : : */
590 : : static uint64
591 : 10 : get_primary_sysid(const char *conninfo)
592 : : {
593 : : PGconn *conn;
594 : : PGresult *res;
595 : : uint64 sysid;
596 : :
597 : 10 : pg_log_info("getting system identifier from publisher");
598 : :
599 : 10 : conn = connect_database(conninfo, true);
600 : :
601 : 10 : res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
602 [ - + ]: 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
603 : : {
530 peter@eisentraut.org 604 :UBC 0 : pg_log_error("could not get system identifier: %s",
605 : : PQresultErrorMessage(res));
606 : 0 : disconnect_database(conn, true);
607 : : }
530 peter@eisentraut.org 608 [ - + ]:CBC 10 : if (PQntuples(res) != 1)
609 : : {
530 peter@eisentraut.org 610 :UBC 0 : pg_log_error("could not get system identifier: got %d rows, expected %d row",
611 : : PQntuples(res), 1);
612 : 0 : disconnect_database(conn, true);
613 : : }
614 : :
530 peter@eisentraut.org 615 :CBC 10 : sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
616 : :
161 617 : 10 : pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
618 : :
530 619 : 10 : PQclear(res);
620 : 10 : disconnect_database(conn, false);
621 : :
622 : 10 : return sysid;
623 : : }
624 : :
625 : : /*
626 : : * Obtain the system identifier from control file. It will be used to compare
627 : : * if a data directory is a clone of another one. This routine is used locally
628 : : * and avoids a connection.
629 : : */
630 : : static uint64
631 : 10 : get_standby_sysid(const char *datadir)
632 : : {
633 : : ControlFileData *cf;
634 : : bool crc_ok;
635 : : uint64 sysid;
636 : :
637 : 10 : pg_log_info("getting system identifier from subscriber");
638 : :
639 : 10 : cf = get_controlfile(datadir, &crc_ok);
640 [ - + ]: 10 : if (!crc_ok)
530 peter@eisentraut.org 641 :UBC 0 : pg_fatal("control file appears to be corrupt");
642 : :
530 peter@eisentraut.org 643 :CBC 10 : sysid = cf->system_identifier;
644 : :
161 645 : 10 : pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
646 : :
530 647 : 10 : pg_free(cf);
648 : :
649 : 10 : return sysid;
650 : : }
651 : :
652 : : /*
653 : : * Modify the system identifier. Since a standby server preserves the system
654 : : * identifier, it makes sense to change it to avoid situations in which WAL
655 : : * files from one of the systems might be used in the other one.
656 : : */
657 : : static void
658 : 4 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
659 : : {
660 : : ControlFileData *cf;
661 : : bool crc_ok;
662 : : struct timeval tv;
663 : :
664 : : char *cmd_str;
665 : :
666 : 4 : pg_log_info("modifying system identifier of subscriber");
667 : :
668 : 4 : cf = get_controlfile(subscriber_dir, &crc_ok);
669 [ - + ]: 4 : if (!crc_ok)
530 peter@eisentraut.org 670 :UBC 0 : pg_fatal("control file appears to be corrupt");
671 : :
672 : : /*
673 : : * Select a new system identifier.
674 : : *
675 : : * XXX this code was extracted from BootStrapXLOG().
676 : : */
530 peter@eisentraut.org 677 :CBC 4 : gettimeofday(&tv, NULL);
678 : 4 : cf->system_identifier = ((uint64) tv.tv_sec) << 32;
679 : 4 : cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
680 : 4 : cf->system_identifier |= getpid() & 0xFFF;
681 : :
682 [ + + ]: 4 : if (!dry_run)
683 : 1 : update_controlfile(subscriber_dir, cf, true);
684 : :
161 685 : 4 : pg_log_info("system identifier is %" PRIu64 " on subscriber",
686 : : cf->system_identifier);
687 : :
530 688 : 4 : pg_log_info("running pg_resetwal on the subscriber");
689 : :
690 : 4 : cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
691 : : subscriber_dir, DEVNULL);
692 : :
693 [ + + ]: 4 : pg_log_debug("pg_resetwal command is: %s", cmd_str);
694 : :
695 [ + + ]: 4 : if (!dry_run)
696 : : {
697 : 1 : int rc = system(cmd_str);
698 : :
699 [ + - ]: 1 : if (rc == 0)
700 : 1 : pg_log_info("subscriber successfully changed the system identifier");
701 : : else
378 peter@eisentraut.org 702 :UBC 0 : pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc));
703 : : }
704 : :
530 peter@eisentraut.org 705 :CBC 4 : pg_free(cf);
706 : 4 : }
707 : :
708 : : /*
709 : : * Generate an object name using a prefix, database oid and a random integer.
710 : : * It is used in case the user does not specify an object name (publication,
711 : : * subscription, replication slot).
712 : : */
713 : : static char *
714 : 8 : generate_object_name(PGconn *conn)
715 : : {
716 : : PGresult *res;
717 : : Oid oid;
718 : : uint32 rand;
719 : : char *objname;
720 : :
721 : 8 : res = PQexec(conn,
722 : : "SELECT oid FROM pg_catalog.pg_database "
723 : : "WHERE datname = pg_catalog.current_database()");
724 [ - + ]: 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
725 : : {
530 peter@eisentraut.org 726 :UBC 0 : pg_log_error("could not obtain database OID: %s",
727 : : PQresultErrorMessage(res));
728 : 0 : disconnect_database(conn, true);
729 : : }
730 : :
530 peter@eisentraut.org 731 [ - + ]:CBC 8 : if (PQntuples(res) != 1)
732 : : {
529 peter@eisentraut.org 733 :UBC 0 : pg_log_error("could not obtain database OID: got %d rows, expected %d row",
734 : : PQntuples(res), 1);
530 735 : 0 : disconnect_database(conn, true);
736 : : }
737 : :
738 : : /* Database OID */
530 peter@eisentraut.org 739 :CBC 8 : oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
740 : :
741 : 8 : PQclear(res);
742 : :
743 : : /* Random unsigned integer */
744 : 8 : rand = pg_prng_uint32(&prng_state);
745 : :
746 : : /*
747 : : * Build the object name. The name must not exceed NAMEDATALEN - 1. This
748 : : * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
749 : : * '\0').
750 : : */
751 : 8 : objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
752 : :
753 : 8 : return objname;
754 : : }
755 : :
756 : : /*
757 : : * Create the publications and replication slots in preparation for logical
758 : : * replication. Returns the LSN from latest replication slot. It will be the
759 : : * replication start point that is used to adjust the subscriptions (see
760 : : * set_replication_progress).
761 : : */
762 : : static char *
763 : 4 : setup_publisher(struct LogicalRepInfo *dbinfo)
764 : : {
765 : 4 : char *lsn = NULL;
766 : :
767 : 4 : pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
768 : :
769 [ + + ]: 12 : for (int i = 0; i < num_dbs; i++)
770 : : {
771 : : PGconn *conn;
772 : 8 : char *genname = NULL;
773 : :
774 : 8 : conn = connect_database(dbinfo[i].pubconninfo, true);
775 : :
776 : : /*
777 : : * If an object name was not specified as command-line options, assign
778 : : * a generated object name. The replication slot has a different rule.
779 : : * The subscription name is assigned to the replication slot name if
780 : : * no replication slot is specified. It follows the same rule as
781 : : * CREATE SUBSCRIPTION.
782 : : */
783 [ + + + + : 8 : if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
+ - ]
784 : 8 : genname = generate_object_name(conn);
785 [ + + ]: 8 : if (num_pubs == 0)
786 : 4 : dbinfo[i].pubname = pg_strdup(genname);
787 [ + + ]: 8 : if (num_subs == 0)
788 : 6 : dbinfo[i].subname = pg_strdup(genname);
789 [ + + ]: 8 : if (num_replslots == 0)
790 : 5 : dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
791 : :
792 : : /*
793 : : * Create publication on publisher. This step should be executed
794 : : * *before* promoting the subscriber to avoid any transactions between
795 : : * consistent LSN and the new publication rows (such transactions
796 : : * wouldn't see the new publication rows resulting in an error).
797 : : */
798 : 8 : create_publication(conn, &dbinfo[i]);
799 : :
800 : : /* Create replication slot on publisher */
801 [ + + ]: 8 : if (lsn)
802 : 1 : pg_free(lsn);
803 : 8 : lsn = create_logical_replication_slot(conn, &dbinfo[i]);
804 [ + + + - ]: 8 : if (lsn != NULL || dry_run)
805 : 8 : pg_log_info("create replication slot \"%s\" on publisher",
806 : : dbinfo[i].replslotname);
807 : : else
530 peter@eisentraut.org 808 :UBC 0 : exit(1);
809 : :
810 : : /*
811 : : * Since we are using the LSN returned by the last replication slot as
812 : : * recovery_target_lsn, this LSN is ahead of the current WAL position
813 : : * and the recovery waits until the publisher writes a WAL record to
814 : : * reach the target and ends the recovery. On idle systems, this wait
815 : : * time is unpredictable and could lead to failure in promoting the
816 : : * subscriber. To avoid that, insert a harmless WAL record.
817 : : */
403 akapila@postgresql.o 818 [ + + + + ]:CBC 8 : if (i == num_dbs - 1 && !dry_run)
819 : : {
820 : : PGresult *res;
821 : :
822 : 1 : res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
823 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
824 : : {
403 akapila@postgresql.o 825 :UBC 0 : pg_log_error("could not write an additional WAL record: %s",
826 : : PQresultErrorMessage(res));
827 : 0 : disconnect_database(conn, true);
828 : : }
403 akapila@postgresql.o 829 :CBC 1 : PQclear(res);
830 : : }
831 : :
530 peter@eisentraut.org 832 : 8 : disconnect_database(conn, false);
833 : : }
834 : :
835 : 4 : return lsn;
836 : : }
837 : :
838 : : /*
839 : : * Is recovery still in progress?
840 : : */
841 : : static bool
842 : 18 : server_is_in_recovery(PGconn *conn)
843 : : {
844 : : PGresult *res;
845 : : int ret;
846 : :
847 : 18 : res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
848 : :
849 [ - + ]: 18 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
850 : : {
530 peter@eisentraut.org 851 :UBC 0 : pg_log_error("could not obtain recovery progress: %s",
852 : : PQresultErrorMessage(res));
853 : 0 : disconnect_database(conn, true);
854 : : }
855 : :
856 : :
530 peter@eisentraut.org 857 :CBC 18 : ret = strcmp("t", PQgetvalue(res, 0, 0));
858 : :
859 : 18 : PQclear(res);
860 : :
861 : 18 : return ret == 0;
862 : : }
863 : :
864 : : /*
865 : : * Is the primary server ready for logical replication?
866 : : *
867 : : * XXX Does it not allow a synchronous replica?
868 : : */
869 : : static void
870 : 6 : check_publisher(const struct LogicalRepInfo *dbinfo)
871 : : {
872 : : PGconn *conn;
873 : : PGresult *res;
874 : 6 : bool failed = false;
875 : :
876 : : char *wal_level;
877 : : int max_repslots;
878 : : int cur_repslots;
879 : : int max_walsenders;
880 : : int cur_walsenders;
881 : : int max_prepared_transactions;
882 : : char *max_slot_wal_keep_size;
883 : :
884 : 6 : pg_log_info("checking settings on publisher");
885 : :
886 : 6 : conn = connect_database(dbinfo[0].pubconninfo, true);
887 : :
888 : : /*
889 : : * If the primary server is in recovery (i.e. cascading replication),
890 : : * objects (publication) cannot be created because it is read only.
891 : : */
892 [ + + ]: 6 : if (server_is_in_recovery(conn))
893 : : {
894 : 1 : pg_log_error("primary server cannot be in recovery");
895 : 1 : disconnect_database(conn, true);
896 : : }
897 : :
898 : : /*------------------------------------------------------------------------
899 : : * Logical replication requires a few parameters to be set on publisher.
900 : : * Since these parameters are not a requirement for physical replication,
901 : : * we should check it to make sure it won't fail.
902 : : *
903 : : * - wal_level = logical
904 : : * - max_replication_slots >= current + number of dbs to be converted
905 : : * - max_wal_senders >= current + number of dbs to be converted
906 : : * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
907 : : * -----------------------------------------------------------------------
908 : : */
909 : 5 : res = PQexec(conn,
910 : : "SELECT pg_catalog.current_setting('wal_level'),"
911 : : " pg_catalog.current_setting('max_replication_slots'),"
912 : : " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
913 : : " pg_catalog.current_setting('max_wal_senders'),"
914 : : " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
915 : : " pg_catalog.current_setting('max_prepared_transactions'),"
916 : : " pg_catalog.current_setting('max_slot_wal_keep_size')");
917 : :
918 [ - + ]: 5 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
919 : : {
530 peter@eisentraut.org 920 :UBC 0 : pg_log_error("could not obtain publisher settings: %s",
921 : : PQresultErrorMessage(res));
922 : 0 : disconnect_database(conn, true);
923 : : }
924 : :
530 peter@eisentraut.org 925 :CBC 5 : wal_level = pg_strdup(PQgetvalue(res, 0, 0));
926 : 5 : max_repslots = atoi(PQgetvalue(res, 0, 1));
927 : 5 : cur_repslots = atoi(PQgetvalue(res, 0, 2));
928 : 5 : max_walsenders = atoi(PQgetvalue(res, 0, 3));
929 : 5 : cur_walsenders = atoi(PQgetvalue(res, 0, 4));
433 tgl@sss.pgh.pa.us 930 : 5 : max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
200 akapila@postgresql.o 931 : 5 : max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
932 : :
530 peter@eisentraut.org 933 : 5 : PQclear(res);
934 : :
935 [ + + ]: 5 : pg_log_debug("publisher: wal_level: %s", wal_level);
936 [ + + ]: 5 : pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
937 [ + + ]: 5 : pg_log_debug("publisher: current replication slots: %d", cur_repslots);
938 [ + + ]: 5 : pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
939 [ + + ]: 5 : pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
433 tgl@sss.pgh.pa.us 940 [ + + ]: 5 : pg_log_debug("publisher: max_prepared_transactions: %d",
941 : : max_prepared_transactions);
200 akapila@postgresql.o 942 [ + + ]: 5 : pg_log_debug("publisher: max_slot_wal_keep_size: %s",
943 : : max_slot_wal_keep_size);
944 : :
530 peter@eisentraut.org 945 : 5 : disconnect_database(conn, false);
946 : :
947 [ + + ]: 5 : if (strcmp(wal_level, "logical") != 0)
948 : : {
367 michael@paquier.xyz 949 : 1 : pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
530 peter@eisentraut.org 950 : 1 : failed = true;
951 : : }
952 : :
953 [ + + ]: 5 : if (max_repslots - cur_repslots < num_dbs)
954 : : {
955 : 1 : pg_log_error("publisher requires %d replication slots, but only %d remain",
956 : : num_dbs, max_repslots - cur_repslots);
407 957 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
958 : : "max_replication_slots", cur_repslots + num_dbs);
530 959 : 1 : failed = true;
960 : : }
961 : :
962 [ + + ]: 5 : if (max_walsenders - cur_walsenders < num_dbs)
963 : : {
378 964 : 1 : pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
965 : : num_dbs, max_walsenders - cur_walsenders);
407 966 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
967 : : "max_wal_senders", cur_walsenders + num_dbs);
530 968 : 1 : failed = true;
969 : : }
970 : :
192 akapila@postgresql.o 971 [ - + - - ]: 5 : if (max_prepared_transactions != 0 && !dbinfos.two_phase)
972 : : {
378 peter@eisentraut.org 973 :UBC 0 : pg_log_warning("two_phase option will not be enabled for replication slots");
433 tgl@sss.pgh.pa.us 974 : 0 : pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
975 : : "Prepared transactions will be replicated at COMMIT PREPARED.");
82 peter@eisentraut.org 976 : 0 : pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
977 : : }
978 : :
979 : : /*
980 : : * Validate 'max_slot_wal_keep_size'. If this parameter is set to a
981 : : * non-default value, it may cause replication failures due to required
982 : : * WAL files being prematurely removed.
983 : : */
200 akapila@postgresql.o 984 [ + + - + ]:CBC 5 : if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
985 : : {
200 akapila@postgresql.o 986 :UBC 0 : pg_log_warning("required WAL could be removed from the publisher");
987 : 0 : pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
988 : : "max_slot_wal_keep_size");
989 : : }
990 : :
523 tgl@sss.pgh.pa.us 991 :CBC 5 : pg_free(wal_level);
992 : :
530 peter@eisentraut.org 993 [ + + ]: 5 : if (failed)
994 : 1 : exit(1);
995 : 4 : }
996 : :
997 : : /*
998 : : * Is the standby server ready for logical replication?
999 : : *
1000 : : * XXX Does it not allow a time-delayed replica?
1001 : : *
1002 : : * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1003 : : * is S, it cannot detect there is a replica (server C) because server S starts
1004 : : * accepting only local connections and server C cannot connect to it. Hence,
1005 : : * there is not a reliable way to provide a suitable error saying the server C
1006 : : * will be broken at the end of this process (due to pg_resetwal).
1007 : : */
1008 : : static void
1009 : 8 : check_subscriber(const struct LogicalRepInfo *dbinfo)
1010 : : {
1011 : : PGconn *conn;
1012 : : PGresult *res;
1013 : 8 : bool failed = false;
1014 : :
1015 : : int max_lrworkers;
1016 : : int max_reporigins;
1017 : : int max_wprocs;
1018 : :
1019 : 8 : pg_log_info("checking settings on subscriber");
1020 : :
1021 : 8 : conn = connect_database(dbinfo[0].subconninfo, true);
1022 : :
1023 : : /* The target server must be a standby */
1024 [ + + ]: 8 : if (!server_is_in_recovery(conn))
1025 : : {
1026 : 1 : pg_log_error("target server must be a standby");
1027 : 1 : disconnect_database(conn, true);
1028 : : }
1029 : :
1030 : : /*------------------------------------------------------------------------
1031 : : * Logical replication requires a few parameters to be set on subscriber.
1032 : : * Since these parameters are not a requirement for physical replication,
1033 : : * we should check it to make sure it won't fail.
1034 : : *
1035 : : * - max_active_replication_origins >= number of dbs to be converted
1036 : : * - max_logical_replication_workers >= number of dbs to be converted
1037 : : * - max_worker_processes >= 1 + number of dbs to be converted
1038 : : *------------------------------------------------------------------------
1039 : : */
1040 : 7 : res = PQexec(conn,
1041 : : "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1042 : : "'max_logical_replication_workers', "
1043 : : "'max_active_replication_origins', "
1044 : : "'max_worker_processes', "
1045 : : "'primary_slot_name') "
1046 : : "ORDER BY name");
1047 : :
1048 [ - + ]: 7 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1049 : : {
530 peter@eisentraut.org 1050 :UBC 0 : pg_log_error("could not obtain subscriber settings: %s",
1051 : : PQresultErrorMessage(res));
1052 : 0 : disconnect_database(conn, true);
1053 : : }
1054 : :
169 msawada@postgresql.o 1055 :CBC 7 : max_reporigins = atoi(PQgetvalue(res, 0, 0));
1056 : 7 : max_lrworkers = atoi(PQgetvalue(res, 1, 0));
530 peter@eisentraut.org 1057 : 7 : max_wprocs = atoi(PQgetvalue(res, 2, 0));
1058 [ + + ]: 7 : if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1059 : 6 : primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1060 : :
1061 [ + + ]: 7 : pg_log_debug("subscriber: max_logical_replication_workers: %d",
1062 : : max_lrworkers);
169 msawada@postgresql.o 1063 [ + + ]: 7 : pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
530 peter@eisentraut.org 1064 [ + + ]: 7 : pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1065 [ + + ]: 7 : if (primary_slot_name)
1066 [ + + ]: 6 : pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1067 : :
1068 : 7 : PQclear(res);
1069 : :
1070 : 7 : disconnect_database(conn, false);
1071 : :
169 msawada@postgresql.o 1072 [ + + ]: 7 : if (max_reporigins < num_dbs)
1073 : : {
1074 : 1 : pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1075 : : num_dbs, max_reporigins);
407 peter@eisentraut.org 1076 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1077 : : "max_active_replication_origins", num_dbs);
530 1078 : 1 : failed = true;
1079 : : }
1080 : :
1081 [ + + ]: 7 : if (max_lrworkers < num_dbs)
1082 : : {
1083 : 1 : pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1084 : : num_dbs, max_lrworkers);
407 1085 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1086 : : "max_logical_replication_workers", num_dbs);
530 1087 : 1 : failed = true;
1088 : : }
1089 : :
1090 [ + + ]: 7 : if (max_wprocs < num_dbs + 1)
1091 : : {
1092 : 1 : pg_log_error("subscriber requires %d worker processes, but only %d remain",
1093 : : num_dbs + 1, max_wprocs);
407 1094 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1095 : : "max_worker_processes", num_dbs + 1);
530 1096 : 1 : failed = true;
1097 : : }
1098 : :
1099 [ + + ]: 7 : if (failed)
1100 : 1 : exit(1);
1101 : 6 : }
1102 : :
1103 : : /*
1104 : : * Drop a specified subscription. This is to avoid duplicate subscriptions on
1105 : : * the primary (publisher node) and the newly created subscriber. We
1106 : : * shouldn't drop the associated slot as that would be used by the publisher
1107 : : * node.
1108 : : */
1109 : : static void
431 akapila@postgresql.o 1110 : 4 : drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
1111 : : {
1112 : 4 : PQExpBuffer query = createPQExpBuffer();
1113 : : PGresult *res;
1114 : :
1115 [ - + ]: 4 : Assert(conn != NULL);
1116 : :
1117 : : /*
1118 : : * Construct a query string. These commands are allowed to be executed
1119 : : * within a transaction.
1120 : : */
1121 : 4 : appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1122 : : subname);
1123 : 4 : appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1124 : : subname);
1125 : 4 : appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1126 : :
408 peter@eisentraut.org 1127 : 4 : pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1128 : : subname, dbname);
1129 : :
431 akapila@postgresql.o 1130 [ + + ]: 4 : if (!dry_run)
1131 : : {
1132 : 1 : res = PQexec(conn, query->data);
1133 : :
1134 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1135 : : {
400 alvherre@alvh.no-ip. 1136 :UBC 0 : pg_log_error("could not drop subscription \"%s\": %s",
1137 : : subname, PQresultErrorMessage(res));
431 akapila@postgresql.o 1138 : 0 : disconnect_database(conn, true);
1139 : : }
1140 : :
431 akapila@postgresql.o 1141 :CBC 1 : PQclear(res);
1142 : : }
1143 : :
1144 : 4 : destroyPQExpBuffer(query);
1145 : 4 : }
1146 : :
1147 : : /*
1148 : : * Retrieve and drop the pre-existing subscriptions.
1149 : : */
1150 : : static void
1151 : 8 : check_and_drop_existing_subscriptions(PGconn *conn,
1152 : : const struct LogicalRepInfo *dbinfo)
1153 : : {
1154 : 8 : PQExpBuffer query = createPQExpBuffer();
1155 : : char *dbname;
1156 : : PGresult *res;
1157 : :
1158 [ - + ]: 8 : Assert(conn != NULL);
1159 : :
1160 : 8 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1161 : :
1162 : 8 : appendPQExpBuffer(query,
1163 : : "SELECT s.subname FROM pg_catalog.pg_subscription s "
1164 : : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1165 : : "WHERE d.datname = %s",
1166 : : dbname);
1167 : 8 : res = PQexec(conn, query->data);
1168 : :
1169 [ - + ]: 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1170 : : {
431 akapila@postgresql.o 1171 :UBC 0 : pg_log_error("could not obtain pre-existing subscriptions: %s",
1172 : : PQresultErrorMessage(res));
1173 : 0 : disconnect_database(conn, true);
1174 : : }
1175 : :
431 akapila@postgresql.o 1176 [ + + ]:CBC 12 : for (int i = 0; i < PQntuples(res); i++)
1177 : 4 : drop_existing_subscriptions(conn, PQgetvalue(res, i, 0),
1178 : 4 : dbinfo->dbname);
1179 : :
1180 : 8 : PQclear(res);
1181 : 8 : destroyPQExpBuffer(query);
206 michael@paquier.xyz 1182 : 8 : PQfreemem(dbname);
431 akapila@postgresql.o 1183 : 8 : }
1184 : :
1185 : : /*
1186 : : * Create the subscriptions, adjust the initial location for logical
1187 : : * replication and enable the subscriptions. That's the last step for logical
1188 : : * replication setup.
1189 : : */
1190 : : static void
530 peter@eisentraut.org 1191 : 4 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1192 : : {
1193 [ + + ]: 12 : for (int i = 0; i < num_dbs; i++)
1194 : : {
1195 : : PGconn *conn;
1196 : :
1197 : : /* Connect to subscriber. */
1198 : 8 : conn = connect_database(dbinfo[i].subconninfo, true);
1199 : :
1200 : : /*
1201 : : * We don't need the pre-existing subscriptions on the newly formed
1202 : : * subscriber. They can connect to other publisher nodes and either
1203 : : * get some unwarranted data or can lead to ERRORs in connecting to
1204 : : * such nodes.
1205 : : */
431 akapila@postgresql.o 1206 : 8 : check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1207 : :
1208 : : /* Check and drop the required publications in the given database. */
170 1209 : 8 : check_and_drop_publications(conn, &dbinfo[i]);
1210 : :
530 peter@eisentraut.org 1211 : 8 : create_subscription(conn, &dbinfo[i]);
1212 : :
1213 : : /* Set the replication progress to the correct LSN */
1214 : 8 : set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1215 : :
1216 : : /* Enable subscription */
1217 : 8 : enable_subscription(conn, &dbinfo[i]);
1218 : :
1219 : 8 : disconnect_database(conn, false);
1220 : : }
1221 : 4 : }
1222 : :
1223 : : /*
1224 : : * Write the required recovery parameters.
1225 : : */
1226 : : static void
1227 : 4 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1228 : : {
1229 : : PGconn *conn;
1230 : : PQExpBuffer recoveryconfcontents;
1231 : :
1232 : : /*
1233 : : * Despite of the recovery parameters will be written to the subscriber,
1234 : : * use a publisher connection. The primary_conninfo is generated using the
1235 : : * connection settings.
1236 : : */
1237 : 4 : conn = connect_database(dbinfo[0].pubconninfo, true);
1238 : :
1239 : : /*
1240 : : * Write recovery parameters.
1241 : : *
1242 : : * The subscriber is not running yet. In dry run mode, the recovery
1243 : : * parameters *won't* be written. An invalid LSN is used for printing
1244 : : * purposes. Additional recovery parameters are added here. It avoids
1245 : : * unexpected behavior such as end of recovery as soon as a consistent
1246 : : * state is reached (recovery_target) and failure due to multiple recovery
1247 : : * targets (name, time, xid, LSN).
1248 : : */
1249 : 4 : recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
141 drowley@postgresql.o 1250 : 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1251 : 4 : appendPQExpBufferStr(recoveryconfcontents,
1252 : : "recovery_target_timeline = 'latest'\n");
1253 : :
1254 : : /*
1255 : : * Set recovery_target_inclusive = false to avoid reapplying the
1256 : : * transaction committed at 'lsn' after subscription is enabled. This is
1257 : : * because the provided 'lsn' is also used as the replication start point
1258 : : * for the subscription. So, the server can send the transaction committed
1259 : : * at that 'lsn' after replication is started which can lead to applying
1260 : : * the same transaction twice if we keep recovery_target_inclusive = true.
1261 : : */
1262 : 4 : appendPQExpBufferStr(recoveryconfcontents,
1263 : : "recovery_target_inclusive = false\n");
1264 : 4 : appendPQExpBufferStr(recoveryconfcontents,
1265 : : "recovery_target_action = promote\n");
1266 : 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1267 : 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1268 : 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1269 : :
530 peter@eisentraut.org 1270 [ + + ]: 4 : if (dry_run)
1271 : : {
141 drowley@postgresql.o 1272 : 3 : appendPQExpBufferStr(recoveryconfcontents, "# dry run mode");
530 peter@eisentraut.org 1273 : 3 : appendPQExpBuffer(recoveryconfcontents,
1274 : : "recovery_target_lsn = '%X/%08X'\n",
1275 : 3 : LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1276 : : }
1277 : : else
1278 : : {
1279 : 1 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1280 : : lsn);
1281 : 1 : WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1282 : : }
1283 : 4 : disconnect_database(conn, false);
1284 : :
1285 [ + + ]: 4 : pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1286 : 4 : }
1287 : :
1288 : : /*
1289 : : * Drop physical replication slot on primary if the standby was using it. After
1290 : : * the transformation, it has no use.
1291 : : *
1292 : : * XXX we might not fail here. Instead, we provide a warning so the user
1293 : : * eventually drops this replication slot later.
1294 : : */
1295 : : static void
1296 : 4 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1297 : : {
1298 : : PGconn *conn;
1299 : :
1300 : : /* Replication slot does not exist, do nothing */
1301 [ - + ]: 4 : if (!primary_slot_name)
530 peter@eisentraut.org 1302 :UBC 0 : return;
1303 : :
530 peter@eisentraut.org 1304 :CBC 4 : conn = connect_database(dbinfo[0].pubconninfo, false);
1305 [ + - ]: 4 : if (conn != NULL)
1306 : : {
1307 : 4 : drop_replication_slot(conn, &dbinfo[0], slotname);
1308 : 4 : disconnect_database(conn, false);
1309 : : }
1310 : : else
1311 : : {
530 peter@eisentraut.org 1312 :UBC 0 : pg_log_warning("could not drop replication slot \"%s\" on primary",
1313 : : slotname);
1314 : 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1315 : : }
1316 : : }
1317 : :
1318 : : /*
1319 : : * Drop failover replication slots on subscriber. After the transformation,
1320 : : * they have no use.
1321 : : *
1322 : : * XXX We do not fail here. Instead, we provide a warning so the user can drop
1323 : : * them later.
1324 : : */
1325 : : static void
446 peter@eisentraut.org 1326 :CBC 4 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
1327 : : {
1328 : : PGconn *conn;
1329 : : PGresult *res;
1330 : :
1331 : 4 : conn = connect_database(dbinfo[0].subconninfo, false);
1332 [ + - ]: 4 : if (conn != NULL)
1333 : : {
1334 : : /* Get failover replication slot names */
1335 : 4 : res = PQexec(conn,
1336 : : "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1337 : :
1338 [ + - ]: 4 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
1339 : : {
1340 : : /* Remove failover replication slots from subscriber */
1341 [ + + ]: 8 : for (int i = 0; i < PQntuples(res); i++)
1342 : 4 : drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1343 : : }
1344 : : else
1345 : : {
446 peter@eisentraut.org 1346 :UBC 0 : pg_log_warning("could not obtain failover replication slot information: %s",
1347 : : PQresultErrorMessage(res));
1348 : 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1349 : : }
1350 : :
446 peter@eisentraut.org 1351 :CBC 4 : PQclear(res);
1352 : 4 : disconnect_database(conn, false);
1353 : : }
1354 : : else
1355 : : {
446 peter@eisentraut.org 1356 :UBC 0 : pg_log_warning("could not drop failover replication slot");
1357 : 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1358 : : }
446 peter@eisentraut.org 1359 :CBC 4 : }
1360 : :
1361 : : /*
1362 : : * Create a logical replication slot and returns a LSN.
1363 : : *
1364 : : * CreateReplicationSlot() is not used because it does not provide the one-row
1365 : : * result set that contains the LSN.
1366 : : */
1367 : : static char *
530 1368 : 8 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1369 : : {
1370 : 8 : PQExpBuffer str = createPQExpBuffer();
1371 : 8 : PGresult *res = NULL;
1372 : 8 : const char *slot_name = dbinfo->replslotname;
1373 : : char *slot_name_esc;
1374 : 8 : char *lsn = NULL;
1375 : :
1376 [ - + ]: 8 : Assert(conn != NULL);
1377 : :
408 1378 : 8 : pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
1379 : : slot_name, dbinfo->dbname);
1380 : :
530 1381 : 8 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1382 : :
1383 : 8 : appendPQExpBuffer(str,
1384 : : "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1385 : : slot_name_esc,
192 akapila@postgresql.o 1386 [ + + ]: 8 : dbinfos.two_phase ? "true" : "false");
1387 : :
206 michael@paquier.xyz 1388 : 8 : PQfreemem(slot_name_esc);
1389 : :
530 peter@eisentraut.org 1390 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
1391 : :
1392 [ + + ]: 8 : if (!dry_run)
1393 : : {
1394 : 2 : res = PQexec(conn, str->data);
1395 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1396 : : {
408 peter@eisentraut.org 1397 :UBC 0 : pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1398 : : slot_name, dbinfo->dbname,
1399 : : PQresultErrorMessage(res));
523 tgl@sss.pgh.pa.us 1400 : 0 : PQclear(res);
1401 : 0 : destroyPQExpBuffer(str);
530 peter@eisentraut.org 1402 : 0 : return NULL;
1403 : : }
1404 : :
530 peter@eisentraut.org 1405 :CBC 2 : lsn = pg_strdup(PQgetvalue(res, 0, 0));
1406 : 2 : PQclear(res);
1407 : : }
1408 : :
1409 : : /* For cleanup purposes */
1410 : 8 : dbinfo->made_replslot = true;
1411 : :
1412 : 8 : destroyPQExpBuffer(str);
1413 : :
1414 : 8 : return lsn;
1415 : : }
1416 : :
1417 : : static void
1418 : 8 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1419 : : const char *slot_name)
1420 : : {
1421 : 8 : PQExpBuffer str = createPQExpBuffer();
1422 : : char *slot_name_esc;
1423 : : PGresult *res;
1424 : :
1425 [ - + ]: 8 : Assert(conn != NULL);
1426 : :
408 1427 : 8 : pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1428 : : slot_name, dbinfo->dbname);
1429 : :
530 1430 : 8 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1431 : :
1432 : 8 : appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1433 : :
206 michael@paquier.xyz 1434 : 8 : PQfreemem(slot_name_esc);
1435 : :
530 peter@eisentraut.org 1436 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
1437 : :
1438 [ + + ]: 8 : if (!dry_run)
1439 : : {
1440 : 2 : res = PQexec(conn, str->data);
1441 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1442 : : {
408 peter@eisentraut.org 1443 :UBC 0 : pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1444 : : slot_name, dbinfo->dbname, PQresultErrorMessage(res));
530 1445 : 0 : dbinfo->made_replslot = false; /* don't try again. */
1446 : : }
1447 : :
530 peter@eisentraut.org 1448 :CBC 2 : PQclear(res);
1449 : : }
1450 : :
1451 : 8 : destroyPQExpBuffer(str);
1452 : 8 : }
1453 : :
1454 : : /*
1455 : : * Reports a suitable message if pg_ctl fails.
1456 : : */
1457 : : static void
1458 : 24 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
1459 : : {
1460 [ - + ]: 24 : if (rc != 0)
1461 : : {
530 peter@eisentraut.org 1462 [ # # ]:UBC 0 : if (WIFEXITED(rc))
1463 : : {
1464 : 0 : pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
1465 : : }
1466 [ # # ]: 0 : else if (WIFSIGNALED(rc))
1467 : : {
1468 : : #if defined(WIN32)
1469 : : pg_log_error("pg_ctl was terminated by exception 0x%X",
1470 : : WTERMSIG(rc));
1471 : : pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1472 : : #else
1473 : 0 : pg_log_error("pg_ctl was terminated by signal %d: %s",
1474 : : WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1475 : : #endif
1476 : : }
1477 : : else
1478 : : {
1479 : 0 : pg_log_error("pg_ctl exited with unrecognized status %d", rc);
1480 : : }
1481 : :
1482 : 0 : pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
1483 : 0 : exit(1);
1484 : : }
530 peter@eisentraut.org 1485 :CBC 24 : }
1486 : :
1487 : : static void
431 akapila@postgresql.o 1488 : 12 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1489 : : bool restrict_logical_worker)
1490 : : {
530 peter@eisentraut.org 1491 : 12 : PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1492 : : int rc;
1493 : :
433 tgl@sss.pgh.pa.us 1494 : 12 : appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1495 : 12 : appendShellString(pg_ctl_cmd, subscriber_dir);
141 drowley@postgresql.o 1496 : 12 : appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1497 : :
1498 : : /* Prevent unintended slot invalidation */
142 1499 : 12 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1500 : :
530 peter@eisentraut.org 1501 [ + - ]: 12 : if (restricted_access)
1502 : : {
1503 : 12 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1504 : : #if !defined(WIN32)
1505 : :
1506 : : /*
1507 : : * An empty listen_addresses list means the server does not listen on
1508 : : * any IP interfaces; only Unix-domain sockets can be used to connect
1509 : : * to the server. Prevent external connections to minimize the chance
1510 : : * of failure.
1511 : : */
1512 : 12 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1513 [ + - ]: 12 : if (opt->socket_dir)
1514 : 12 : appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1515 : 12 : opt->socket_dir);
1516 : 12 : appendPQExpBufferChar(pg_ctl_cmd, '"');
1517 : : #endif
1518 : : }
1519 [ - + ]: 12 : if (opt->config_file != NULL)
530 peter@eisentraut.org 1520 :UBC 0 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1521 : 0 : opt->config_file);
1522 : :
1523 : : /* Suppress to start logical replication if requested */
431 akapila@postgresql.o 1524 [ + + ]:CBC 12 : if (restrict_logical_worker)
142 drowley@postgresql.o 1525 : 4 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1526 : :
530 peter@eisentraut.org 1527 [ + + ]: 12 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1528 : 12 : rc = system(pg_ctl_cmd->data);
1529 : 12 : pg_ctl_status(pg_ctl_cmd->data, rc);
1530 : 12 : standby_running = true;
1531 : 12 : destroyPQExpBuffer(pg_ctl_cmd);
1532 : 12 : pg_log_info("server was started");
1533 : 12 : }
1534 : :
1535 : : static void
1536 : 12 : stop_standby_server(const char *datadir)
1537 : : {
1538 : : char *pg_ctl_cmd;
1539 : : int rc;
1540 : :
1541 : 12 : pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1542 : : datadir);
1543 [ + + ]: 12 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1544 : 12 : rc = system(pg_ctl_cmd);
1545 : 12 : pg_ctl_status(pg_ctl_cmd, rc);
1546 : 12 : standby_running = false;
1547 : 12 : pg_log_info("server was stopped");
1548 : 12 : }
1549 : :
1550 : : /*
1551 : : * Returns after the server finishes the recovery process.
1552 : : *
1553 : : * If recovery_timeout option is set, terminate abnormally without finishing
1554 : : * the recovery process. By default, it waits forever.
1555 : : *
1556 : : * XXX Is the recovery process still in progress? When recovery process has a
1557 : : * better progress reporting mechanism, it should be added here.
1558 : : */
1559 : : static void
1560 : 4 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1561 : : {
1562 : : PGconn *conn;
1563 : 4 : int status = POSTMASTER_STILL_STARTING;
1564 : 4 : int timer = 0;
1565 : :
1566 : 4 : pg_log_info("waiting for the target server to reach the consistent state");
1567 : :
1568 : 4 : conn = connect_database(conninfo, true);
1569 : :
1570 : : for (;;)
530 peter@eisentraut.org 1571 :UBC 0 : {
530 peter@eisentraut.org 1572 :CBC 4 : bool in_recovery = server_is_in_recovery(conn);
1573 : :
1574 : : /*
1575 : : * Does the recovery process finish? In dry run mode, there is no
1576 : : * recovery mode. Bail out as the recovery process has ended.
1577 : : */
1578 [ + + + - ]: 4 : if (!in_recovery || dry_run)
1579 : : {
1580 : 4 : status = POSTMASTER_READY;
1581 : 4 : recovery_ended = true;
1582 : 4 : break;
1583 : : }
1584 : :
1585 : : /* Bail out after recovery_timeout seconds if this option is set */
530 peter@eisentraut.org 1586 [ # # # # ]:UBC 0 : if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1587 : : {
1588 : 0 : stop_standby_server(subscriber_dir);
1589 : 0 : pg_log_error("recovery timed out");
1590 : 0 : disconnect_database(conn, true);
1591 : : }
1592 : :
1593 : : /* Keep waiting */
1594 : 0 : pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
1595 : :
1596 : 0 : timer += WAIT_INTERVAL;
1597 : : }
1598 : :
530 peter@eisentraut.org 1599 :CBC 4 : disconnect_database(conn, false);
1600 : :
1601 [ - + ]: 4 : if (status == POSTMASTER_STILL_STARTING)
530 peter@eisentraut.org 1602 :UBC 0 : pg_fatal("server did not end recovery");
1603 : :
530 peter@eisentraut.org 1604 :CBC 4 : pg_log_info("target server reached the consistent state");
1605 : 4 : pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1606 : 4 : }
1607 : :
1608 : : /*
1609 : : * Create a publication that includes all tables in the database.
1610 : : */
1611 : : static void
1612 : 8 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1613 : : {
1614 : 8 : PQExpBuffer str = createPQExpBuffer();
1615 : : PGresult *res;
1616 : : char *ipubname_esc;
1617 : : char *spubname_esc;
1618 : :
1619 [ - + ]: 8 : Assert(conn != NULL);
1620 : :
1621 : 8 : ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1622 : 8 : spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1623 : :
1624 : : /* Check if the publication already exists */
1625 : 8 : appendPQExpBuffer(str,
1626 : : "SELECT 1 FROM pg_catalog.pg_publication "
1627 : : "WHERE pubname = %s",
1628 : : spubname_esc);
1629 : 8 : res = PQexec(conn, str->data);
1630 [ - + ]: 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1631 : : {
530 peter@eisentraut.org 1632 :UBC 0 : pg_log_error("could not obtain publication information: %s",
1633 : : PQresultErrorMessage(res));
1634 : 0 : disconnect_database(conn, true);
1635 : : }
1636 : :
530 peter@eisentraut.org 1637 [ - + ]:CBC 8 : if (PQntuples(res) == 1)
1638 : : {
1639 : : /*
1640 : : * Unfortunately, if it reaches this code path, it will always fail
1641 : : * (unless you decide to change the existing publication name). That's
1642 : : * bad but it is very unlikely that the user will choose a name with
1643 : : * pg_createsubscriber_ prefix followed by the exact database oid and
1644 : : * a random number.
1645 : : */
530 peter@eisentraut.org 1646 :UBC 0 : pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1647 : 0 : pg_log_error_hint("Consider renaming this publication before continuing.");
1648 : 0 : disconnect_database(conn, true);
1649 : : }
1650 : :
530 peter@eisentraut.org 1651 :CBC 8 : PQclear(res);
1652 : 8 : resetPQExpBuffer(str);
1653 : :
408 1654 : 8 : pg_log_info("creating publication \"%s\" in database \"%s\"",
1655 : : dbinfo->pubname, dbinfo->dbname);
1656 : :
530 1657 : 8 : appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1658 : : ipubname_esc);
1659 : :
1660 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
1661 : :
1662 [ + + ]: 8 : if (!dry_run)
1663 : : {
1664 : 2 : res = PQexec(conn, str->data);
1665 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1666 : : {
408 peter@eisentraut.org 1667 :UBC 0 : pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1668 : : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
530 1669 : 0 : disconnect_database(conn, true);
1670 : : }
530 peter@eisentraut.org 1671 :CBC 2 : PQclear(res);
1672 : : }
1673 : :
1674 : : /* For cleanup purposes */
1675 : 8 : dbinfo->made_publication = true;
1676 : :
206 michael@paquier.xyz 1677 : 8 : PQfreemem(ipubname_esc);
1678 : 8 : PQfreemem(spubname_esc);
530 peter@eisentraut.org 1679 : 8 : destroyPQExpBuffer(str);
1680 : 8 : }
1681 : :
1682 : : /*
1683 : : * Drop the specified publication in the given database.
1684 : : */
1685 : : static void
170 akapila@postgresql.o 1686 : 10 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1687 : : bool *made_publication)
1688 : : {
530 peter@eisentraut.org 1689 : 10 : PQExpBuffer str = createPQExpBuffer();
1690 : : PGresult *res;
1691 : : char *pubname_esc;
1692 : :
1693 [ - + ]: 10 : Assert(conn != NULL);
1694 : :
170 akapila@postgresql.o 1695 : 10 : pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1696 : :
408 peter@eisentraut.org 1697 : 10 : pg_log_info("dropping publication \"%s\" in database \"%s\"",
1698 : : pubname, dbname);
1699 : :
530 1700 : 10 : appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1701 : :
206 michael@paquier.xyz 1702 : 10 : PQfreemem(pubname_esc);
1703 : :
530 peter@eisentraut.org 1704 [ + + ]: 10 : pg_log_debug("command is: %s", str->data);
1705 : :
1706 [ + + ]: 10 : if (!dry_run)
1707 : : {
1708 : 4 : res = PQexec(conn, str->data);
1709 [ - + ]: 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1710 : : {
408 peter@eisentraut.org 1711 :UBC 0 : pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1712 : : pubname, dbname, PQresultErrorMessage(res));
170 akapila@postgresql.o 1713 : 0 : *made_publication = false; /* don't try again. */
1714 : :
1715 : : /*
1716 : : * Don't disconnect and exit here. This routine is used by primary
1717 : : * (cleanup publication / replication slot due to an error) and
1718 : : * subscriber (remove the replicated publications). In both cases,
1719 : : * it can continue and provide instructions for the user to remove
1720 : : * it later if cleanup fails.
1721 : : */
1722 : : }
530 peter@eisentraut.org 1723 :CBC 4 : PQclear(res);
1724 : : }
1725 : :
1726 : 10 : destroyPQExpBuffer(str);
1727 : 10 : }
1728 : :
1729 : : /*
1730 : : * Retrieve and drop the publications.
1731 : : *
1732 : : * Since the publications were created before the consistent LSN, they
1733 : : * remain on the subscriber even after the physical replica is
1734 : : * promoted. Remove these publications from the subscriber because
1735 : : * they have no use. Additionally, if requested, drop all pre-existing
1736 : : * publications.
1737 : : */
1738 : : static void
170 akapila@postgresql.o 1739 : 8 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
1740 : : {
1741 : : PGresult *res;
73 peter@eisentraut.org 1742 : 8 : bool drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
1743 : :
170 akapila@postgresql.o 1744 [ - + ]: 8 : Assert(conn != NULL);
1745 : :
1746 [ + + ]: 8 : if (drop_all_pubs)
1747 : : {
1748 : 2 : pg_log_info("dropping all existing publications in database \"%s\"",
1749 : : dbinfo->dbname);
1750 : :
1751 : : /* Fetch all publication names */
1752 : 2 : res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1753 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1754 : : {
170 akapila@postgresql.o 1755 :UBC 0 : pg_log_error("could not obtain publication information: %s",
1756 : : PQresultErrorMessage(res));
1757 : 0 : PQclear(res);
1758 : 0 : disconnect_database(conn, true);
1759 : : }
1760 : :
1761 : : /* Drop each publication */
170 akapila@postgresql.o 1762 [ + + ]:CBC 6 : for (int i = 0; i < PQntuples(res); i++)
1763 : 4 : drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1764 : : &dbinfo->made_publication);
1765 : :
1766 : 2 : PQclear(res);
1767 : : }
1768 : :
1769 : : /*
1770 : : * In dry-run mode, we don't create publications, but we still try to drop
1771 : : * those to provide necessary information to the user.
1772 : : */
1773 [ + + - + ]: 8 : if (!drop_all_pubs || dry_run)
1774 : 6 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1775 : : &dbinfo->made_publication);
1776 : 8 : }
1777 : :
1778 : : /*
1779 : : * Create a subscription with some predefined options.
1780 : : *
1781 : : * A replication slot was already created in a previous step. Let's use it. It
1782 : : * is not required to copy data. The subscription will be created but it will
1783 : : * not be enabled now. That's because the replication progress must be set and
1784 : : * the replication origin name (one of the function arguments) contains the
1785 : : * subscription OID in its name. Once the subscription is created,
1786 : : * set_replication_progress() can obtain the chosen origin name and set up its
1787 : : * initial location.
1788 : : */
1789 : : static void
530 peter@eisentraut.org 1790 : 8 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1791 : : {
1792 : 8 : PQExpBuffer str = createPQExpBuffer();
1793 : : PGresult *res;
1794 : : char *pubname_esc;
1795 : : char *subname_esc;
1796 : : char *pubconninfo_esc;
1797 : : char *replslotname_esc;
1798 : :
1799 [ - + ]: 8 : Assert(conn != NULL);
1800 : :
1801 : 8 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1802 : 8 : subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1803 : 8 : pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1804 : 8 : replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1805 : :
408 1806 : 8 : pg_log_info("creating subscription \"%s\" in database \"%s\"",
1807 : : dbinfo->subname, dbinfo->dbname);
1808 : :
530 1809 : 8 : appendPQExpBuffer(str,
1810 : : "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1811 : : "WITH (create_slot = false, enabled = false, "
1812 : : "slot_name = %s, copy_data = false, two_phase = %s)",
1813 : : subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
192 akapila@postgresql.o 1814 [ + + ]: 8 : dbinfos.two_phase ? "true" : "false");
1815 : :
206 michael@paquier.xyz 1816 : 8 : PQfreemem(pubname_esc);
1817 : 8 : PQfreemem(subname_esc);
1818 : 8 : PQfreemem(pubconninfo_esc);
1819 : 8 : PQfreemem(replslotname_esc);
1820 : :
530 peter@eisentraut.org 1821 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
1822 : :
1823 [ + + ]: 8 : if (!dry_run)
1824 : : {
1825 : 2 : res = PQexec(conn, str->data);
1826 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1827 : : {
408 peter@eisentraut.org 1828 :UBC 0 : pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1829 : : dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
530 1830 : 0 : disconnect_database(conn, true);
1831 : : }
530 peter@eisentraut.org 1832 :CBC 2 : PQclear(res);
1833 : : }
1834 : :
1835 : 8 : destroyPQExpBuffer(str);
1836 : 8 : }
1837 : :
1838 : : /*
1839 : : * Sets the replication progress to the consistent LSN.
1840 : : *
1841 : : * The subscriber caught up to the consistent LSN provided by the last
1842 : : * replication slot that was created. The goal is to set up the initial
1843 : : * location for the logical replication that is the exact LSN that the
1844 : : * subscriber was promoted. Once the subscription is enabled it will start
1845 : : * streaming from that location onwards. In dry run mode, the subscription OID
1846 : : * and LSN are set to invalid values for printing purposes.
1847 : : */
1848 : : static void
1849 : 8 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1850 : : {
1851 : 8 : PQExpBuffer str = createPQExpBuffer();
1852 : : PGresult *res;
1853 : : Oid suboid;
1854 : : char *subname;
1855 : : char *dbname;
1856 : : char *originname;
1857 : : char *lsnstr;
1858 : :
1859 [ - + ]: 8 : Assert(conn != NULL);
1860 : :
1861 : 8 : subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1862 : 8 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1863 : :
1864 : 8 : appendPQExpBuffer(str,
1865 : : "SELECT s.oid FROM pg_catalog.pg_subscription s "
1866 : : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1867 : : "WHERE s.subname = %s AND d.datname = %s",
1868 : : subname, dbname);
1869 : :
1870 : 8 : res = PQexec(conn, str->data);
1871 [ - + ]: 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1872 : : {
530 peter@eisentraut.org 1873 :UBC 0 : pg_log_error("could not obtain subscription OID: %s",
1874 : : PQresultErrorMessage(res));
1875 : 0 : disconnect_database(conn, true);
1876 : : }
1877 : :
530 peter@eisentraut.org 1878 [ + + - + ]:CBC 8 : if (PQntuples(res) != 1 && !dry_run)
1879 : : {
529 peter@eisentraut.org 1880 :UBC 0 : pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1881 : : PQntuples(res), 1);
530 1882 : 0 : disconnect_database(conn, true);
1883 : : }
1884 : :
530 peter@eisentraut.org 1885 [ + + ]:CBC 8 : if (dry_run)
1886 : : {
1887 : 6 : suboid = InvalidOid;
61 alvherre@kurilemu.de 1888 :GNC 6 : lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1889 : : }
1890 : : else
1891 : : {
530 peter@eisentraut.org 1892 :CBC 2 : suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1893 : 2 : lsnstr = psprintf("%s", lsn);
1894 : : }
1895 : :
1896 : 8 : PQclear(res);
1897 : :
1898 : : /*
1899 : : * The origin name is defined as pg_%u. %u is the subscription OID. See
1900 : : * ApplyWorkerMain().
1901 : : */
1902 : 8 : originname = psprintf("pg_%u", suboid);
1903 : :
378 1904 : 8 : pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1905 : : originname, lsnstr, dbinfo->dbname);
1906 : :
530 1907 : 8 : resetPQExpBuffer(str);
1908 : 8 : appendPQExpBuffer(str,
1909 : : "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1910 : : originname, lsnstr);
1911 : :
1912 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
1913 : :
1914 [ + + ]: 8 : if (!dry_run)
1915 : : {
1916 : 2 : res = PQexec(conn, str->data);
1917 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1918 : : {
378 peter@eisentraut.org 1919 :UBC 0 : pg_log_error("could not set replication progress for subscription \"%s\": %s",
1920 : : dbinfo->subname, PQresultErrorMessage(res));
530 1921 : 0 : disconnect_database(conn, true);
1922 : : }
530 peter@eisentraut.org 1923 :CBC 2 : PQclear(res);
1924 : : }
1925 : :
206 michael@paquier.xyz 1926 : 8 : PQfreemem(subname);
1927 : 8 : PQfreemem(dbname);
530 peter@eisentraut.org 1928 : 8 : pg_free(originname);
1929 : 8 : pg_free(lsnstr);
1930 : 8 : destroyPQExpBuffer(str);
1931 : 8 : }
1932 : :
1933 : : /*
1934 : : * Enables the subscription.
1935 : : *
1936 : : * The subscription was created in a previous step but it was disabled. After
1937 : : * adjusting the initial logical replication location, enable the subscription.
1938 : : */
1939 : : static void
1940 : 8 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1941 : : {
1942 : 8 : PQExpBuffer str = createPQExpBuffer();
1943 : : PGresult *res;
1944 : : char *subname;
1945 : :
1946 [ - + ]: 8 : Assert(conn != NULL);
1947 : :
1948 : 8 : subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1949 : :
408 1950 : 8 : pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1951 : : dbinfo->subname, dbinfo->dbname);
1952 : :
530 1953 : 8 : appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1954 : :
1955 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
1956 : :
1957 [ + + ]: 8 : if (!dry_run)
1958 : : {
1959 : 2 : res = PQexec(conn, str->data);
1960 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1961 : : {
530 peter@eisentraut.org 1962 :UBC 0 : pg_log_error("could not enable subscription \"%s\": %s",
1963 : : dbinfo->subname, PQresultErrorMessage(res));
1964 : 0 : disconnect_database(conn, true);
1965 : : }
1966 : :
530 peter@eisentraut.org 1967 :CBC 2 : PQclear(res);
1968 : : }
1969 : :
206 michael@paquier.xyz 1970 : 8 : PQfreemem(subname);
530 peter@eisentraut.org 1971 : 8 : destroyPQExpBuffer(str);
1972 : 8 : }
1973 : :
1974 : : /*
1975 : : * Fetch a list of all connectable non-template databases from the source server
1976 : : * and form a list such that they appear as if the user has specified multiple
1977 : : * --database options, one for each source database.
1978 : : */
1979 : : static void
162 akapila@postgresql.o 1980 : 1 : get_publisher_databases(struct CreateSubscriberOptions *opt,
1981 : : bool dbnamespecified)
1982 : : {
1983 : : PGconn *conn;
1984 : : PGresult *res;
1985 : :
1986 : : /* If a database name was specified, just connect to it. */
1987 [ - + ]: 1 : if (dbnamespecified)
162 akapila@postgresql.o 1988 :UBC 0 : conn = connect_database(opt->pub_conninfo_str, true);
1989 : : else
1990 : : {
1991 : : /* Otherwise, try postgres first and then template1. */
1992 : : char *conninfo;
1993 : :
162 akapila@postgresql.o 1994 :CBC 1 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
1995 : 1 : conn = connect_database(conninfo, false);
1996 : 1 : pg_free(conninfo);
1997 [ - + ]: 1 : if (!conn)
1998 : : {
162 akapila@postgresql.o 1999 :UBC 0 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2000 : 0 : conn = connect_database(conninfo, true);
2001 : 0 : pg_free(conninfo);
2002 : : }
2003 : : }
2004 : :
162 akapila@postgresql.o 2005 :CBC 1 : res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2006 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2007 : : {
162 akapila@postgresql.o 2008 :UBC 0 : pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2009 : 0 : PQclear(res);
2010 : 0 : disconnect_database(conn, true);
2011 : : }
2012 : :
162 akapila@postgresql.o 2013 [ + + ]:CBC 4 : for (int i = 0; i < PQntuples(res); i++)
2014 : : {
2015 : 3 : const char *dbname = PQgetvalue(res, i, 0);
2016 : :
2017 : 3 : simple_string_list_append(&opt->database_names, dbname);
2018 : :
2019 : : /* Increment num_dbs to reflect multiple --database options */
2020 : 3 : num_dbs++;
2021 : : }
2022 : :
2023 : 1 : PQclear(res);
2024 : 1 : disconnect_database(conn, false);
2025 : 1 : }
2026 : :
2027 : : int
530 peter@eisentraut.org 2028 : 23 : main(int argc, char **argv)
2029 : : {
2030 : : static struct option long_options[] =
2031 : : {
2032 : : {"all", no_argument, NULL, 'a'},
2033 : : {"database", required_argument, NULL, 'd'},
2034 : : {"pgdata", required_argument, NULL, 'D'},
2035 : : {"dry-run", no_argument, NULL, 'n'},
2036 : : {"subscriber-port", required_argument, NULL, 'p'},
2037 : : {"publisher-server", required_argument, NULL, 'P'},
2038 : : {"socketdir", required_argument, NULL, 's'},
2039 : : {"recovery-timeout", required_argument, NULL, 't'},
2040 : : {"enable-two-phase", no_argument, NULL, 'T'},
2041 : : {"subscriber-username", required_argument, NULL, 'U'},
2042 : : {"verbose", no_argument, NULL, 'v'},
2043 : : {"version", no_argument, NULL, 'V'},
2044 : : {"help", no_argument, NULL, '?'},
2045 : : {"config-file", required_argument, NULL, 1},
2046 : : {"publication", required_argument, NULL, 2},
2047 : : {"replication-slot", required_argument, NULL, 3},
2048 : : {"subscription", required_argument, NULL, 4},
2049 : : {"clean", required_argument, NULL, 5},
2050 : : {NULL, 0, NULL, 0}
2051 : : };
2052 : :
2053 : 23 : struct CreateSubscriberOptions opt = {0};
2054 : :
2055 : : int c;
2056 : : int option_index;
2057 : :
2058 : : char *pub_base_conninfo;
2059 : : char *sub_base_conninfo;
2060 : 23 : char *dbname_conninfo = NULL;
2061 : :
2062 : : uint64 pub_sysid;
2063 : : uint64 sub_sysid;
2064 : : struct stat statbuf;
2065 : :
2066 : : char *consistent_lsn;
2067 : :
2068 : : char pidfile[MAXPGPATH];
2069 : :
2070 : 23 : pg_logging_init(argv[0]);
2071 : 23 : pg_logging_set_level(PG_LOG_WARNING);
2072 : 23 : progname = get_progname(argv[0]);
400 alvherre@alvh.no-ip. 2073 : 23 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2074 : :
530 peter@eisentraut.org 2075 [ + + ]: 23 : if (argc > 1)
2076 : : {
2077 [ + + - + ]: 22 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2078 : : {
2079 : 1 : usage();
2080 : 1 : exit(0);
2081 : : }
2082 [ + - ]: 21 : else if (strcmp(argv[1], "-V") == 0
2083 [ + + ]: 21 : || strcmp(argv[1], "--version") == 0)
2084 : : {
2085 : 1 : puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2086 : 1 : exit(0);
2087 : : }
2088 : : }
2089 : :
2090 : : /* Default settings */
2091 : 21 : subscriber_dir = NULL;
2092 : 21 : opt.config_file = NULL;
2093 : 21 : opt.pub_conninfo_str = NULL;
2094 : 21 : opt.socket_dir = NULL;
2095 : 21 : opt.sub_port = DEFAULT_SUB_PORT;
2096 : 21 : opt.sub_username = NULL;
192 akapila@postgresql.o 2097 : 21 : opt.two_phase = false;
530 peter@eisentraut.org 2098 : 21 : opt.database_names = (SimpleStringList)
2099 : : {
2100 : : 0
2101 : : };
2102 : 21 : opt.recovery_timeout = 0;
162 akapila@postgresql.o 2103 : 21 : opt.all_dbs = false;
2104 : :
2105 : : /*
2106 : : * Don't allow it to be run as root. It uses pg_ctl which does not allow
2107 : : * it either.
2108 : : */
2109 : : #ifndef WIN32
530 peter@eisentraut.org 2110 [ - + ]: 21 : if (geteuid() == 0)
2111 : : {
530 peter@eisentraut.org 2112 :UBC 0 : pg_log_error("cannot be executed by \"root\"");
2113 : 0 : pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2114 : : progname);
2115 : 0 : exit(1);
2116 : : }
2117 : : #endif
2118 : :
530 peter@eisentraut.org 2119 :CBC 21 : get_restricted_token();
2120 : :
73 2121 : 162 : while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
530 2122 [ + + ]: 162 : long_options, &option_index)) != -1)
2123 : : {
2124 [ + + + + : 144 : switch (c)
+ + + + +
- + - + +
+ + + ]
2125 : : {
162 akapila@postgresql.o 2126 : 3 : case 'a':
2127 : 3 : opt.all_dbs = true;
2128 : 3 : break;
530 peter@eisentraut.org 2129 : 25 : case 'd':
2130 [ + + ]: 25 : if (!simple_string_list_member(&opt.database_names, optarg))
2131 : : {
2132 : 24 : simple_string_list_append(&opt.database_names, optarg);
2133 : 24 : num_dbs++;
2134 : : }
2135 : : else
155 akapila@postgresql.o 2136 : 1 : pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
530 peter@eisentraut.org 2137 : 24 : break;
2138 : 19 : case 'D':
2139 : 19 : subscriber_dir = pg_strdup(optarg);
2140 : 19 : canonicalize_path(subscriber_dir);
2141 : 19 : break;
2142 : 9 : case 'n':
2143 : 9 : dry_run = true;
2144 : 9 : break;
2145 : 12 : case 'p':
2146 : 12 : opt.sub_port = pg_strdup(optarg);
2147 : 12 : break;
2148 : 18 : case 'P':
2149 : 18 : opt.pub_conninfo_str = pg_strdup(optarg);
2150 : 18 : break;
2151 : 12 : case 's':
2152 : 12 : opt.socket_dir = pg_strdup(optarg);
2153 : 12 : canonicalize_path(opt.socket_dir);
2154 : 12 : break;
2155 : 3 : case 't':
2156 : 3 : opt.recovery_timeout = atoi(optarg);
2157 : 3 : break;
192 akapila@postgresql.o 2158 : 1 : case 'T':
2159 : 1 : opt.two_phase = true;
2160 : 1 : break;
530 peter@eisentraut.org 2161 :UBC 0 : case 'U':
2162 : 0 : opt.sub_username = pg_strdup(optarg);
2163 : 0 : break;
530 peter@eisentraut.org 2164 :CBC 19 : case 'v':
2165 : 19 : pg_logging_increase_verbosity();
2166 : 19 : break;
530 peter@eisentraut.org 2167 :UBC 0 : case 1:
2168 : 0 : opt.config_file = pg_strdup(optarg);
2169 : 0 : break;
530 peter@eisentraut.org 2170 :CBC 12 : case 2:
2171 [ + + ]: 12 : if (!simple_string_list_member(&opt.pub_names, optarg))
2172 : : {
2173 : 11 : simple_string_list_append(&opt.pub_names, optarg);
2174 : 11 : num_pubs++;
2175 : : }
2176 : : else
155 akapila@postgresql.o 2177 : 1 : pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
530 peter@eisentraut.org 2178 : 11 : break;
2179 : 4 : case 3:
2180 [ + - ]: 4 : if (!simple_string_list_member(&opt.replslot_names, optarg))
2181 : : {
2182 : 4 : simple_string_list_append(&opt.replslot_names, optarg);
2183 : 4 : num_replslots++;
2184 : : }
2185 : : else
155 akapila@postgresql.o 2186 :UBC 0 : pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
530 peter@eisentraut.org 2187 :CBC 4 : break;
2188 : 5 : case 4:
2189 [ + - ]: 5 : if (!simple_string_list_member(&opt.sub_names, optarg))
2190 : : {
2191 : 5 : simple_string_list_append(&opt.sub_names, optarg);
2192 : 5 : num_subs++;
2193 : : }
2194 : : else
155 akapila@postgresql.o 2195 :UBC 0 : pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
530 peter@eisentraut.org 2196 :CBC 5 : break;
73 2197 : 1 : case 5:
2198 [ + - ]: 1 : if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
2199 : 1 : simple_string_list_append(&opt.objecttypes_to_clean, optarg);
2200 : : else
73 peter@eisentraut.org 2201 :UBC 0 : pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
73 peter@eisentraut.org 2202 :CBC 1 : break;
530 2203 : 1 : default:
2204 : : /* getopt_long already emitted a complaint */
2205 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2206 : 1 : exit(1);
2207 : : }
2208 : : }
2209 : :
2210 : : /* Validate that --all is not used with incompatible options */
162 akapila@postgresql.o 2211 [ + + ]: 18 : if (opt.all_dbs)
2212 : : {
2213 : 3 : char *bad_switch = NULL;
2214 : :
2215 [ + + ]: 3 : if (num_dbs > 0)
2216 : 1 : bad_switch = "--database";
2217 [ + + ]: 2 : else if (num_pubs > 0)
2218 : 1 : bad_switch = "--publication";
2219 [ - + ]: 1 : else if (num_replslots > 0)
162 akapila@postgresql.o 2220 :UBC 0 : bad_switch = "--replication-slot";
162 akapila@postgresql.o 2221 [ - + ]:CBC 1 : else if (num_subs > 0)
162 akapila@postgresql.o 2222 :UBC 0 : bad_switch = "--subscription";
2223 : :
162 akapila@postgresql.o 2224 [ + + ]:CBC 3 : if (bad_switch)
2225 : : {
82 peter@eisentraut.org 2226 : 2 : pg_log_error("options %s and -a/--all cannot be used together", bad_switch);
162 akapila@postgresql.o 2227 : 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2228 : 2 : exit(1);
2229 : : }
2230 : : }
2231 : :
2232 : : /* Any non-option arguments? */
530 peter@eisentraut.org 2233 [ - + ]: 16 : if (optind < argc)
2234 : : {
530 peter@eisentraut.org 2235 :UBC 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
2236 : : argv[optind]);
2237 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2238 : 0 : exit(1);
2239 : : }
2240 : :
2241 : : /* Required arguments */
530 peter@eisentraut.org 2242 [ + + ]:CBC 16 : if (subscriber_dir == NULL)
2243 : : {
2244 : 1 : pg_log_error("no subscriber data directory specified");
2245 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2246 : 1 : exit(1);
2247 : : }
2248 : :
2249 : : /* If socket directory is not provided, use the current directory */
2250 [ + + ]: 15 : if (opt.socket_dir == NULL)
2251 : : {
2252 : : char cwd[MAXPGPATH];
2253 : :
2254 [ - + ]: 5 : if (!getcwd(cwd, MAXPGPATH))
530 peter@eisentraut.org 2255 :UBC 0 : pg_fatal("could not determine current directory");
530 peter@eisentraut.org 2256 :CBC 5 : opt.socket_dir = pg_strdup(cwd);
2257 : 5 : canonicalize_path(opt.socket_dir);
2258 : : }
2259 : :
2260 : : /*
2261 : : * Parse connection string. Build a base connection string that might be
2262 : : * reused by multiple databases.
2263 : : */
2264 [ + + ]: 15 : if (opt.pub_conninfo_str == NULL)
2265 : : {
2266 : : /*
2267 : : * TODO use primary_conninfo (if available) from subscriber and
2268 : : * extract publisher connection string. Assume that there are
2269 : : * identical entries for physical and logical replication. If there is
2270 : : * not, we would fail anyway.
2271 : : */
2272 : 1 : pg_log_error("no publisher connection string specified");
2273 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2274 : 1 : exit(1);
2275 : : }
400 alvherre@alvh.no-ip. 2276 : 14 : pg_log_info("validating publisher connection string");
530 peter@eisentraut.org 2277 : 14 : pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2278 : : &dbname_conninfo);
2279 [ - + ]: 14 : if (pub_base_conninfo == NULL)
530 peter@eisentraut.org 2280 :UBC 0 : exit(1);
2281 : :
400 alvherre@alvh.no-ip. 2282 :CBC 14 : pg_log_info("validating subscriber connection string");
530 peter@eisentraut.org 2283 : 14 : sub_base_conninfo = get_sub_conninfo(&opt);
2284 : :
2285 : : /*
2286 : : * Fetch all databases from the source (publisher) and treat them as if
2287 : : * the user specified has multiple --database options, one for each source
2288 : : * database.
2289 : : */
162 akapila@postgresql.o 2290 [ + + ]: 14 : if (opt.all_dbs)
2291 : : {
2292 : 1 : bool dbnamespecified = (dbname_conninfo != NULL);
2293 : :
2294 : 1 : get_publisher_databases(&opt, dbnamespecified);
2295 : : }
2296 : :
530 peter@eisentraut.org 2297 [ + + ]: 14 : if (opt.database_names.head == NULL)
2298 : : {
2299 : 2 : pg_log_info("no database was specified");
2300 : :
2301 : : /*
2302 : : * Try to obtain the dbname from the publisher conninfo. If dbname
2303 : : * parameter is not available, error out.
2304 : : */
2305 [ + + ]: 2 : if (dbname_conninfo)
2306 : : {
2307 : 1 : simple_string_list_append(&opt.database_names, dbname_conninfo);
2308 : 1 : num_dbs++;
2309 : :
378 2310 : 1 : pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2311 : : dbname_conninfo);
2312 : : }
2313 : : else
2314 : : {
530 2315 : 1 : pg_log_error("no database name specified");
2316 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.",
2317 : : progname);
2318 : 1 : exit(1);
2319 : : }
2320 : : }
2321 : :
2322 : : /* Number of object names must match number of databases */
2323 [ + + + + ]: 13 : if (num_pubs > 0 && num_pubs != num_dbs)
2324 : : {
378 2325 : 1 : pg_log_error("wrong number of publication names specified");
2326 : 1 : pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2327 : : num_pubs, num_dbs);
530 2328 : 1 : exit(1);
2329 : : }
2330 [ + + + + ]: 12 : if (num_subs > 0 && num_subs != num_dbs)
2331 : : {
378 2332 : 1 : pg_log_error("wrong number of subscription names specified");
2333 : 1 : pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2334 : : num_subs, num_dbs);
530 2335 : 1 : exit(1);
2336 : : }
2337 [ + + + + ]: 11 : if (num_replslots > 0 && num_replslots != num_dbs)
2338 : : {
378 2339 : 1 : pg_log_error("wrong number of replication slot names specified");
2340 : 1 : pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2341 : : num_replslots, num_dbs);
530 2342 : 1 : exit(1);
2343 : : }
2344 : :
2345 : : /* Verify the object types specified for removal from the subscriber */
73 2346 [ + + ]: 11 : for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2347 : : {
170 akapila@postgresql.o 2348 [ + - ]: 1 : if (pg_strcasecmp(cell->val, "publications") == 0)
73 peter@eisentraut.org 2349 : 1 : dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
2350 : : else
2351 : : {
73 peter@eisentraut.org 2352 :UBC 0 : pg_log_error("invalid object type \"%s\" specified for --clean", cell->val);
82 2353 : 0 : pg_log_error_hint("The valid value is: \"%s\"", "publications");
170 akapila@postgresql.o 2354 : 0 : exit(1);
2355 : : }
2356 : : }
2357 : :
2358 : : /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
530 peter@eisentraut.org 2359 :CBC 10 : pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2360 : 10 : pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2361 : :
2362 : : /* Rudimentary check for a data directory */
2363 : 10 : check_data_directory(subscriber_dir);
2364 : :
192 akapila@postgresql.o 2365 : 10 : dbinfos.two_phase = opt.two_phase;
2366 : :
2367 : : /*
2368 : : * Store database information for publisher and subscriber. It should be
2369 : : * called before atexit() because its return is used in the
2370 : : * cleanup_objects_atexit().
2371 : : */
2372 : 10 : dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2373 : :
2374 : : /* Register a function to clean up objects in case of failure */
530 peter@eisentraut.org 2375 : 10 : atexit(cleanup_objects_atexit);
2376 : :
2377 : : /*
2378 : : * Check if the subscriber data directory has the same system identifier
2379 : : * than the publisher data directory.
2380 : : */
192 akapila@postgresql.o 2381 : 10 : pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
530 peter@eisentraut.org 2382 : 10 : sub_sysid = get_standby_sysid(subscriber_dir);
2383 [ + + ]: 10 : if (pub_sysid != sub_sysid)
2384 : 1 : pg_fatal("subscriber data directory is not a copy of the source database cluster");
2385 : :
2386 : : /* Subscriber PID file */
2387 : 9 : snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2388 : :
2389 : : /*
2390 : : * The standby server must not be running. If the server is started under
2391 : : * service manager and pg_createsubscriber stops it, the service manager
2392 : : * might react to this action and start the server again. Therefore,
2393 : : * refuse to proceed if the server is running to avoid possible failures.
2394 : : */
2395 [ + + ]: 9 : if (stat(pidfile, &statbuf) == 0)
2396 : : {
378 2397 : 1 : pg_log_error("standby server is running");
2398 : 1 : pg_log_error_hint("Stop the standby server and try again.");
530 2399 : 1 : exit(1);
2400 : : }
2401 : :
2402 : : /*
2403 : : * Start a short-lived standby server with temporary parameters (provided
2404 : : * by command-line options). The goal is to avoid connections during the
2405 : : * transformation steps.
2406 : : */
378 2407 : 8 : pg_log_info("starting the standby server with command-line options");
431 akapila@postgresql.o 2408 : 8 : start_standby_server(&opt, true, false);
2409 : :
2410 : : /* Check if the standby server is ready for logical replication */
192 2411 : 8 : check_subscriber(dbinfos.dbinfo);
2412 : :
2413 : : /* Check if the primary server is ready for logical replication */
2414 : 6 : check_publisher(dbinfos.dbinfo);
2415 : :
2416 : : /*
2417 : : * Stop the target server. The recovery process requires that the server
2418 : : * reaches a consistent state before targeting the recovery stop point.
2419 : : * Make sure a consistent state is reached (stop the target server
2420 : : * guarantees it) *before* creating the replication slots in
2421 : : * setup_publisher().
2422 : : */
530 peter@eisentraut.org 2423 : 4 : pg_log_info("stopping the subscriber");
2424 : 4 : stop_standby_server(subscriber_dir);
2425 : :
2426 : : /* Create the required objects for each database on publisher */
192 akapila@postgresql.o 2427 : 4 : consistent_lsn = setup_publisher(dbinfos.dbinfo);
2428 : :
2429 : : /* Write the required recovery parameters */
2430 : 4 : setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2431 : :
2432 : : /*
2433 : : * Start subscriber so the recovery parameters will take effect. Wait
2434 : : * until accepting connections. We don't want to start logical replication
2435 : : * during setup.
2436 : : */
530 peter@eisentraut.org 2437 : 4 : pg_log_info("starting the subscriber");
431 akapila@postgresql.o 2438 : 4 : start_standby_server(&opt, true, true);
2439 : :
2440 : : /* Waiting the subscriber to be promoted */
192 2441 : 4 : wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
2442 : :
2443 : : /*
2444 : : * Create the subscription for each database on subscriber. It does not
2445 : : * enable it immediately because it needs to adjust the replication start
2446 : : * point to the LSN reported by setup_publisher(). It also cleans up
2447 : : * publications created by this tool and replication to the standby.
2448 : : */
2449 : 4 : setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2450 : :
2451 : : /* Remove primary_slot_name if it exists on primary */
2452 : 4 : drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
2453 : :
2454 : : /* Remove failover replication slots if they exist on subscriber */
2455 : 4 : drop_failover_replication_slots(dbinfos.dbinfo);
2456 : :
2457 : : /* Stop the subscriber */
530 peter@eisentraut.org 2458 : 4 : pg_log_info("stopping the subscriber");
2459 : 4 : stop_standby_server(subscriber_dir);
2460 : :
2461 : : /* Change system identifier from subscriber */
2462 : 4 : modify_subscriber_sysid(&opt);
2463 : :
2464 : 4 : success = true;
2465 : :
2466 : 4 : pg_log_info("Done!");
2467 : :
2468 : 4 : return 0;
2469 : : }
|