Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_createsubscriber.c
4 : : * Create a new logical replica from a standby server
5 : : *
6 : : * Copyright (c) 2024-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/bin/pg_basebackup/pg_createsubscriber.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : :
14 : : #include "postgres_fe.h"
15 : :
16 : : #include <sys/stat.h>
17 : : #include <sys/time.h>
18 : : #include <sys/wait.h>
19 : : #include <time.h>
20 : :
21 : : #include "common/connect.h"
22 : : #include "common/controldata_utils.h"
23 : : #include "common/file_utils.h"
24 : : #include "common/logging.h"
25 : : #include "common/pg_prng.h"
26 : : #include "common/restricted_token.h"
27 : : #include "datatype/timestamp.h"
28 : : #include "fe_utils/recovery_gen.h"
29 : : #include "fe_utils/simple_list.h"
30 : : #include "fe_utils/string_utils.h"
31 : : #include "fe_utils/version.h"
32 : : #include "getopt_long.h"
33 : :
34 : : #define DEFAULT_SUB_PORT "50432"
35 : : #define OBJECTTYPE_PUBLICATIONS 0x0001
36 : :
37 : : /*
38 : : * Configuration files for recovery parameters.
39 : : *
40 : : * The recovery parameters are set in INCLUDED_CONF_FILE, itself loaded by
41 : : * the server through an include_if_exists in postgresql.auto.conf.
42 : : *
43 : : * INCLUDED_CONF_FILE is renamed to INCLUDED_CONF_FILE_DISABLED when exiting,
44 : : * so as the recovery parameters set by this tool never take effect on node
45 : : * restart. The contents of INCLUDED_CONF_FILE_DISABLED can be useful for
46 : : * debugging.
47 : : */
48 : : #define PG_AUTOCONF_FILENAME "postgresql.auto.conf"
49 : : #define INCLUDED_CONF_FILE "pg_createsubscriber.conf"
50 : : #define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"
51 : :
52 : : /* Command-line options */
53 : : struct CreateSubscriberOptions
54 : : {
55 : : char *config_file; /* configuration file */
56 : : char *pub_conninfo_str; /* publisher connection string */
57 : : char *socket_dir; /* directory for Unix-domain socket, if any */
58 : : char *sub_port; /* subscriber port number */
59 : : const char *sub_username; /* subscriber username */
60 : : bool two_phase; /* enable-two-phase option */
61 : : SimpleStringList database_names; /* list of database names */
62 : : SimpleStringList pub_names; /* list of publication names */
63 : : SimpleStringList sub_names; /* list of subscription names */
64 : : SimpleStringList replslot_names; /* list of replication slot names */
65 : : int recovery_timeout; /* stop recovery after this time */
66 : : bool all_dbs; /* all option */
67 : : SimpleStringList objecttypes_to_clean; /* list of object types to cleanup */
68 : : };
69 : :
70 : : /* per-database publication/subscription info */
71 : : struct LogicalRepInfo
72 : : {
73 : : char *dbname; /* database name */
74 : : char *pubconninfo; /* publisher connection string */
75 : : char *subconninfo; /* subscriber connection string */
76 : : char *pubname; /* publication name */
77 : : char *subname; /* subscription name */
78 : : char *replslotname; /* replication slot name */
79 : :
80 : : bool made_replslot; /* replication slot was created */
81 : : bool made_publication; /* publication was created */
82 : : };
83 : :
84 : : /*
85 : : * Information shared across all the databases (or publications and
86 : : * subscriptions).
87 : : */
88 : : struct LogicalRepInfos
89 : : {
90 : : struct LogicalRepInfo *dbinfo;
91 : : bool two_phase; /* enable-two-phase option */
92 : : bits32 objecttypes_to_clean; /* flags indicating which object types
93 : : * to clean up on subscriber */
94 : : };
95 : :
96 : : static void cleanup_objects_atexit(void);
97 : : static void usage(void);
98 : : static char *get_base_conninfo(const char *conninfo, char **dbname);
99 : : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
100 : : static char *get_exec_path(const char *argv0, const char *progname);
101 : : static void check_data_directory(const char *datadir);
102 : : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
103 : : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
104 : : const char *pub_base_conninfo,
105 : : const char *sub_base_conninfo);
106 : : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
107 : : static void disconnect_database(PGconn *conn, bool exit_on_error);
108 : : static uint64 get_primary_sysid(const char *conninfo);
109 : : static uint64 get_standby_sysid(const char *datadir);
110 : : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
111 : : static bool server_is_in_recovery(PGconn *conn);
112 : : static char *generate_object_name(PGconn *conn);
113 : : static void check_publisher(const struct LogicalRepInfo *dbinfo);
114 : : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
115 : : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
116 : : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
117 : : const char *consistent_lsn);
118 : : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
119 : : const char *lsn);
120 : : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
121 : : const char *slotname);
122 : : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
123 : : static char *create_logical_replication_slot(PGconn *conn,
124 : : struct LogicalRepInfo *dbinfo);
125 : : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
126 : : const char *slot_name);
127 : : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
128 : : static void start_standby_server(const struct CreateSubscriberOptions *opt,
129 : : bool restricted_access,
130 : : bool restrict_logical_worker);
131 : : static void stop_standby_server(const char *datadir);
132 : : static void wait_for_end_recovery(const char *conninfo,
133 : : const struct CreateSubscriberOptions *opt);
134 : : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
135 : : static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
136 : : static void drop_publication(PGconn *conn, const char *pubname,
137 : : const char *dbname, bool *made_publication);
138 : : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
139 : : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
140 : : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
141 : : const char *lsn);
142 : : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
143 : : static void check_and_drop_existing_subscriptions(PGconn *conn,
144 : : const struct LogicalRepInfo *dbinfo);
145 : : static void drop_existing_subscription(PGconn *conn, const char *subname,
146 : : const char *dbname);
147 : : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
148 : : bool dbnamespecified);
149 : :
150 : : #define WAIT_INTERVAL 1 /* 1 second */
151 : :
152 : : static const char *progname;
153 : :
154 : : static char *primary_slot_name = NULL;
155 : : static bool dry_run = false;
156 : :
157 : : static bool success = false;
158 : :
159 : : static struct LogicalRepInfos dbinfos;
160 : : static int num_dbs = 0; /* number of specified databases */
161 : : static int num_pubs = 0; /* number of specified publications */
162 : : static int num_subs = 0; /* number of specified subscriptions */
163 : : static int num_replslots = 0; /* number of specified replication slots */
164 : :
165 : : static pg_prng_state prng_state;
166 : :
167 : : static char *pg_ctl_path = NULL;
168 : : static char *pg_resetwal_path = NULL;
169 : :
170 : : /* standby / subscriber data directory */
171 : : static char *subscriber_dir = NULL;
172 : :
173 : : static bool recovery_ended = false;
174 : : static bool standby_running = false;
175 : : static bool recovery_params_set = false;
176 : :
177 : :
178 : : /*
179 : : * Clean up objects created by pg_createsubscriber.
180 : : *
181 : : * Publications and replication slots are created on the primary. Depending
182 : : * on the step where it failed, already-created objects should be removed if
183 : : * possible (sometimes this won't work due to a connection issue).
184 : : * There is no cleanup on the target server *after* its promotion, because any
185 : : * failure at this point means recreating the physical replica and starting
186 : : * again.
187 : : *
188 : : * The recovery configuration is always removed, by renaming the included
189 : : * configuration file out of the way.
190 : : */
191 : : static void
720 peter@eisentraut.org 192 :CBC 10 : cleanup_objects_atexit(void)
193 : : {
194 : : /* Rename the included configuration file, if necessary. */
66 michael@paquier.xyz 195 [ + + ]:GNC 10 : if (recovery_params_set)
196 : : {
197 : : char conf_filename[MAXPGPATH];
198 : : char conf_filename_disabled[MAXPGPATH];
199 : :
200 : 1 : snprintf(conf_filename, MAXPGPATH, "%s/%s", subscriber_dir,
201 : : INCLUDED_CONF_FILE);
202 : 1 : snprintf(conf_filename_disabled, MAXPGPATH, "%s/%s", subscriber_dir,
203 : : INCLUDED_CONF_FILE_DISABLED);
204 : :
205 [ - + ]: 1 : if (durable_rename(conf_filename, conf_filename_disabled) != 0)
206 : : {
207 : : /* durable_rename() has already logged something. */
66 michael@paquier.xyz 208 :UNC 0 : pg_log_warning_hint("A manual removal of the recovery parameters may be required.");
209 : : }
210 : : }
211 : :
720 peter@eisentraut.org 212 [ + + ]:CBC 10 : if (success)
213 : 4 : return;
214 : :
215 : : /*
216 : : * If the server is promoted, there is no way to use the current setup
217 : : * again. Warn the user that a new replication setup should be done before
218 : : * trying again.
219 : : */
220 [ - + ]: 6 : if (recovery_ended)
221 : : {
720 peter@eisentraut.org 222 :UBC 0 : pg_log_warning("failed after the end of recovery");
223 : 0 : pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
224 : : "You must recreate the physical replica before continuing.");
225 : : }
226 : :
720 peter@eisentraut.org 227 [ + + ]:CBC 18 : for (int i = 0; i < num_dbs; i++)
228 : : {
364 michael@paquier.xyz 229 : 12 : struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
230 : :
231 [ + - - + ]: 12 : if (dbinfo->made_publication || dbinfo->made_replslot)
232 : : {
233 : : PGconn *conn;
234 : :
364 michael@paquier.xyz 235 :UBC 0 : conn = connect_database(dbinfo->pubconninfo, false);
720 peter@eisentraut.org 236 [ # # ]: 0 : if (conn != NULL)
237 : : {
364 michael@paquier.xyz 238 [ # # ]: 0 : if (dbinfo->made_publication)
360 akapila@postgresql.o 239 : 0 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
240 : : &dbinfo->made_publication);
364 michael@paquier.xyz 241 [ # # ]: 0 : if (dbinfo->made_replslot)
242 : 0 : drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
720 peter@eisentraut.org 243 : 0 : disconnect_database(conn, false);
244 : : }
245 : : else
246 : : {
247 : : /*
248 : : * If a connection could not be established, inform the user
249 : : * that some objects were left on primary and should be
250 : : * removed before trying again.
251 : : */
364 michael@paquier.xyz 252 [ # # ]: 0 : if (dbinfo->made_publication)
253 : : {
568 peter@eisentraut.org 254 : 0 : pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
255 : : dbinfo->pubname,
256 : : dbinfo->dbname);
257 : 0 : pg_log_warning_hint("Drop this publication before trying again.");
258 : : }
364 michael@paquier.xyz 259 [ # # ]: 0 : if (dbinfo->made_replslot)
260 : : {
568 peter@eisentraut.org 261 : 0 : pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
262 : : dbinfo->replslotname,
263 : : dbinfo->dbname);
720 264 : 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
265 : : }
266 : : }
267 : : }
268 : : }
269 : :
720 peter@eisentraut.org 270 [ + + ]:CBC 6 : if (standby_running)
271 : 4 : stop_standby_server(subscriber_dir);
272 : : }
273 : :
274 : : static void
275 : 1 : usage(void)
276 : : {
277 : 1 : printf(_("%s creates a new logical replica from a standby server.\n\n"),
278 : : progname);
279 : 1 : printf(_("Usage:\n"));
280 : 1 : printf(_(" %s [OPTION]...\n"), progname);
281 : 1 : printf(_("\nOptions:\n"));
352 akapila@postgresql.o 282 : 1 : printf(_(" -a, --all create subscriptions for all databases except template\n"
283 : : " databases and databases that don't allow connections\n"));
568 peter@eisentraut.org 284 : 1 : printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
633 285 : 1 : printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
286 : 1 : printf(_(" -n, --dry-run dry run, just show what would be done\n"));
287 : 1 : printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
288 : 1 : printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
591 289 : 1 : printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
633 290 : 1 : printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
382 akapila@postgresql.o 291 : 1 : printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
568 peter@eisentraut.org 292 : 1 : printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
633 293 : 1 : printf(_(" -v, --verbose output verbose messages\n"));
263 294 : 1 : printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
295 : : " databases on the subscriber; accepts: \"%s\"\n"), "publications");
633 296 : 1 : printf(_(" --config-file=FILENAME use specified main server configuration\n"
297 : : " file when running target cluster\n"));
298 : 1 : printf(_(" --publication=NAME publication name\n"));
299 : 1 : printf(_(" --replication-slot=NAME replication slot name\n"));
300 : 1 : printf(_(" --subscription=NAME subscription name\n"));
301 : 1 : printf(_(" -V, --version output version information, then exit\n"));
302 : 1 : printf(_(" -?, --help show this help, then exit\n"));
720 303 : 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
304 : 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
305 : 1 : }
306 : :
307 : : /*
308 : : * Subroutine to append "keyword=value" to a connection string,
309 : : * with proper quoting of the value. (We assume keywords don't need that.)
310 : : */
311 : : static void
623 tgl@sss.pgh.pa.us 312 : 107 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
313 : : {
314 [ + + ]: 107 : if (buf->len > 0)
315 : 79 : appendPQExpBufferChar(buf, ' ');
316 : 107 : appendPQExpBufferStr(buf, keyword);
317 : 107 : appendPQExpBufferChar(buf, '=');
318 : 107 : appendConnStrVal(buf, val);
319 : 107 : }
320 : :
321 : : /*
322 : : * Validate a connection string. Returns a base connection string that is a
323 : : * connection string without a database name.
324 : : *
325 : : * Since we might process multiple databases, each database name will be
326 : : * appended to this base connection string to provide a final connection
327 : : * string. If the second argument (dbname) is not null, returns dbname if the
328 : : * provided connection string contains it.
329 : : *
330 : : * It is the caller's responsibility to free the returned connection string and
331 : : * dbname.
332 : : */
333 : : static char *
720 peter@eisentraut.org 334 : 14 : get_base_conninfo(const char *conninfo, char **dbname)
335 : : {
336 : : PQExpBuffer buf;
337 : : PQconninfoOption *conn_opts;
338 : : PQconninfoOption *conn_opt;
339 : 14 : char *errmsg = NULL;
340 : : char *ret;
341 : :
342 : 14 : conn_opts = PQconninfoParse(conninfo, &errmsg);
343 [ - + ]: 14 : if (conn_opts == NULL)
344 : : {
720 peter@eisentraut.org 345 :UBC 0 : pg_log_error("could not parse connection string: %s", errmsg);
713 tgl@sss.pgh.pa.us 346 : 0 : PQfreemem(errmsg);
720 peter@eisentraut.org 347 : 0 : return NULL;
348 : : }
349 : :
713 tgl@sss.pgh.pa.us 350 :CBC 14 : buf = createPQExpBuffer();
720 peter@eisentraut.org 351 [ + + ]: 728 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
352 : : {
353 [ + + + - ]: 714 : if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
354 : : {
623 tgl@sss.pgh.pa.us 355 [ + + ]: 33 : if (strcmp(conn_opt->keyword, "dbname") == 0)
356 : : {
357 [ + - ]: 9 : if (dbname)
358 : 9 : *dbname = pg_strdup(conn_opt->val);
359 : 9 : continue;
360 : : }
361 : 24 : appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
362 : : }
363 : : }
364 : :
720 peter@eisentraut.org 365 : 14 : ret = pg_strdup(buf->data);
366 : :
367 : 14 : destroyPQExpBuffer(buf);
368 : 14 : PQconninfoFree(conn_opts);
369 : :
370 : 14 : return ret;
371 : : }
372 : :
373 : : /*
374 : : * Build a subscriber connection string. Only a few parameters are supported
375 : : * since it starts a server with restricted access.
376 : : */
377 : : static char *
378 : 14 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
379 : : {
380 : 14 : PQExpBuffer buf = createPQExpBuffer();
381 : : char *ret;
382 : :
623 tgl@sss.pgh.pa.us 383 : 14 : appendConnStrItem(buf, "port", opt->sub_port);
384 : : #if !defined(WIN32)
385 : 14 : appendConnStrItem(buf, "host", opt->socket_dir);
386 : : #endif
720 peter@eisentraut.org 387 [ - + ]: 14 : if (opt->sub_username != NULL)
623 tgl@sss.pgh.pa.us 388 :UBC 0 : appendConnStrItem(buf, "user", opt->sub_username);
623 tgl@sss.pgh.pa.us 389 :CBC 14 : appendConnStrItem(buf, "fallback_application_name", progname);
390 : :
720 peter@eisentraut.org 391 : 14 : ret = pg_strdup(buf->data);
392 : :
393 : 14 : destroyPQExpBuffer(buf);
394 : :
395 : 14 : return ret;
396 : : }
397 : :
398 : : /*
399 : : * Verify if a PostgreSQL binary (progname) is available in the same directory as
400 : : * pg_createsubscriber and it has the same version. It returns the absolute
401 : : * path of the progname.
402 : : */
403 : : static char *
404 : 20 : get_exec_path(const char *argv0, const char *progname)
405 : : {
406 : : char *versionstr;
407 : : char *exec_path;
408 : : int ret;
409 : :
410 : 20 : versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
411 : 20 : exec_path = pg_malloc(MAXPGPATH);
412 : 20 : ret = find_other_exec(argv0, progname, versionstr, exec_path);
413 : :
414 [ - + ]: 20 : if (ret < 0)
415 : : {
416 : : char full_path[MAXPGPATH];
417 : :
720 peter@eisentraut.org 418 [ # # ]:UBC 0 : if (find_my_exec(argv0, full_path) < 0)
419 : 0 : strlcpy(full_path, progname, sizeof(full_path));
420 : :
421 [ # # ]: 0 : if (ret == -1)
422 : 0 : pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
423 : : progname, "pg_createsubscriber", full_path);
424 : : else
425 : 0 : pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
426 : : progname, full_path, "pg_createsubscriber");
427 : : }
428 : :
720 peter@eisentraut.org 429 [ + + ]:CBC 20 : pg_log_debug("%s path is: %s", progname, exec_path);
430 : :
431 : 20 : return exec_path;
432 : : }
433 : :
434 : : /*
435 : : * Is it a cluster directory? These are preliminary checks. It is far from
436 : : * making an accurate check. If it is not a clone from the publisher, it will
437 : : * eventually fail in a future step.
438 : : */
439 : : static void
440 : 10 : check_data_directory(const char *datadir)
441 : : {
442 : : struct stat statbuf;
443 : : uint32 major_version;
444 : : char *version_str;
445 : :
446 : 10 : pg_log_info("checking if directory \"%s\" is a cluster data directory",
447 : : datadir);
448 : :
449 [ - + ]: 10 : if (stat(datadir, &statbuf) != 0)
450 : : {
720 peter@eisentraut.org 451 [ # # ]:UBC 0 : if (errno == ENOENT)
452 : 0 : pg_fatal("data directory \"%s\" does not exist", datadir);
453 : : else
719 454 : 0 : pg_fatal("could not access directory \"%s\": %m", datadir);
455 : : }
456 : :
457 : : /*
458 : : * Retrieve the contents of this cluster's PG_VERSION. We require
459 : : * compatibility with the same major version as the one this tool is
460 : : * compiled with.
461 : : */
151 michael@paquier.xyz 462 :GNC 10 : major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
463 [ - + ]: 10 : if (major_version != PG_MAJORVERSION_NUM)
464 : : {
151 michael@paquier.xyz 465 :UNC 0 : pg_log_error("data directory is of wrong version");
466 : 0 : pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
467 : : "PG_VERSION", version_str, PG_MAJORVERSION);
468 : 0 : exit(1);
469 : : }
720 peter@eisentraut.org 470 :CBC 10 : }
471 : :
472 : : /*
473 : : * Append database name into a base connection string.
474 : : *
475 : : * dbname is the only parameter that changes so it is not included in the base
476 : : * connection string. This function concatenates dbname to build a "real"
477 : : * connection string.
478 : : */
479 : : static char *
480 : 41 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
481 : : {
482 : 41 : PQExpBuffer buf = createPQExpBuffer();
483 : : char *ret;
484 : :
485 [ - + ]: 41 : Assert(conninfo != NULL);
486 : :
487 : 41 : appendPQExpBufferStr(buf, conninfo);
623 tgl@sss.pgh.pa.us 488 : 41 : appendConnStrItem(buf, "dbname", dbname);
489 : :
720 peter@eisentraut.org 490 : 41 : ret = pg_strdup(buf->data);
491 : 41 : destroyPQExpBuffer(buf);
492 : :
493 : 41 : return ret;
494 : : }
495 : :
496 : : /*
497 : : * Store publication and subscription information.
498 : : *
499 : : * If publication, replication slot and subscription names were specified,
500 : : * store it here. Otherwise, a generated name will be assigned to the object in
501 : : * setup_publisher().
502 : : */
503 : : static struct LogicalRepInfo *
504 : 10 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
505 : : const char *pub_base_conninfo,
506 : : const char *sub_base_conninfo)
507 : : {
508 : : struct LogicalRepInfo *dbinfo;
509 : 10 : SimpleStringListCell *pubcell = NULL;
510 : 10 : SimpleStringListCell *subcell = NULL;
511 : 10 : SimpleStringListCell *replslotcell = NULL;
512 : 10 : int i = 0;
513 : :
514 : 10 : dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
515 : :
516 [ + + ]: 10 : if (num_pubs > 0)
517 : 2 : pubcell = opt->pub_names.head;
518 [ + + ]: 10 : if (num_subs > 0)
519 : 1 : subcell = opt->sub_names.head;
520 [ + + ]: 10 : if (num_replslots > 0)
521 : 2 : replslotcell = opt->replslot_names.head;
522 : :
523 [ + + ]: 30 : for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
524 : : {
525 : : char *conninfo;
526 : :
527 : : /* Fill publisher attributes */
528 : 20 : conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
529 : 20 : dbinfo[i].pubconninfo = conninfo;
530 : 20 : dbinfo[i].dbname = cell->val;
531 [ + + ]: 20 : if (num_pubs > 0)
532 : 4 : dbinfo[i].pubname = pubcell->val;
533 : : else
534 : 16 : dbinfo[i].pubname = NULL;
535 [ + + ]: 20 : if (num_replslots > 0)
536 : 3 : dbinfo[i].replslotname = replslotcell->val;
537 : : else
538 : 17 : dbinfo[i].replslotname = NULL;
539 : 20 : dbinfo[i].made_replslot = false;
540 : 20 : dbinfo[i].made_publication = false;
541 : : /* Fill subscriber attributes */
542 : 20 : conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
543 : 20 : dbinfo[i].subconninfo = conninfo;
544 [ + + ]: 20 : if (num_subs > 0)
545 : 2 : dbinfo[i].subname = subcell->val;
546 : : else
547 : 18 : dbinfo[i].subname = NULL;
548 : : /* Other fields will be filled later */
549 : :
550 [ + + + - : 20 : pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
+ - ]
551 : : dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
552 : : dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
553 : : dbinfo[i].pubconninfo);
382 akapila@postgresql.o 554 [ + + + - : 20 : pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
- + ]
555 : : dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
556 : : dbinfo[i].subconninfo,
557 : : dbinfos.two_phase ? "true" : "false");
558 : :
720 peter@eisentraut.org 559 [ + + ]: 20 : if (num_pubs > 0)
560 : 4 : pubcell = pubcell->next;
561 [ + + ]: 20 : if (num_subs > 0)
562 : 2 : subcell = subcell->next;
563 [ + + ]: 20 : if (num_replslots > 0)
564 : 3 : replslotcell = replslotcell->next;
565 : :
566 : 20 : i++;
567 : : }
568 : :
569 : 10 : return dbinfo;
570 : : }
571 : :
572 : : /*
573 : : * Open a new connection. If exit_on_error is true, it has an undesired
574 : : * condition and it should exit immediately.
575 : : */
576 : : static PGconn *
577 : 57 : connect_database(const char *conninfo, bool exit_on_error)
578 : : {
579 : : PGconn *conn;
580 : : PGresult *res;
581 : :
582 : 57 : conn = PQconnectdb(conninfo);
583 [ - + ]: 57 : if (PQstatus(conn) != CONNECTION_OK)
584 : : {
720 peter@eisentraut.org 585 :UBC 0 : pg_log_error("connection to database failed: %s",
586 : : PQerrorMessage(conn));
713 tgl@sss.pgh.pa.us 587 : 0 : PQfinish(conn);
588 : :
720 peter@eisentraut.org 589 [ # # ]: 0 : if (exit_on_error)
590 : 0 : exit(1);
591 : 0 : return NULL;
592 : : }
593 : :
594 : : /* Secure search_path */
720 peter@eisentraut.org 595 :CBC 57 : res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
596 [ - + ]: 57 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
597 : : {
557 michael@paquier.xyz 598 :UBC 0 : pg_log_error("could not clear \"search_path\": %s",
599 : : PQresultErrorMessage(res));
713 tgl@sss.pgh.pa.us 600 : 0 : PQclear(res);
601 : 0 : PQfinish(conn);
602 : :
720 peter@eisentraut.org 603 [ # # ]: 0 : if (exit_on_error)
604 : 0 : exit(1);
605 : 0 : return NULL;
606 : : }
720 peter@eisentraut.org 607 :CBC 57 : PQclear(res);
608 : :
609 : 57 : return conn;
610 : : }
611 : :
612 : : /*
613 : : * Close the connection. If exit_on_error is true, it has an undesired
614 : : * condition and it should exit immediately.
615 : : */
616 : : static void
617 : 57 : disconnect_database(PGconn *conn, bool exit_on_error)
618 : : {
619 [ - + ]: 57 : Assert(conn != NULL);
620 : :
621 : 57 : PQfinish(conn);
622 : :
623 [ + + ]: 57 : if (exit_on_error)
624 : 2 : exit(1);
625 : 55 : }
626 : :
627 : : /*
628 : : * Obtain the system identifier using the provided connection. It will be used
629 : : * to compare if a data directory is a clone of another one.
630 : : */
631 : : static uint64
632 : 10 : get_primary_sysid(const char *conninfo)
633 : : {
634 : : PGconn *conn;
635 : : PGresult *res;
636 : : uint64 sysid;
637 : :
638 : 10 : pg_log_info("getting system identifier from publisher");
639 : :
640 : 10 : conn = connect_database(conninfo, true);
641 : :
642 : 10 : res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
643 [ - + ]: 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
644 : : {
720 peter@eisentraut.org 645 :UBC 0 : pg_log_error("could not get system identifier: %s",
646 : : PQresultErrorMessage(res));
647 : 0 : disconnect_database(conn, true);
648 : : }
720 peter@eisentraut.org 649 [ - + ]:CBC 10 : if (PQntuples(res) != 1)
650 : : {
720 peter@eisentraut.org 651 :UBC 0 : pg_log_error("could not get system identifier: got %d rows, expected %d row",
652 : : PQntuples(res), 1);
653 : 0 : disconnect_database(conn, true);
654 : : }
655 : :
720 peter@eisentraut.org 656 :CBC 10 : sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
657 : :
351 658 : 10 : pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
659 : :
720 660 : 10 : PQclear(res);
661 : 10 : disconnect_database(conn, false);
662 : :
663 : 10 : return sysid;
664 : : }
665 : :
666 : : /*
667 : : * Obtain the system identifier from control file. It will be used to compare
668 : : * if a data directory is a clone of another one. This routine is used locally
669 : : * and avoids a connection.
670 : : */
671 : : static uint64
672 : 10 : get_standby_sysid(const char *datadir)
673 : : {
674 : : ControlFileData *cf;
675 : : bool crc_ok;
676 : : uint64 sysid;
677 : :
678 : 10 : pg_log_info("getting system identifier from subscriber");
679 : :
680 : 10 : cf = get_controlfile(datadir, &crc_ok);
681 [ - + ]: 10 : if (!crc_ok)
720 peter@eisentraut.org 682 :UBC 0 : pg_fatal("control file appears to be corrupt");
683 : :
720 peter@eisentraut.org 684 :CBC 10 : sysid = cf->system_identifier;
685 : :
351 686 : 10 : pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
687 : :
720 688 : 10 : pg_free(cf);
689 : :
690 : 10 : return sysid;
691 : : }
692 : :
693 : : /*
694 : : * Modify the system identifier. Since a standby server preserves the system
695 : : * identifier, it makes sense to change it to avoid situations in which WAL
696 : : * files from one of the systems might be used in the other one.
697 : : */
698 : : static void
699 : 4 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
700 : : {
701 : : ControlFileData *cf;
702 : : bool crc_ok;
703 : : struct timeval tv;
704 : :
705 : : char *cmd_str;
706 : :
707 : 4 : pg_log_info("modifying system identifier of subscriber");
708 : :
709 : 4 : cf = get_controlfile(subscriber_dir, &crc_ok);
710 [ - + ]: 4 : if (!crc_ok)
720 peter@eisentraut.org 711 :UBC 0 : pg_fatal("control file appears to be corrupt");
712 : :
713 : : /*
714 : : * Select a new system identifier.
715 : : *
716 : : * XXX this code was extracted from BootStrapXLOG().
717 : : */
720 peter@eisentraut.org 718 :CBC 4 : gettimeofday(&tv, NULL);
719 : 4 : cf->system_identifier = ((uint64) tv.tv_sec) << 32;
720 : 4 : cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
721 : 4 : cf->system_identifier |= getpid() & 0xFFF;
722 : :
135 alvherre@kurilemu.de 723 [ + + ]: 4 : if (dry_run)
724 : 3 : pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
725 : : cf->system_identifier);
726 : : else
727 : : {
720 peter@eisentraut.org 728 : 1 : update_controlfile(subscriber_dir, cf, true);
135 alvherre@kurilemu.de 729 : 1 : pg_log_info("system identifier is %" PRIu64 " on subscriber",
730 : : cf->system_identifier);
731 : : }
732 : :
733 [ + + ]: 4 : if (dry_run)
734 : 3 : pg_log_info("dry-run: would run pg_resetwal on the subscriber");
735 : : else
736 : 1 : pg_log_info("running pg_resetwal on the subscriber");
737 : :
720 peter@eisentraut.org 738 : 4 : cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
739 : : subscriber_dir, DEVNULL);
740 : :
741 [ + + ]: 4 : pg_log_debug("pg_resetwal command is: %s", cmd_str);
742 : :
743 [ + + ]: 4 : if (!dry_run)
744 : : {
745 : 1 : int rc = system(cmd_str);
746 : :
747 [ + - ]: 1 : if (rc == 0)
129 748 : 1 : pg_log_info("successfully reset WAL on the subscriber");
749 : : else
135 alvherre@kurilemu.de 750 :UBC 0 : pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
751 : : }
752 : :
720 peter@eisentraut.org 753 :CBC 4 : pg_free(cf);
754 : 4 : }
755 : :
756 : : /*
757 : : * Generate an object name using a prefix, database oid and a random integer.
758 : : * It is used in case the user does not specify an object name (publication,
759 : : * subscription, replication slot).
760 : : */
761 : : static char *
762 : 8 : generate_object_name(PGconn *conn)
763 : : {
764 : : PGresult *res;
765 : : Oid oid;
766 : : uint32 rand;
767 : : char *objname;
768 : :
769 : 8 : res = PQexec(conn,
770 : : "SELECT oid FROM pg_catalog.pg_database "
771 : : "WHERE datname = pg_catalog.current_database()");
772 [ - + ]: 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
773 : : {
720 peter@eisentraut.org 774 :UBC 0 : pg_log_error("could not obtain database OID: %s",
775 : : PQresultErrorMessage(res));
776 : 0 : disconnect_database(conn, true);
777 : : }
778 : :
720 peter@eisentraut.org 779 [ - + ]:CBC 8 : if (PQntuples(res) != 1)
780 : : {
719 peter@eisentraut.org 781 :UBC 0 : pg_log_error("could not obtain database OID: got %d rows, expected %d row",
782 : : PQntuples(res), 1);
720 783 : 0 : disconnect_database(conn, true);
784 : : }
785 : :
786 : : /* Database OID */
720 peter@eisentraut.org 787 :CBC 8 : oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
788 : :
789 : 8 : PQclear(res);
790 : :
791 : : /* Random unsigned integer */
792 : 8 : rand = pg_prng_uint32(&prng_state);
793 : :
794 : : /*
795 : : * Build the object name. The name must not exceed NAMEDATALEN - 1. This
796 : : * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
797 : : * '\0').
798 : : */
799 : 8 : objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
800 : :
801 : 8 : return objname;
802 : : }
803 : :
804 : : /*
805 : : * Does the publication exist in the specified database?
806 : : */
807 : : static bool
88 akapila@postgresql.o 808 :GNC 8 : find_publication(PGconn *conn, const char *pubname, const char *dbname)
809 : : {
810 : 8 : PQExpBuffer str = createPQExpBuffer();
811 : : PGresult *res;
812 : 8 : bool found = false;
813 : 8 : char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
814 : :
815 : 8 : appendPQExpBuffer(str,
816 : : "SELECT 1 FROM pg_catalog.pg_publication "
817 : : "WHERE pubname = %s",
818 : : pubname_esc);
819 : 8 : res = PQexec(conn, str->data);
820 [ - + ]: 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
821 : : {
88 akapila@postgresql.o 822 :UNC 0 : pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
823 : : pubname, dbname, PQerrorMessage(conn));
824 : 0 : disconnect_database(conn, true);
825 : : }
826 : :
88 akapila@postgresql.o 827 [ + + ]:GNC 8 : if (PQntuples(res) == 1)
828 : 1 : found = true;
829 : :
830 : 8 : PQclear(res);
831 : 8 : PQfreemem(pubname_esc);
832 : 8 : destroyPQExpBuffer(str);
833 : :
834 : 8 : return found;
835 : : }
836 : :
837 : : /*
838 : : * Create the publications and replication slots in preparation for logical
839 : : * replication. Returns the LSN from latest replication slot. It will be the
840 : : * replication start point that is used to adjust the subscriptions (see
841 : : * set_replication_progress).
842 : : */
843 : : static char *
720 peter@eisentraut.org 844 :CBC 4 : setup_publisher(struct LogicalRepInfo *dbinfo)
845 : : {
846 : 4 : char *lsn = NULL;
847 : :
848 : 4 : pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
849 : :
850 [ + + ]: 12 : for (int i = 0; i < num_dbs; i++)
851 : : {
852 : : PGconn *conn;
853 : 8 : char *genname = NULL;
854 : :
855 : 8 : conn = connect_database(dbinfo[i].pubconninfo, true);
856 : :
857 : : /*
858 : : * If an object name was not specified as command-line options, assign
859 : : * a generated object name. The replication slot has a different rule.
860 : : * The subscription name is assigned to the replication slot name if
861 : : * no replication slot is specified. It follows the same rule as
862 : : * CREATE SUBSCRIPTION.
863 : : */
864 [ + + + + : 8 : if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
+ - ]
865 : 8 : genname = generate_object_name(conn);
866 [ + + ]: 8 : if (num_pubs == 0)
867 : 4 : dbinfo[i].pubname = pg_strdup(genname);
868 [ + + ]: 8 : if (num_subs == 0)
869 : 6 : dbinfo[i].subname = pg_strdup(genname);
870 [ + + ]: 8 : if (num_replslots == 0)
871 : 5 : dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
872 : :
88 akapila@postgresql.o 873 [ + + ]:GNC 8 : if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
874 : : {
875 : : /* Reuse existing publication on publisher. */
876 : 1 : pg_log_info("use existing publication \"%s\" in database \"%s\"",
877 : : dbinfo[i].pubname, dbinfo[i].dbname);
878 : : /* Don't remove pre-existing publication if an error occurs. */
879 : 1 : dbinfo[i].made_publication = false;
880 : : }
881 : : else
882 : : {
883 : : /*
884 : : * Create publication on publisher. This step should be executed
885 : : * *before* promoting the subscriber to avoid any transactions
886 : : * between consistent LSN and the new publication rows (such
887 : : * transactions wouldn't see the new publication rows resulting in
888 : : * an error).
889 : : */
890 : 7 : create_publication(conn, &dbinfo[i]);
891 : : }
892 : :
893 : : /* Create replication slot on publisher */
720 peter@eisentraut.org 894 [ + + ]:CBC 8 : if (lsn)
895 : 1 : pg_free(lsn);
896 : 8 : lsn = create_logical_replication_slot(conn, &dbinfo[i]);
157 michael@paquier.xyz 897 [ + + - + ]:GNC 8 : if (lsn == NULL && !dry_run)
720 peter@eisentraut.org 898 :UBC 0 : exit(1);
899 : :
900 : : /*
901 : : * Since we are using the LSN returned by the last replication slot as
902 : : * recovery_target_lsn, this LSN is ahead of the current WAL position
903 : : * and the recovery waits until the publisher writes a WAL record to
904 : : * reach the target and ends the recovery. On idle systems, this wait
905 : : * time is unpredictable and could lead to failure in promoting the
906 : : * subscriber. To avoid that, insert a harmless WAL record.
907 : : */
593 akapila@postgresql.o 908 [ + + + + ]:CBC 8 : if (i == num_dbs - 1 && !dry_run)
909 : : {
910 : : PGresult *res;
911 : :
912 : 1 : res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
913 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
914 : : {
593 akapila@postgresql.o 915 :UBC 0 : pg_log_error("could not write an additional WAL record: %s",
916 : : PQresultErrorMessage(res));
917 : 0 : disconnect_database(conn, true);
918 : : }
593 akapila@postgresql.o 919 :CBC 1 : PQclear(res);
920 : : }
921 : :
720 peter@eisentraut.org 922 : 8 : disconnect_database(conn, false);
923 : : }
924 : :
925 : 4 : return lsn;
926 : : }
927 : :
928 : : /*
929 : : * Is recovery still in progress?
930 : : */
931 : : static bool
932 : 15 : server_is_in_recovery(PGconn *conn)
933 : : {
934 : : PGresult *res;
935 : : int ret;
936 : :
937 : 15 : res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
938 : :
939 [ - + ]: 15 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
940 : : {
720 peter@eisentraut.org 941 :UBC 0 : pg_log_error("could not obtain recovery progress: %s",
942 : : PQresultErrorMessage(res));
943 : 0 : disconnect_database(conn, true);
944 : : }
945 : :
946 : :
720 peter@eisentraut.org 947 :CBC 15 : ret = strcmp("t", PQgetvalue(res, 0, 0));
948 : :
949 : 15 : PQclear(res);
950 : :
951 : 15 : return ret == 0;
952 : : }
953 : :
954 : : /*
955 : : * Is the primary server ready for logical replication?
956 : : *
957 : : * XXX Does it not allow a synchronous replica?
958 : : */
959 : : static void
960 : 6 : check_publisher(const struct LogicalRepInfo *dbinfo)
961 : : {
962 : : PGconn *conn;
963 : : PGresult *res;
964 : 6 : bool failed = false;
965 : :
966 : : char *wal_level;
967 : : int max_repslots;
968 : : int cur_repslots;
969 : : int max_walsenders;
970 : : int cur_walsenders;
971 : : int max_prepared_transactions;
972 : : char *max_slot_wal_keep_size;
973 : :
974 : 6 : pg_log_info("checking settings on publisher");
975 : :
976 : 6 : conn = connect_database(dbinfo[0].pubconninfo, true);
977 : :
978 : : /*
979 : : * If the primary server is in recovery (i.e. cascading replication),
980 : : * objects (publication) cannot be created because it is read only.
981 : : */
982 [ + + ]: 6 : if (server_is_in_recovery(conn))
983 : : {
984 : 1 : pg_log_error("primary server cannot be in recovery");
985 : 1 : disconnect_database(conn, true);
986 : : }
987 : :
988 : : /*------------------------------------------------------------------------
989 : : * Logical replication requires a few parameters to be set on publisher.
990 : : * Since these parameters are not a requirement for physical replication,
991 : : * we should check it to make sure it won't fail.
992 : : *
993 : : * - wal_level >= replica
994 : : * - max_replication_slots >= current + number of dbs to be converted
995 : : * - max_wal_senders >= current + number of dbs to be converted
996 : : * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
997 : : * -----------------------------------------------------------------------
998 : : */
999 : 5 : res = PQexec(conn,
1000 : : "SELECT pg_catalog.current_setting('wal_level'),"
1001 : : " pg_catalog.current_setting('max_replication_slots'),"
1002 : : " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
1003 : : " pg_catalog.current_setting('max_wal_senders'),"
1004 : : " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
1005 : : " pg_catalog.current_setting('max_prepared_transactions'),"
1006 : : " pg_catalog.current_setting('max_slot_wal_keep_size')");
1007 : :
1008 [ - + ]: 5 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1009 : : {
720 peter@eisentraut.org 1010 :UBC 0 : pg_log_error("could not obtain publisher settings: %s",
1011 : : PQresultErrorMessage(res));
1012 : 0 : disconnect_database(conn, true);
1013 : : }
1014 : :
720 peter@eisentraut.org 1015 :CBC 5 : wal_level = pg_strdup(PQgetvalue(res, 0, 0));
1016 : 5 : max_repslots = atoi(PQgetvalue(res, 0, 1));
1017 : 5 : cur_repslots = atoi(PQgetvalue(res, 0, 2));
1018 : 5 : max_walsenders = atoi(PQgetvalue(res, 0, 3));
1019 : 5 : cur_walsenders = atoi(PQgetvalue(res, 0, 4));
623 tgl@sss.pgh.pa.us 1020 : 5 : max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
390 akapila@postgresql.o 1021 : 5 : max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
1022 : :
720 peter@eisentraut.org 1023 : 5 : PQclear(res);
1024 : :
1025 [ + + ]: 5 : pg_log_debug("publisher: wal_level: %s", wal_level);
1026 [ + + ]: 5 : pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
1027 [ + + ]: 5 : pg_log_debug("publisher: current replication slots: %d", cur_repslots);
1028 [ + + ]: 5 : pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
1029 [ + + ]: 5 : pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
623 tgl@sss.pgh.pa.us 1030 [ + + ]: 5 : pg_log_debug("publisher: max_prepared_transactions: %d",
1031 : : max_prepared_transactions);
390 akapila@postgresql.o 1032 [ + + ]: 5 : pg_log_debug("publisher: max_slot_wal_keep_size: %s",
1033 : : max_slot_wal_keep_size);
1034 : :
720 peter@eisentraut.org 1035 : 5 : disconnect_database(conn, false);
1036 : :
82 msawada@postgresql.o 1037 [ - + ]:GNC 5 : if (strcmp(wal_level, "minimal") == 0)
1038 : : {
82 msawada@postgresql.o 1039 :UNC 0 : pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
720 peter@eisentraut.org 1040 :LBC (1) : failed = true;
1041 : : }
1042 : :
720 peter@eisentraut.org 1043 [ + + ]:CBC 5 : if (max_repslots - cur_repslots < num_dbs)
1044 : : {
1045 : 1 : pg_log_error("publisher requires %d replication slots, but only %d remain",
1046 : : num_dbs, max_repslots - cur_repslots);
597 1047 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1048 : : "max_replication_slots", cur_repslots + num_dbs);
720 1049 : 1 : failed = true;
1050 : : }
1051 : :
1052 [ + + ]: 5 : if (max_walsenders - cur_walsenders < num_dbs)
1053 : : {
568 1054 : 1 : pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
1055 : : num_dbs, max_walsenders - cur_walsenders);
597 1056 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1057 : : "max_wal_senders", cur_walsenders + num_dbs);
720 1058 : 1 : failed = true;
1059 : : }
1060 : :
382 akapila@postgresql.o 1061 [ - + - - ]: 5 : if (max_prepared_transactions != 0 && !dbinfos.two_phase)
1062 : : {
568 peter@eisentraut.org 1063 :UBC 0 : pg_log_warning("two_phase option will not be enabled for replication slots");
623 tgl@sss.pgh.pa.us 1064 : 0 : pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
1065 : : "Prepared transactions will be replicated at COMMIT PREPARED.");
272 peter@eisentraut.org 1066 : 0 : pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
1067 : : }
1068 : :
1069 : : /*
1070 : : * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
1071 : : * is set to a non-default value, it may cause replication failures due to
1072 : : * required WAL files being prematurely removed.
1073 : : */
390 akapila@postgresql.o 1074 [ + + - + ]:CBC 5 : if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
1075 : : {
390 akapila@postgresql.o 1076 :UBC 0 : pg_log_warning("required WAL could be removed from the publisher");
1077 : 0 : pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1078 : : "max_slot_wal_keep_size");
1079 : : }
1080 : :
713 tgl@sss.pgh.pa.us 1081 :CBC 5 : pg_free(wal_level);
1082 : :
720 peter@eisentraut.org 1083 [ + + ]: 5 : if (failed)
1084 : 1 : exit(1);
1085 : 4 : }
1086 : :
1087 : : /*
1088 : : * Is the standby server ready for logical replication?
1089 : : *
1090 : : * XXX Does it not allow a time-delayed replica?
1091 : : *
1092 : : * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1093 : : * is S, it cannot detect there is a replica (server C) because server S starts
1094 : : * accepting only local connections and server C cannot connect to it. Hence,
1095 : : * there is not a reliable way to provide a suitable error saying the server C
1096 : : * will be broken at the end of this process (due to pg_resetwal).
1097 : : */
1098 : : static void
1099 : 8 : check_subscriber(const struct LogicalRepInfo *dbinfo)
1100 : : {
1101 : : PGconn *conn;
1102 : : PGresult *res;
1103 : 8 : bool failed = false;
1104 : :
1105 : : int max_lrworkers;
1106 : : int max_replorigins;
1107 : : int max_wprocs;
1108 : :
1109 : 8 : pg_log_info("checking settings on subscriber");
1110 : :
1111 : 8 : conn = connect_database(dbinfo[0].subconninfo, true);
1112 : :
1113 : : /* The target server must be a standby */
1114 [ + + ]: 8 : if (!server_is_in_recovery(conn))
1115 : : {
1116 : 1 : pg_log_error("target server must be a standby");
1117 : 1 : disconnect_database(conn, true);
1118 : : }
1119 : :
1120 : : /*------------------------------------------------------------------------
1121 : : * Logical replication requires a few parameters to be set on subscriber.
1122 : : * Since these parameters are not a requirement for physical replication,
1123 : : * we should check it to make sure it won't fail.
1124 : : *
1125 : : * - max_active_replication_origins >= number of dbs to be converted
1126 : : * - max_logical_replication_workers >= number of dbs to be converted
1127 : : * - max_worker_processes >= 1 + number of dbs to be converted
1128 : : *------------------------------------------------------------------------
1129 : : */
1130 : 7 : res = PQexec(conn,
1131 : : "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1132 : : "'max_logical_replication_workers', "
1133 : : "'max_active_replication_origins', "
1134 : : "'max_worker_processes', "
1135 : : "'primary_slot_name') "
1136 : : "ORDER BY name");
1137 : :
1138 [ - + ]: 7 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1139 : : {
720 peter@eisentraut.org 1140 :UBC 0 : pg_log_error("could not obtain subscriber settings: %s",
1141 : : PQresultErrorMessage(res));
1142 : 0 : disconnect_database(conn, true);
1143 : : }
1144 : :
46 msawada@postgresql.o 1145 :GNC 7 : max_replorigins = atoi(PQgetvalue(res, 0, 0));
359 msawada@postgresql.o 1146 :CBC 7 : max_lrworkers = atoi(PQgetvalue(res, 1, 0));
720 peter@eisentraut.org 1147 : 7 : max_wprocs = atoi(PQgetvalue(res, 2, 0));
1148 [ + + ]: 7 : if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1149 : 6 : primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1150 : :
1151 [ + + ]: 7 : pg_log_debug("subscriber: max_logical_replication_workers: %d",
1152 : : max_lrworkers);
46 msawada@postgresql.o 1153 [ + + ]:GNC 7 : pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins);
720 peter@eisentraut.org 1154 [ + + ]:CBC 7 : pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1155 [ + + ]: 7 : if (primary_slot_name)
1156 [ + + ]: 6 : pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1157 : :
1158 : 7 : PQclear(res);
1159 : :
1160 : 7 : disconnect_database(conn, false);
1161 : :
46 msawada@postgresql.o 1162 [ + + ]:GNC 7 : if (max_replorigins < num_dbs)
1163 : : {
359 msawada@postgresql.o 1164 :CBC 1 : pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1165 : : num_dbs, max_replorigins);
597 peter@eisentraut.org 1166 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1167 : : "max_active_replication_origins", num_dbs);
720 1168 : 1 : failed = true;
1169 : : }
1170 : :
1171 [ + + ]: 7 : if (max_lrworkers < num_dbs)
1172 : : {
1173 : 1 : pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1174 : : num_dbs, max_lrworkers);
597 1175 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1176 : : "max_logical_replication_workers", num_dbs);
720 1177 : 1 : failed = true;
1178 : : }
1179 : :
1180 [ + + ]: 7 : if (max_wprocs < num_dbs + 1)
1181 : : {
1182 : 1 : pg_log_error("subscriber requires %d worker processes, but only %d remain",
1183 : : num_dbs + 1, max_wprocs);
597 1184 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1185 : : "max_worker_processes", num_dbs + 1);
720 1186 : 1 : failed = true;
1187 : : }
1188 : :
1189 [ + + ]: 7 : if (failed)
1190 : 1 : exit(1);
1191 : 6 : }
1192 : :
1193 : : /*
1194 : : * Drop a specified subscription. This is to avoid duplicate subscriptions on
1195 : : * the primary (publisher node) and the newly created subscriber. We
1196 : : * shouldn't drop the associated slot as that would be used by the publisher
1197 : : * node.
1198 : : */
1199 : : static void
135 alvherre@kurilemu.de 1200 :GNC 4 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
1201 : : {
621 akapila@postgresql.o 1202 :CBC 4 : PQExpBuffer query = createPQExpBuffer();
1203 : : PGresult *res;
1204 : :
1205 [ - + ]: 4 : Assert(conn != NULL);
1206 : :
1207 : : /*
1208 : : * Construct a query string. These commands are allowed to be executed
1209 : : * within a transaction.
1210 : : */
1211 : 4 : appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1212 : : subname);
1213 : 4 : appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1214 : : subname);
1215 : 4 : appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1216 : :
135 alvherre@kurilemu.de 1217 [ + + ]: 4 : if (dry_run)
1218 : 3 : pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
1219 : : subname, dbname);
1220 : : else
1221 : : {
1222 : 1 : pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1223 : : subname, dbname);
1224 : :
621 akapila@postgresql.o 1225 : 1 : res = PQexec(conn, query->data);
1226 : :
1227 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1228 : : {
590 alvherre@alvh.no-ip. 1229 :UBC 0 : pg_log_error("could not drop subscription \"%s\": %s",
1230 : : subname, PQresultErrorMessage(res));
621 akapila@postgresql.o 1231 : 0 : disconnect_database(conn, true);
1232 : : }
1233 : :
621 akapila@postgresql.o 1234 :CBC 1 : PQclear(res);
1235 : : }
1236 : :
1237 : 4 : destroyPQExpBuffer(query);
1238 : 4 : }
1239 : :
1240 : : /*
1241 : : * Retrieve and drop the pre-existing subscriptions.
1242 : : */
1243 : : static void
1244 : 8 : check_and_drop_existing_subscriptions(PGconn *conn,
1245 : : const struct LogicalRepInfo *dbinfo)
1246 : : {
1247 : 8 : PQExpBuffer query = createPQExpBuffer();
1248 : : char *dbname;
1249 : : PGresult *res;
1250 : :
1251 [ - + ]: 8 : Assert(conn != NULL);
1252 : :
1253 : 8 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1254 : :
1255 : 8 : appendPQExpBuffer(query,
1256 : : "SELECT s.subname FROM pg_catalog.pg_subscription s "
1257 : : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1258 : : "WHERE d.datname = %s",
1259 : : dbname);
1260 : 8 : res = PQexec(conn, query->data);
1261 : :
1262 [ - + ]: 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1263 : : {
621 akapila@postgresql.o 1264 :UBC 0 : pg_log_error("could not obtain pre-existing subscriptions: %s",
1265 : : PQresultErrorMessage(res));
1266 : 0 : disconnect_database(conn, true);
1267 : : }
1268 : :
621 akapila@postgresql.o 1269 [ + + ]:CBC 12 : for (int i = 0; i < PQntuples(res); i++)
135 alvherre@kurilemu.de 1270 :GNC 4 : drop_existing_subscription(conn, PQgetvalue(res, i, 0),
1271 : 4 : dbinfo->dbname);
1272 : :
621 akapila@postgresql.o 1273 :CBC 8 : PQclear(res);
1274 : 8 : destroyPQExpBuffer(query);
396 michael@paquier.xyz 1275 : 8 : PQfreemem(dbname);
621 akapila@postgresql.o 1276 : 8 : }
1277 : :
1278 : : /*
1279 : : * Create the subscriptions, adjust the initial location for logical
1280 : : * replication and enable the subscriptions. That's the last step for logical
1281 : : * replication setup.
1282 : : */
1283 : : static void
720 peter@eisentraut.org 1284 : 4 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1285 : : {
1286 [ + + ]: 12 : for (int i = 0; i < num_dbs; i++)
1287 : : {
1288 : : PGconn *conn;
1289 : :
1290 : : /* Connect to subscriber. */
1291 : 8 : conn = connect_database(dbinfo[i].subconninfo, true);
1292 : :
1293 : : /*
1294 : : * We don't need the pre-existing subscriptions on the newly formed
1295 : : * subscriber. They can connect to other publisher nodes and either
1296 : : * get some unwarranted data or can lead to ERRORs in connecting to
1297 : : * such nodes.
1298 : : */
621 akapila@postgresql.o 1299 : 8 : check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1300 : :
1301 : : /* Check and drop the required publications in the given database. */
360 1302 : 8 : check_and_drop_publications(conn, &dbinfo[i]);
1303 : :
720 peter@eisentraut.org 1304 : 8 : create_subscription(conn, &dbinfo[i]);
1305 : :
1306 : : /* Set the replication progress to the correct LSN */
1307 : 8 : set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1308 : :
1309 : : /* Enable subscription */
1310 : 8 : enable_subscription(conn, &dbinfo[i]);
1311 : :
1312 : 8 : disconnect_database(conn, false);
1313 : : }
1314 : 4 : }
1315 : :
1316 : : /*
1317 : : * Write the required recovery parameters.
1318 : : */
1319 : : static void
1320 : 4 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1321 : : {
1322 : : PGconn *conn;
1323 : : PQExpBuffer recoveryconfcontents;
1324 : :
1325 : : /*
1326 : : * Despite of the recovery parameters will be written to the subscriber,
1327 : : * use a publisher connection. The primary_conninfo is generated using the
1328 : : * connection settings.
1329 : : */
1330 : 4 : conn = connect_database(dbinfo[0].pubconninfo, true);
1331 : :
1332 : : /*
1333 : : * Write recovery parameters.
1334 : : *
1335 : : * The subscriber is not running yet. In dry run mode, the recovery
1336 : : * parameters *won't* be written. An invalid LSN is used for printing
1337 : : * purposes. Additional recovery parameters are added here. It avoids
1338 : : * unexpected behavior such as end of recovery as soon as a consistent
1339 : : * state is reached (recovery_target) and failure due to multiple recovery
1340 : : * targets (name, time, xid, LSN).
1341 : : */
1342 : 4 : recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
331 drowley@postgresql.o 1343 : 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1344 : 4 : appendPQExpBufferStr(recoveryconfcontents,
1345 : : "recovery_target_timeline = 'latest'\n");
1346 : :
1347 : : /*
1348 : : * Set recovery_target_inclusive = false to avoid reapplying the
1349 : : * transaction committed at 'lsn' after subscription is enabled. This is
1350 : : * because the provided 'lsn' is also used as the replication start point
1351 : : * for the subscription. So, the server can send the transaction committed
1352 : : * at that 'lsn' after replication is started which can lead to applying
1353 : : * the same transaction twice if we keep recovery_target_inclusive = true.
1354 : : */
1355 : 4 : appendPQExpBufferStr(recoveryconfcontents,
1356 : : "recovery_target_inclusive = false\n");
1357 : 4 : appendPQExpBufferStr(recoveryconfcontents,
1358 : : "recovery_target_action = promote\n");
1359 : 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1360 : 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1361 : 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1362 : :
720 peter@eisentraut.org 1363 [ + + ]: 4 : if (dry_run)
1364 : : {
135 alvherre@kurilemu.de 1365 : 3 : appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
720 peter@eisentraut.org 1366 : 3 : appendPQExpBuffer(recoveryconfcontents,
1367 : : "recovery_target_lsn = '%X/%08X'\n",
720 peter@eisentraut.org 1368 :ECB (3) : LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1369 : : }
1370 : : else
1371 : : {
720 peter@eisentraut.org 1372 :CBC 1 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1373 : : lsn);
1374 : : }
1375 : :
1376 [ + + ]: 4 : pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1377 : :
66 michael@paquier.xyz 1378 [ + + ]:GNC 4 : if (!dry_run)
1379 : : {
1380 : : char conf_filename[MAXPGPATH];
1381 : : FILE *fd;
1382 : :
1383 : : /* Write the recovery parameters to INCLUDED_CONF_FILE */
1384 : 1 : snprintf(conf_filename, MAXPGPATH, "%s/%s", datadir,
1385 : : INCLUDED_CONF_FILE);
1386 : 1 : fd = fopen(conf_filename, "w");
1387 [ - + ]: 1 : if (fd == NULL)
66 michael@paquier.xyz 1388 :UNC 0 : pg_fatal("could not open file \"%s\": %m", conf_filename);
1389 : :
66 michael@paquier.xyz 1390 [ - + ]:GNC 1 : if (fwrite(recoveryconfcontents->data, recoveryconfcontents->len, 1, fd) != 1)
66 michael@paquier.xyz 1391 :UNC 0 : pg_fatal("could not write to file \"%s\": %m", conf_filename);
1392 : :
66 michael@paquier.xyz 1393 :GNC 1 : fclose(fd);
1394 : 1 : recovery_params_set = true;
1395 : :
1396 : : /* Include conditionally the recovery parameters. */
1397 : 1 : resetPQExpBuffer(recoveryconfcontents);
1398 : 1 : appendPQExpBufferStr(recoveryconfcontents,
1399 : : "include_if_exists '" INCLUDED_CONF_FILE "'\n");
1400 : 1 : WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1401 : : }
1402 : :
1403 : 4 : disconnect_database(conn, false);
720 peter@eisentraut.org 1404 :CBC 4 : }
1405 : :
1406 : : /*
1407 : : * Drop physical replication slot on primary if the standby was using it. After
1408 : : * the transformation, it has no use.
1409 : : *
1410 : : * XXX we might not fail here. Instead, we provide a warning so the user
1411 : : * eventually drops this replication slot later.
1412 : : */
1413 : : static void
1414 : 4 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1415 : : {
1416 : : PGconn *conn;
1417 : :
1418 : : /* Replication slot does not exist, do nothing */
1419 [ - + ]: 4 : if (!primary_slot_name)
720 peter@eisentraut.org 1420 :UBC 0 : return;
1421 : :
720 peter@eisentraut.org 1422 :CBC 4 : conn = connect_database(dbinfo[0].pubconninfo, false);
1423 [ + - ]: 4 : if (conn != NULL)
1424 : : {
1425 : 4 : drop_replication_slot(conn, &dbinfo[0], slotname);
1426 : 4 : disconnect_database(conn, false);
1427 : : }
1428 : : else
1429 : : {
720 peter@eisentraut.org 1430 :UBC 0 : pg_log_warning("could not drop replication slot \"%s\" on primary",
1431 : : slotname);
1432 : 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1433 : : }
1434 : : }
1435 : :
1436 : : /*
1437 : : * Drop failover replication slots on subscriber. After the transformation,
1438 : : * they have no use.
1439 : : *
1440 : : * XXX We do not fail here. Instead, we provide a warning so the user can drop
1441 : : * them later.
1442 : : */
1443 : : static void
636 peter@eisentraut.org 1444 :CBC 4 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
1445 : : {
1446 : : PGconn *conn;
1447 : : PGresult *res;
1448 : :
1449 : 4 : conn = connect_database(dbinfo[0].subconninfo, false);
1450 [ + - ]: 4 : if (conn != NULL)
1451 : : {
1452 : : /* Get failover replication slot names */
1453 : 4 : res = PQexec(conn,
1454 : : "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1455 : :
1456 [ + - ]: 4 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
1457 : : {
1458 : : /* Remove failover replication slots from subscriber */
1459 [ + + ]: 8 : for (int i = 0; i < PQntuples(res); i++)
1460 : 4 : drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1461 : : }
1462 : : else
1463 : : {
636 peter@eisentraut.org 1464 :UBC 0 : pg_log_warning("could not obtain failover replication slot information: %s",
1465 : : PQresultErrorMessage(res));
1466 : 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1467 : : }
1468 : :
636 peter@eisentraut.org 1469 :CBC 4 : PQclear(res);
1470 : 4 : disconnect_database(conn, false);
1471 : : }
1472 : : else
1473 : : {
636 peter@eisentraut.org 1474 :UBC 0 : pg_log_warning("could not drop failover replication slot");
1475 : 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1476 : : }
636 peter@eisentraut.org 1477 :CBC 4 : }
1478 : :
1479 : : /*
1480 : : * Create a logical replication slot and returns a LSN.
1481 : : *
1482 : : * CreateReplicationSlot() is not used because it does not provide the one-row
1483 : : * result set that contains the LSN.
1484 : : */
1485 : : static char *
720 1486 : 8 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1487 : : {
1488 : 8 : PQExpBuffer str = createPQExpBuffer();
1489 : 8 : PGresult *res = NULL;
1490 : 8 : const char *slot_name = dbinfo->replslotname;
1491 : : char *slot_name_esc;
1492 : 8 : char *lsn = NULL;
1493 : :
1494 [ - + ]: 8 : Assert(conn != NULL);
1495 : :
135 alvherre@kurilemu.de 1496 [ + + ]: 8 : if (dry_run)
1497 : 6 : pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1498 : : slot_name, dbinfo->dbname);
1499 : : else
1500 : 2 : pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
1501 : : slot_name, dbinfo->dbname);
1502 : :
720 peter@eisentraut.org 1503 : 8 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1504 : :
1505 : 8 : appendPQExpBuffer(str,
1506 : : "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1507 : : slot_name_esc,
382 akapila@postgresql.o 1508 [ + + ]: 8 : dbinfos.two_phase ? "true" : "false");
1509 : :
396 michael@paquier.xyz 1510 : 8 : PQfreemem(slot_name_esc);
1511 : :
720 peter@eisentraut.org 1512 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
1513 : :
1514 [ + + ]: 8 : if (!dry_run)
1515 : : {
1516 : 2 : res = PQexec(conn, str->data);
1517 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1518 : : {
598 peter@eisentraut.org 1519 :UBC 0 : pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1520 : : slot_name, dbinfo->dbname,
1521 : : PQresultErrorMessage(res));
713 tgl@sss.pgh.pa.us 1522 : 0 : PQclear(res);
1523 : 0 : destroyPQExpBuffer(str);
720 peter@eisentraut.org 1524 : 0 : return NULL;
1525 : : }
1526 : :
720 peter@eisentraut.org 1527 :CBC 2 : lsn = pg_strdup(PQgetvalue(res, 0, 0));
1528 : 2 : PQclear(res);
1529 : : }
1530 : :
1531 : : /* For cleanup purposes */
1532 : 8 : dbinfo->made_replslot = true;
1533 : :
1534 : 8 : destroyPQExpBuffer(str);
1535 : :
1536 : 8 : return lsn;
1537 : : }
1538 : :
1539 : : static void
1540 : 8 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1541 : : const char *slot_name)
1542 : : {
1543 : 8 : PQExpBuffer str = createPQExpBuffer();
1544 : : char *slot_name_esc;
1545 : : PGresult *res;
1546 : :
1547 [ - + ]: 8 : Assert(conn != NULL);
1548 : :
135 alvherre@kurilemu.de 1549 [ + + ]: 8 : if (dry_run)
1550 : 6 : pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1551 : : slot_name, dbinfo->dbname);
1552 : : else
1553 : 2 : pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1554 : : slot_name, dbinfo->dbname);
1555 : :
720 peter@eisentraut.org 1556 : 8 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1557 : :
1558 : 8 : appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1559 : :
396 michael@paquier.xyz 1560 : 8 : PQfreemem(slot_name_esc);
1561 : :
720 peter@eisentraut.org 1562 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
1563 : :
1564 [ + + ]: 8 : if (!dry_run)
1565 : : {
1566 : 2 : res = PQexec(conn, str->data);
1567 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1568 : : {
598 peter@eisentraut.org 1569 :UBC 0 : pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1570 : : slot_name, dbinfo->dbname, PQresultErrorMessage(res));
720 1571 : 0 : dbinfo->made_replslot = false; /* don't try again. */
1572 : : }
1573 : :
720 peter@eisentraut.org 1574 :CBC 2 : PQclear(res);
1575 : : }
1576 : :
1577 : 8 : destroyPQExpBuffer(str);
1578 : 8 : }
1579 : :
1580 : : /*
1581 : : * Reports a suitable message if pg_ctl fails.
1582 : : */
1583 : : static void
1584 : 24 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
1585 : : {
1586 [ - + ]: 24 : if (rc != 0)
1587 : : {
720 peter@eisentraut.org 1588 [ # # ]:UBC 0 : if (WIFEXITED(rc))
1589 : : {
1590 : 0 : pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
1591 : : }
1592 [ # # ]: 0 : else if (WIFSIGNALED(rc))
1593 : : {
1594 : : #if defined(WIN32)
1595 : : pg_log_error("pg_ctl was terminated by exception 0x%X",
1596 : : WTERMSIG(rc));
1597 : : pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1598 : : #else
1599 : 0 : pg_log_error("pg_ctl was terminated by signal %d: %s",
1600 : : WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1601 : : #endif
1602 : : }
1603 : : else
1604 : : {
1605 : 0 : pg_log_error("pg_ctl exited with unrecognized status %d", rc);
1606 : : }
1607 : :
1608 : 0 : pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
1609 : 0 : exit(1);
1610 : : }
720 peter@eisentraut.org 1611 :CBC 24 : }
1612 : :
1613 : : static void
621 akapila@postgresql.o 1614 : 12 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1615 : : bool restrict_logical_worker)
1616 : : {
720 peter@eisentraut.org 1617 : 12 : PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1618 : : int rc;
1619 : :
623 tgl@sss.pgh.pa.us 1620 : 12 : appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1621 : 12 : appendShellString(pg_ctl_cmd, subscriber_dir);
331 drowley@postgresql.o 1622 : 12 : appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1623 : :
1624 : : /* Prevent unintended slot invalidation */
332 1625 : 12 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1626 : :
720 peter@eisentraut.org 1627 [ + - ]: 12 : if (restricted_access)
1628 : : {
1629 : 12 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1630 : : #if !defined(WIN32)
1631 : :
1632 : : /*
1633 : : * An empty listen_addresses list means the server does not listen on
1634 : : * any IP interfaces; only Unix-domain sockets can be used to connect
1635 : : * to the server. Prevent external connections to minimize the chance
1636 : : * of failure.
1637 : : */
1638 : 12 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1639 [ + - ]: 12 : if (opt->socket_dir)
1640 : 12 : appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1641 : 12 : opt->socket_dir);
1642 : 12 : appendPQExpBufferChar(pg_ctl_cmd, '"');
1643 : : #endif
1644 : : }
1645 [ - + ]: 12 : if (opt->config_file != NULL)
720 peter@eisentraut.org 1646 :UBC 0 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1647 : 0 : opt->config_file);
1648 : :
1649 : : /* Suppress to start logical replication if requested */
621 akapila@postgresql.o 1650 [ + + ]:CBC 12 : if (restrict_logical_worker)
332 drowley@postgresql.o 1651 : 4 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1652 : :
720 peter@eisentraut.org 1653 [ + + ]: 12 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1654 : 12 : rc = system(pg_ctl_cmd->data);
1655 : 12 : pg_ctl_status(pg_ctl_cmd->data, rc);
1656 : 12 : standby_running = true;
1657 : 12 : destroyPQExpBuffer(pg_ctl_cmd);
1658 : 12 : pg_log_info("server was started");
1659 : 12 : }
1660 : :
1661 : : static void
1662 : 12 : stop_standby_server(const char *datadir)
1663 : : {
1664 : : char *pg_ctl_cmd;
1665 : : int rc;
1666 : :
1667 : 12 : pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1668 : : datadir);
1669 [ + + ]: 12 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1670 : 12 : rc = system(pg_ctl_cmd);
1671 : 12 : pg_ctl_status(pg_ctl_cmd, rc);
1672 : 12 : standby_running = false;
1673 : 12 : pg_log_info("server was stopped");
1674 : 12 : }
1675 : :
1676 : : /*
1677 : : * Returns after the server finishes the recovery process.
1678 : : *
1679 : : * If recovery_timeout option is set, terminate abnormally without finishing
1680 : : * the recovery process. By default, it waits forever.
1681 : : *
1682 : : * XXX Is the recovery process still in progress? When recovery process has a
1683 : : * better progress reporting mechanism, it should be added here.
1684 : : */
1685 : : static void
1686 : 4 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1687 : : {
1688 : : PGconn *conn;
132 alvherre@kurilemu.de 1689 :GNC 4 : bool ready = false;
720 peter@eisentraut.org 1690 :CBC 4 : int timer = 0;
1691 : :
1692 : 4 : pg_log_info("waiting for the target server to reach the consistent state");
1693 : :
1694 : 4 : conn = connect_database(conninfo, true);
1695 : :
1696 : : for (;;)
1697 : : {
1698 : : /* Did the recovery process finish? We're done if so. */
135 alvherre@kurilemu.de 1699 [ + + + - ]: 4 : if (dry_run || !server_is_in_recovery(conn))
1700 : : {
132 alvherre@kurilemu.de 1701 :GNC 4 : ready = true;
720 peter@eisentraut.org 1702 :CBC 4 : recovery_ended = true;
1703 : 4 : break;
1704 : : }
1705 : :
1706 : : /* Bail out after recovery_timeout seconds if this option is set */
720 peter@eisentraut.org 1707 [ # # # # ]:UBC 0 : if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1708 : : {
1709 : 0 : stop_standby_server(subscriber_dir);
1710 : 0 : pg_log_error("recovery timed out");
1711 : 0 : disconnect_database(conn, true);
1712 : : }
1713 : :
1714 : : /* Keep waiting */
131 alvherre@kurilemu.de 1715 :UNC 0 : pg_usleep(WAIT_INTERVAL * USECS_PER_SEC);
720 peter@eisentraut.org 1716 :UBC 0 : timer += WAIT_INTERVAL;
1717 : : }
1718 : :
720 peter@eisentraut.org 1719 :CBC 4 : disconnect_database(conn, false);
1720 : :
132 alvherre@kurilemu.de 1721 [ - + ]:GNC 4 : if (!ready)
720 peter@eisentraut.org 1722 :UBC 0 : pg_fatal("server did not end recovery");
1723 : :
720 peter@eisentraut.org 1724 :CBC 4 : pg_log_info("target server reached the consistent state");
1725 : 4 : pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1726 : 4 : }
1727 : :
1728 : : /*
1729 : : * Create a publication that includes all tables in the database.
1730 : : */
1731 : : static void
1732 : 7 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1733 : : {
1734 : 7 : PQExpBuffer str = createPQExpBuffer();
1735 : : PGresult *res;
1736 : : char *ipubname_esc;
1737 : : char *spubname_esc;
1738 : :
1739 [ - + ]: 7 : Assert(conn != NULL);
1740 : :
1741 : 7 : ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1742 : 7 : spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1743 : :
1744 : : /* Check if the publication already exists */
1745 : 7 : appendPQExpBuffer(str,
1746 : : "SELECT 1 FROM pg_catalog.pg_publication "
1747 : : "WHERE pubname = %s",
1748 : : spubname_esc);
1749 : 7 : res = PQexec(conn, str->data);
1750 [ - + ]: 7 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1751 : : {
720 peter@eisentraut.org 1752 :UBC 0 : pg_log_error("could not obtain publication information: %s",
1753 : : PQresultErrorMessage(res));
1754 : 0 : disconnect_database(conn, true);
1755 : : }
1756 : :
720 peter@eisentraut.org 1757 [ - + ]:CBC 7 : if (PQntuples(res) == 1)
1758 : : {
1759 : : /*
1760 : : * Unfortunately, if it reaches this code path, it will always fail
1761 : : * (unless you decide to change the existing publication name). That's
1762 : : * bad but it is very unlikely that the user will choose a name with
1763 : : * pg_createsubscriber_ prefix followed by the exact database oid and
1764 : : * a random number.
1765 : : */
720 peter@eisentraut.org 1766 :UBC 0 : pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1767 : 0 : pg_log_error_hint("Consider renaming this publication before continuing.");
1768 : 0 : disconnect_database(conn, true);
1769 : : }
1770 : :
720 peter@eisentraut.org 1771 :CBC 7 : PQclear(res);
1772 : 7 : resetPQExpBuffer(str);
1773 : :
135 alvherre@kurilemu.de 1774 [ + + ]: 7 : if (dry_run)
1775 : 6 : pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
1776 : : dbinfo->pubname, dbinfo->dbname);
1777 : : else
1778 : 1 : pg_log_info("creating publication \"%s\" in database \"%s\"",
1779 : : dbinfo->pubname, dbinfo->dbname);
1780 : :
720 peter@eisentraut.org 1781 : 7 : appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1782 : : ipubname_esc);
1783 : :
1784 [ + + ]: 7 : pg_log_debug("command is: %s", str->data);
1785 : :
1786 [ + + ]: 7 : if (!dry_run)
1787 : : {
1788 : 1 : res = PQexec(conn, str->data);
1789 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1790 : : {
598 peter@eisentraut.org 1791 :UBC 0 : pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1792 : : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
720 1793 : 0 : disconnect_database(conn, true);
1794 : : }
720 peter@eisentraut.org 1795 :CBC 1 : PQclear(res);
1796 : : }
1797 : :
1798 : : /* For cleanup purposes */
1799 : 7 : dbinfo->made_publication = true;
1800 : :
396 michael@paquier.xyz 1801 : 7 : PQfreemem(ipubname_esc);
1802 : 7 : PQfreemem(spubname_esc);
720 peter@eisentraut.org 1803 : 7 : destroyPQExpBuffer(str);
1804 : 7 : }
1805 : :
1806 : : /*
1807 : : * Drop the specified publication in the given database.
1808 : : */
1809 : : static void
360 akapila@postgresql.o 1810 : 10 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1811 : : bool *made_publication)
1812 : : {
720 peter@eisentraut.org 1813 : 10 : PQExpBuffer str = createPQExpBuffer();
1814 : : PGresult *res;
1815 : : char *pubname_esc;
1816 : :
1817 [ - + ]: 10 : Assert(conn != NULL);
1818 : :
360 akapila@postgresql.o 1819 : 10 : pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1820 : :
135 alvherre@kurilemu.de 1821 [ + + ]: 10 : if (dry_run)
1822 : 6 : pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
1823 : : pubname, dbname);
1824 : : else
1825 : 4 : pg_log_info("dropping publication \"%s\" in database \"%s\"",
1826 : : pubname, dbname);
1827 : :
720 peter@eisentraut.org 1828 : 10 : appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1829 : :
396 michael@paquier.xyz 1830 : 10 : PQfreemem(pubname_esc);
1831 : :
720 peter@eisentraut.org 1832 [ + + ]: 10 : pg_log_debug("command is: %s", str->data);
1833 : :
1834 [ + + ]: 10 : if (!dry_run)
1835 : : {
1836 : 4 : res = PQexec(conn, str->data);
1837 [ - + ]: 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1838 : : {
598 peter@eisentraut.org 1839 :UBC 0 : pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1840 : : pubname, dbname, PQresultErrorMessage(res));
360 akapila@postgresql.o 1841 : 0 : *made_publication = false; /* don't try again. */
1842 : :
1843 : : /*
1844 : : * Don't disconnect and exit here. This routine is used by primary
1845 : : * (cleanup publication / replication slot due to an error) and
1846 : : * subscriber (remove the replicated publications). In both cases,
1847 : : * it can continue and provide instructions for the user to remove
1848 : : * it later if cleanup fails.
1849 : : */
1850 : : }
720 peter@eisentraut.org 1851 :CBC 4 : PQclear(res);
1852 : : }
1853 : :
1854 : 10 : destroyPQExpBuffer(str);
1855 : 10 : }
1856 : :
1857 : : /*
1858 : : * Retrieve and drop the publications.
1859 : : *
1860 : : * Publications copied during physical replication remain on the subscriber
1861 : : * after promotion. If --clean=publications is specified, drop all existing
1862 : : * publications in the subscriber database. Otherwise, only drop publications
1863 : : * that were created by pg_createsubscriber during this operation.
1864 : : */
1865 : : static void
360 akapila@postgresql.o 1866 : 8 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
1867 : : {
1868 : : PGresult *res;
263 peter@eisentraut.org 1869 : 8 : bool drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
1870 : :
360 akapila@postgresql.o 1871 [ - + ]: 8 : Assert(conn != NULL);
1872 : :
1873 [ + + ]: 8 : if (drop_all_pubs)
1874 : : {
1875 : 2 : pg_log_info("dropping all existing publications in database \"%s\"",
1876 : : dbinfo->dbname);
1877 : :
1878 : : /* Fetch all publication names */
1879 : 2 : res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1880 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1881 : : {
360 akapila@postgresql.o 1882 :UBC 0 : pg_log_error("could not obtain publication information: %s",
1883 : : PQresultErrorMessage(res));
1884 : 0 : PQclear(res);
1885 : 0 : disconnect_database(conn, true);
1886 : : }
1887 : :
1888 : : /* Drop each publication */
152 msawada@postgresql.o 1889 [ + + ]:CBC 6 : for (int i = 0; i < PQntuples(res); i++)
1890 : 4 : drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1891 : : &dbinfo->made_publication);
1892 : :
360 akapila@postgresql.o 1893 : 2 : PQclear(res);
1894 : : }
1895 : : else
1896 : : {
1897 : : /* Drop publication only if it was created by this tool */
88 akapila@postgresql.o 1898 [ + - ]:GNC 6 : if (dbinfo->made_publication)
1899 : : {
1900 : 6 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1901 : : &dbinfo->made_publication);
1902 : : }
1903 : : else
1904 : : {
88 akapila@postgresql.o 1905 [ # # ]:UNC 0 : if (dry_run)
1906 : 0 : pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
1907 : : dbinfo->pubname, dbinfo->dbname);
1908 : : else
1909 : 0 : pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
1910 : : dbinfo->pubname, dbinfo->dbname);
1911 : : }
1912 : : }
360 akapila@postgresql.o 1913 :CBC 8 : }
1914 : :
1915 : : /*
1916 : : * Create a subscription with some predefined options.
1917 : : *
1918 : : * A replication slot was already created in a previous step. Let's use it. It
1919 : : * is not required to copy data. The subscription will be created but it will
1920 : : * not be enabled now. That's because the replication progress must be set and
1921 : : * the replication origin name (one of the function arguments) contains the
1922 : : * subscription OID in its name. Once the subscription is created,
1923 : : * set_replication_progress() can obtain the chosen origin name and set up its
1924 : : * initial location.
1925 : : */
1926 : : static void
720 peter@eisentraut.org 1927 : 8 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1928 : : {
1929 : 8 : PQExpBuffer str = createPQExpBuffer();
1930 : : PGresult *res;
1931 : : char *pubname_esc;
1932 : : char *subname_esc;
1933 : : char *pubconninfo_esc;
1934 : : char *replslotname_esc;
1935 : :
1936 [ - + ]: 8 : Assert(conn != NULL);
1937 : :
1938 : 8 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1939 : 8 : subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1940 : 8 : pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1941 : 8 : replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1942 : :
135 alvherre@kurilemu.de 1943 [ + + ]: 8 : if (dry_run)
1944 : 6 : pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
1945 : : dbinfo->subname, dbinfo->dbname);
1946 : : else
1947 : 2 : pg_log_info("creating subscription \"%s\" in database \"%s\"",
1948 : : dbinfo->subname, dbinfo->dbname);
1949 : :
720 peter@eisentraut.org 1950 : 8 : appendPQExpBuffer(str,
1951 : : "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1952 : : "WITH (create_slot = false, enabled = false, "
1953 : : "slot_name = %s, copy_data = false, two_phase = %s)",
1954 : : subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
382 akapila@postgresql.o 1955 [ + + ]: 8 : dbinfos.two_phase ? "true" : "false");
1956 : :
396 michael@paquier.xyz 1957 : 8 : PQfreemem(pubname_esc);
1958 : 8 : PQfreemem(subname_esc);
1959 : 8 : PQfreemem(pubconninfo_esc);
1960 : 8 : PQfreemem(replslotname_esc);
1961 : :
720 peter@eisentraut.org 1962 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
1963 : :
1964 [ + + ]: 8 : if (!dry_run)
1965 : : {
1966 : 2 : res = PQexec(conn, str->data);
1967 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1968 : : {
598 peter@eisentraut.org 1969 :UBC 0 : pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1970 : : dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
720 1971 : 0 : disconnect_database(conn, true);
1972 : : }
720 peter@eisentraut.org 1973 :CBC 2 : PQclear(res);
1974 : : }
1975 : :
1976 : 8 : destroyPQExpBuffer(str);
1977 : 8 : }
1978 : :
1979 : : /*
1980 : : * Sets the replication progress to the consistent LSN.
1981 : : *
1982 : : * The subscriber caught up to the consistent LSN provided by the last
1983 : : * replication slot that was created. The goal is to set up the initial
1984 : : * location for the logical replication that is the exact LSN that the
1985 : : * subscriber was promoted. Once the subscription is enabled it will start
1986 : : * streaming from that location onwards. In dry run mode, the subscription OID
1987 : : * and LSN are set to invalid values for printing purposes.
1988 : : */
1989 : : static void
1990 : 8 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1991 : : {
1992 : 8 : PQExpBuffer str = createPQExpBuffer();
1993 : : PGresult *res;
1994 : : Oid suboid;
1995 : : char *subname;
1996 : : char *dbname;
1997 : : char *originname;
1998 : : char *lsnstr;
1999 : :
2000 [ - + ]: 8 : Assert(conn != NULL);
2001 : :
2002 : 8 : subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
2003 : 8 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
2004 : :
2005 : 8 : appendPQExpBuffer(str,
2006 : : "SELECT s.oid FROM pg_catalog.pg_subscription s "
2007 : : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
2008 : : "WHERE s.subname = %s AND d.datname = %s",
2009 : : subname, dbname);
2010 : :
2011 : 8 : res = PQexec(conn, str->data);
2012 [ - + ]: 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2013 : : {
720 peter@eisentraut.org 2014 :UBC 0 : pg_log_error("could not obtain subscription OID: %s",
2015 : : PQresultErrorMessage(res));
2016 : 0 : disconnect_database(conn, true);
2017 : : }
2018 : :
720 peter@eisentraut.org 2019 [ + + - + ]:CBC 8 : if (PQntuples(res) != 1 && !dry_run)
2020 : : {
719 peter@eisentraut.org 2021 :UBC 0 : pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
2022 : : PQntuples(res), 1);
720 2023 : 0 : disconnect_database(conn, true);
2024 : : }
2025 : :
720 peter@eisentraut.org 2026 [ + + ]:CBC 8 : if (dry_run)
2027 : : {
2028 : 6 : suboid = InvalidOid;
251 alvherre@kurilemu.de 2029 :GNC 6 : lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
2030 : : }
2031 : : else
2032 : : {
720 peter@eisentraut.org 2033 :CBC 2 : suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2034 : 2 : lsnstr = psprintf("%s", lsn);
2035 : : }
2036 : :
2037 : 8 : PQclear(res);
2038 : :
2039 : : /*
2040 : : * The origin name is defined as pg_%u. %u is the subscription OID. See
2041 : : * ApplyWorkerMain().
2042 : : */
2043 : 8 : originname = psprintf("pg_%u", suboid);
2044 : :
135 alvherre@kurilemu.de 2045 [ + + ]: 8 : if (dry_run)
2046 : 6 : pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2047 : : originname, lsnstr, dbinfo->dbname);
2048 : : else
2049 : 2 : pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2050 : : originname, lsnstr, dbinfo->dbname);
2051 : :
720 peter@eisentraut.org 2052 : 8 : resetPQExpBuffer(str);
2053 : 8 : appendPQExpBuffer(str,
2054 : : "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
2055 : : originname, lsnstr);
2056 : :
2057 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
2058 : :
2059 [ + + ]: 8 : if (!dry_run)
2060 : : {
2061 : 2 : res = PQexec(conn, str->data);
2062 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2063 : : {
568 peter@eisentraut.org 2064 :UBC 0 : pg_log_error("could not set replication progress for subscription \"%s\": %s",
2065 : : dbinfo->subname, PQresultErrorMessage(res));
720 2066 : 0 : disconnect_database(conn, true);
2067 : : }
720 peter@eisentraut.org 2068 :CBC 2 : PQclear(res);
2069 : : }
2070 : :
396 michael@paquier.xyz 2071 : 8 : PQfreemem(subname);
2072 : 8 : PQfreemem(dbname);
720 peter@eisentraut.org 2073 : 8 : pg_free(originname);
2074 : 8 : pg_free(lsnstr);
2075 : 8 : destroyPQExpBuffer(str);
2076 : 8 : }
2077 : :
2078 : : /*
2079 : : * Enables the subscription.
2080 : : *
2081 : : * The subscription was created in a previous step but it was disabled. After
2082 : : * adjusting the initial logical replication location, enable the subscription.
2083 : : */
2084 : : static void
2085 : 8 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
2086 : : {
2087 : 8 : PQExpBuffer str = createPQExpBuffer();
2088 : : PGresult *res;
2089 : : char *subname;
2090 : :
2091 [ - + ]: 8 : Assert(conn != NULL);
2092 : :
2093 : 8 : subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2094 : :
135 alvherre@kurilemu.de 2095 [ + + ]: 8 : if (dry_run)
2096 : 6 : pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
2097 : : dbinfo->subname, dbinfo->dbname);
2098 : : else
2099 : 2 : pg_log_info("enabling subscription \"%s\" in database \"%s\"",
2100 : : dbinfo->subname, dbinfo->dbname);
2101 : :
720 peter@eisentraut.org 2102 : 8 : appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
2103 : :
2104 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
2105 : :
2106 [ + + ]: 8 : if (!dry_run)
2107 : : {
2108 : 2 : res = PQexec(conn, str->data);
2109 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2110 : : {
720 peter@eisentraut.org 2111 :UBC 0 : pg_log_error("could not enable subscription \"%s\": %s",
2112 : : dbinfo->subname, PQresultErrorMessage(res));
2113 : 0 : disconnect_database(conn, true);
2114 : : }
2115 : :
720 peter@eisentraut.org 2116 :CBC 2 : PQclear(res);
2117 : : }
2118 : :
396 michael@paquier.xyz 2119 : 8 : PQfreemem(subname);
720 peter@eisentraut.org 2120 : 8 : destroyPQExpBuffer(str);
2121 : 8 : }
2122 : :
2123 : : /*
2124 : : * Fetch a list of all connectable non-template databases from the source server
2125 : : * and form a list such that they appear as if the user has specified multiple
2126 : : * --database options, one for each source database.
2127 : : */
2128 : : static void
352 akapila@postgresql.o 2129 : 1 : get_publisher_databases(struct CreateSubscriberOptions *opt,
2130 : : bool dbnamespecified)
2131 : : {
2132 : : PGconn *conn;
2133 : : PGresult *res;
2134 : :
2135 : : /* If a database name was specified, just connect to it. */
2136 [ - + ]: 1 : if (dbnamespecified)
352 akapila@postgresql.o 2137 :UBC 0 : conn = connect_database(opt->pub_conninfo_str, true);
2138 : : else
2139 : : {
2140 : : /* Otherwise, try postgres first and then template1. */
2141 : : char *conninfo;
2142 : :
352 akapila@postgresql.o 2143 :CBC 1 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2144 : 1 : conn = connect_database(conninfo, false);
2145 : 1 : pg_free(conninfo);
2146 [ - + ]: 1 : if (!conn)
2147 : : {
352 akapila@postgresql.o 2148 :UBC 0 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2149 : 0 : conn = connect_database(conninfo, true);
2150 : 0 : pg_free(conninfo);
2151 : : }
2152 : : }
2153 : :
352 akapila@postgresql.o 2154 :CBC 1 : res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2155 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2156 : : {
352 akapila@postgresql.o 2157 :UBC 0 : pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2158 : 0 : PQclear(res);
2159 : 0 : disconnect_database(conn, true);
2160 : : }
2161 : :
352 akapila@postgresql.o 2162 [ + + ]:CBC 4 : for (int i = 0; i < PQntuples(res); i++)
2163 : : {
2164 : 3 : const char *dbname = PQgetvalue(res, i, 0);
2165 : :
2166 : 3 : simple_string_list_append(&opt->database_names, dbname);
2167 : :
2168 : : /* Increment num_dbs to reflect multiple --database options */
2169 : 3 : num_dbs++;
2170 : : }
2171 : :
2172 : 1 : PQclear(res);
2173 : 1 : disconnect_database(conn, false);
2174 : 1 : }
2175 : :
2176 : : int
720 peter@eisentraut.org 2177 : 23 : main(int argc, char **argv)
2178 : : {
2179 : : static struct option long_options[] =
2180 : : {
2181 : : {"all", no_argument, NULL, 'a'},
2182 : : {"database", required_argument, NULL, 'd'},
2183 : : {"pgdata", required_argument, NULL, 'D'},
2184 : : {"dry-run", no_argument, NULL, 'n'},
2185 : : {"subscriber-port", required_argument, NULL, 'p'},
2186 : : {"publisher-server", required_argument, NULL, 'P'},
2187 : : {"socketdir", required_argument, NULL, 's'},
2188 : : {"recovery-timeout", required_argument, NULL, 't'},
2189 : : {"enable-two-phase", no_argument, NULL, 'T'},
2190 : : {"subscriber-username", required_argument, NULL, 'U'},
2191 : : {"verbose", no_argument, NULL, 'v'},
2192 : : {"version", no_argument, NULL, 'V'},
2193 : : {"help", no_argument, NULL, '?'},
2194 : : {"config-file", required_argument, NULL, 1},
2195 : : {"publication", required_argument, NULL, 2},
2196 : : {"replication-slot", required_argument, NULL, 3},
2197 : : {"subscription", required_argument, NULL, 4},
2198 : : {"clean", required_argument, NULL, 5},
2199 : : {NULL, 0, NULL, 0}
2200 : : };
2201 : :
2202 : 23 : struct CreateSubscriberOptions opt = {0};
2203 : :
2204 : : int c;
2205 : : int option_index;
2206 : :
2207 : : char *pub_base_conninfo;
2208 : : char *sub_base_conninfo;
2209 : 23 : char *dbname_conninfo = NULL;
2210 : :
2211 : : uint64 pub_sysid;
2212 : : uint64 sub_sysid;
2213 : : struct stat statbuf;
2214 : :
2215 : : char *consistent_lsn;
2216 : :
2217 : : char pidfile[MAXPGPATH];
2218 : :
2219 : 23 : pg_logging_init(argv[0]);
2220 : 23 : pg_logging_set_level(PG_LOG_WARNING);
2221 : 23 : progname = get_progname(argv[0]);
590 alvherre@alvh.no-ip. 2222 : 23 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2223 : :
720 peter@eisentraut.org 2224 [ + + ]: 23 : if (argc > 1)
2225 : : {
2226 [ + + - + ]: 22 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2227 : : {
2228 : 1 : usage();
2229 : 1 : exit(0);
2230 : : }
2231 [ + - ]: 21 : else if (strcmp(argv[1], "-V") == 0
2232 [ + + ]: 21 : || strcmp(argv[1], "--version") == 0)
2233 : : {
2234 : 1 : puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2235 : 1 : exit(0);
2236 : : }
2237 : : }
2238 : :
2239 : : /* Default settings */
2240 : 21 : subscriber_dir = NULL;
2241 : 21 : opt.config_file = NULL;
2242 : 21 : opt.pub_conninfo_str = NULL;
2243 : 21 : opt.socket_dir = NULL;
2244 : 21 : opt.sub_port = DEFAULT_SUB_PORT;
2245 : 21 : opt.sub_username = NULL;
382 akapila@postgresql.o 2246 : 21 : opt.two_phase = false;
720 peter@eisentraut.org 2247 : 21 : opt.database_names = (SimpleStringList)
2248 : : {
2249 : : 0
2250 : : };
2251 : 21 : opt.recovery_timeout = 0;
352 akapila@postgresql.o 2252 : 21 : opt.all_dbs = false;
2253 : :
2254 : : /*
2255 : : * Don't allow it to be run as root. It uses pg_ctl which does not allow
2256 : : * it either.
2257 : : */
2258 : : #ifndef WIN32
720 peter@eisentraut.org 2259 [ - + ]: 21 : if (geteuid() == 0)
2260 : : {
720 peter@eisentraut.org 2261 :UBC 0 : pg_log_error("cannot be executed by \"root\"");
2262 : 0 : pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2263 : : progname);
2264 : 0 : exit(1);
2265 : : }
2266 : : #endif
2267 : :
720 peter@eisentraut.org 2268 :CBC 21 : get_restricted_token();
2269 : :
263 2270 : 162 : while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
720 2271 [ + + ]: 162 : long_options, &option_index)) != -1)
2272 : : {
2273 [ + + + + : 144 : switch (c)
+ + + + +
- + - + +
+ + + ]
2274 : : {
352 akapila@postgresql.o 2275 : 3 : case 'a':
2276 : 3 : opt.all_dbs = true;
2277 : 3 : break;
720 peter@eisentraut.org 2278 : 25 : case 'd':
2279 [ + + ]: 25 : if (!simple_string_list_member(&opt.database_names, optarg))
2280 : : {
2281 : 24 : simple_string_list_append(&opt.database_names, optarg);
2282 : 24 : num_dbs++;
2283 : : }
2284 : : else
345 akapila@postgresql.o 2285 : 1 : pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
720 peter@eisentraut.org 2286 : 24 : break;
2287 : 19 : case 'D':
2288 : 19 : subscriber_dir = pg_strdup(optarg);
2289 : 19 : canonicalize_path(subscriber_dir);
2290 : 19 : break;
2291 : 9 : case 'n':
2292 : 9 : dry_run = true;
2293 : 9 : break;
2294 : 12 : case 'p':
2295 : 12 : opt.sub_port = pg_strdup(optarg);
2296 : 12 : break;
2297 : 18 : case 'P':
2298 : 18 : opt.pub_conninfo_str = pg_strdup(optarg);
2299 : 18 : break;
2300 : 12 : case 's':
2301 : 12 : opt.socket_dir = pg_strdup(optarg);
2302 : 12 : canonicalize_path(opt.socket_dir);
2303 : 12 : break;
2304 : 3 : case 't':
2305 : 3 : opt.recovery_timeout = atoi(optarg);
2306 : 3 : break;
382 akapila@postgresql.o 2307 : 1 : case 'T':
2308 : 1 : opt.two_phase = true;
2309 : 1 : break;
720 peter@eisentraut.org 2310 :UBC 0 : case 'U':
2311 : 0 : opt.sub_username = pg_strdup(optarg);
2312 : 0 : break;
720 peter@eisentraut.org 2313 :CBC 19 : case 'v':
2314 : 19 : pg_logging_increase_verbosity();
2315 : 19 : break;
720 peter@eisentraut.org 2316 :UBC 0 : case 1:
2317 : 0 : opt.config_file = pg_strdup(optarg);
2318 : 0 : break;
720 peter@eisentraut.org 2319 :CBC 12 : case 2:
2320 [ + + ]: 12 : if (!simple_string_list_member(&opt.pub_names, optarg))
2321 : : {
2322 : 11 : simple_string_list_append(&opt.pub_names, optarg);
2323 : 11 : num_pubs++;
2324 : : }
2325 : : else
345 akapila@postgresql.o 2326 : 1 : pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
720 peter@eisentraut.org 2327 : 11 : break;
2328 : 4 : case 3:
2329 [ + - ]: 4 : if (!simple_string_list_member(&opt.replslot_names, optarg))
2330 : : {
2331 : 4 : simple_string_list_append(&opt.replslot_names, optarg);
2332 : 4 : num_replslots++;
2333 : : }
2334 : : else
345 akapila@postgresql.o 2335 :UBC 0 : pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
720 peter@eisentraut.org 2336 :CBC 4 : break;
2337 : 5 : case 4:
2338 [ + - ]: 5 : if (!simple_string_list_member(&opt.sub_names, optarg))
2339 : : {
2340 : 5 : simple_string_list_append(&opt.sub_names, optarg);
2341 : 5 : num_subs++;
2342 : : }
2343 : : else
345 akapila@postgresql.o 2344 :UBC 0 : pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
720 peter@eisentraut.org 2345 :CBC 5 : break;
263 2346 : 1 : case 5:
2347 [ + - ]: 1 : if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
2348 : 1 : simple_string_list_append(&opt.objecttypes_to_clean, optarg);
2349 : : else
263 peter@eisentraut.org 2350 :UBC 0 : pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
263 peter@eisentraut.org 2351 :CBC 1 : break;
720 2352 : 1 : default:
2353 : : /* getopt_long already emitted a complaint */
2354 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2355 : 1 : exit(1);
2356 : : }
2357 : : }
2358 : :
2359 : : /* Validate that --all is not used with incompatible options */
352 akapila@postgresql.o 2360 [ + + ]: 18 : if (opt.all_dbs)
2361 : : {
2362 : 3 : char *bad_switch = NULL;
2363 : :
2364 [ + + ]: 3 : if (num_dbs > 0)
2365 : 1 : bad_switch = "--database";
2366 [ + + ]: 2 : else if (num_pubs > 0)
2367 : 1 : bad_switch = "--publication";
2368 [ - + ]: 1 : else if (num_replslots > 0)
352 akapila@postgresql.o 2369 :UBC 0 : bad_switch = "--replication-slot";
352 akapila@postgresql.o 2370 [ - + ]:CBC 1 : else if (num_subs > 0)
352 akapila@postgresql.o 2371 :UBC 0 : bad_switch = "--subscription";
2372 : :
352 akapila@postgresql.o 2373 [ + + ]:CBC 3 : if (bad_switch)
2374 : : {
97 alvherre@kurilemu.de 2375 : 2 : pg_log_error("options %s and %s cannot be used together",
2376 : : bad_switch, "-a/--all");
352 akapila@postgresql.o 2377 : 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2378 : 2 : exit(1);
2379 : : }
2380 : : }
2381 : :
2382 : : /* Any non-option arguments? */
720 peter@eisentraut.org 2383 [ - + ]: 16 : if (optind < argc)
2384 : : {
720 peter@eisentraut.org 2385 :UBC 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
2386 : : argv[optind]);
2387 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2388 : 0 : exit(1);
2389 : : }
2390 : :
2391 : : /* Required arguments */
720 peter@eisentraut.org 2392 [ + + ]:CBC 16 : if (subscriber_dir == NULL)
2393 : : {
2394 : 1 : pg_log_error("no subscriber data directory specified");
2395 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2396 : 1 : exit(1);
2397 : : }
2398 : :
2399 : : /* If socket directory is not provided, use the current directory */
2400 [ + + ]: 15 : if (opt.socket_dir == NULL)
2401 : : {
2402 : : char cwd[MAXPGPATH];
2403 : :
2404 [ - + ]: 5 : if (!getcwd(cwd, MAXPGPATH))
720 peter@eisentraut.org 2405 :UBC 0 : pg_fatal("could not determine current directory");
720 peter@eisentraut.org 2406 :CBC 5 : opt.socket_dir = pg_strdup(cwd);
2407 : 5 : canonicalize_path(opt.socket_dir);
2408 : : }
2409 : :
2410 : : /*
2411 : : * Parse connection string. Build a base connection string that might be
2412 : : * reused by multiple databases.
2413 : : */
2414 [ + + ]: 15 : if (opt.pub_conninfo_str == NULL)
2415 : : {
2416 : : /*
2417 : : * TODO use primary_conninfo (if available) from subscriber and
2418 : : * extract publisher connection string. Assume that there are
2419 : : * identical entries for physical and logical replication. If there is
2420 : : * not, we would fail anyway.
2421 : : */
2422 : 1 : pg_log_error("no publisher connection string specified");
2423 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2424 : 1 : exit(1);
2425 : : }
2426 : :
117 alvherre@kurilemu.de 2427 [ + + ]:GNC 14 : if (dry_run)
2428 : 8 : pg_log_info("Executing in dry-run mode.\n"
2429 : : "The target directory will not be modified.");
2430 : :
590 alvherre@alvh.no-ip. 2431 :CBC 14 : pg_log_info("validating publisher connection string");
720 peter@eisentraut.org 2432 : 14 : pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2433 : : &dbname_conninfo);
2434 [ - + ]: 14 : if (pub_base_conninfo == NULL)
720 peter@eisentraut.org 2435 :UBC 0 : exit(1);
2436 : :
590 alvherre@alvh.no-ip. 2437 :CBC 14 : pg_log_info("validating subscriber connection string");
720 peter@eisentraut.org 2438 : 14 : sub_base_conninfo = get_sub_conninfo(&opt);
2439 : :
2440 : : /*
2441 : : * Fetch all databases from the source (publisher) and treat them as if
2442 : : * the user specified has multiple --database options, one for each source
2443 : : * database.
2444 : : */
352 akapila@postgresql.o 2445 [ + + ]: 14 : if (opt.all_dbs)
2446 : : {
2447 : 1 : bool dbnamespecified = (dbname_conninfo != NULL);
2448 : :
2449 : 1 : get_publisher_databases(&opt, dbnamespecified);
2450 : : }
2451 : :
720 peter@eisentraut.org 2452 [ + + ]: 14 : if (opt.database_names.head == NULL)
2453 : : {
2454 : 2 : pg_log_info("no database was specified");
2455 : :
2456 : : /*
2457 : : * Try to obtain the dbname from the publisher conninfo. If dbname
2458 : : * parameter is not available, error out.
2459 : : */
2460 [ + + ]: 2 : if (dbname_conninfo)
2461 : : {
2462 : 1 : simple_string_list_append(&opt.database_names, dbname_conninfo);
2463 : 1 : num_dbs++;
2464 : :
568 2465 : 1 : pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2466 : : dbname_conninfo);
2467 : : }
2468 : : else
2469 : : {
720 2470 : 1 : pg_log_error("no database name specified");
2471 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.",
2472 : : progname);
2473 : 1 : exit(1);
2474 : : }
2475 : : }
2476 : :
2477 : : /* Number of object names must match number of databases */
2478 [ + + + + ]: 13 : if (num_pubs > 0 && num_pubs != num_dbs)
2479 : : {
568 2480 : 1 : pg_log_error("wrong number of publication names specified");
2481 : 1 : pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2482 : : num_pubs, num_dbs);
720 2483 : 1 : exit(1);
2484 : : }
2485 [ + + + + ]: 12 : if (num_subs > 0 && num_subs != num_dbs)
2486 : : {
568 2487 : 1 : pg_log_error("wrong number of subscription names specified");
2488 : 1 : pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2489 : : num_subs, num_dbs);
720 2490 : 1 : exit(1);
2491 : : }
2492 [ + + + + ]: 11 : if (num_replslots > 0 && num_replslots != num_dbs)
2493 : : {
568 2494 : 1 : pg_log_error("wrong number of replication slot names specified");
2495 : 1 : pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2496 : : num_replslots, num_dbs);
720 2497 : 1 : exit(1);
2498 : : }
2499 : :
2500 : : /* Verify the object types specified for removal from the subscriber */
263 2501 [ + + ]: 11 : for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2502 : : {
360 akapila@postgresql.o 2503 [ + - ]: 1 : if (pg_strcasecmp(cell->val, "publications") == 0)
263 peter@eisentraut.org 2504 : 1 : dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
2505 : : else
2506 : : {
97 alvherre@kurilemu.de 2507 :UBC 0 : pg_log_error("invalid object type \"%s\" specified for %s",
2508 : : cell->val, "--clean");
272 peter@eisentraut.org 2509 : 0 : pg_log_error_hint("The valid value is: \"%s\"", "publications");
360 akapila@postgresql.o 2510 : 0 : exit(1);
2511 : : }
2512 : : }
2513 : :
2514 : : /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
720 peter@eisentraut.org 2515 :CBC 10 : pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2516 : 10 : pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2517 : :
2518 : : /* Rudimentary check for a data directory */
2519 : 10 : check_data_directory(subscriber_dir);
2520 : :
382 akapila@postgresql.o 2521 : 10 : dbinfos.two_phase = opt.two_phase;
2522 : :
2523 : : /*
2524 : : * Store database information for publisher and subscriber. It should be
2525 : : * called before atexit() because its return is used in the
2526 : : * cleanup_objects_atexit().
2527 : : */
2528 : 10 : dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2529 : :
2530 : : /* Register a function to clean up objects in case of failure */
720 peter@eisentraut.org 2531 : 10 : atexit(cleanup_objects_atexit);
2532 : :
2533 : : /*
2534 : : * Check if the subscriber data directory has the same system identifier
2535 : : * than the publisher data directory.
2536 : : */
382 akapila@postgresql.o 2537 : 10 : pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
720 peter@eisentraut.org 2538 : 10 : sub_sysid = get_standby_sysid(subscriber_dir);
2539 [ + + ]: 10 : if (pub_sysid != sub_sysid)
2540 : 1 : pg_fatal("subscriber data directory is not a copy of the source database cluster");
2541 : :
2542 : : /* Subscriber PID file */
2543 : 9 : snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2544 : :
2545 : : /*
2546 : : * The standby server must not be running. If the server is started under
2547 : : * service manager and pg_createsubscriber stops it, the service manager
2548 : : * might react to this action and start the server again. Therefore,
2549 : : * refuse to proceed if the server is running to avoid possible failures.
2550 : : */
2551 [ + + ]: 9 : if (stat(pidfile, &statbuf) == 0)
2552 : : {
568 2553 : 1 : pg_log_error("standby server is running");
2554 : 1 : pg_log_error_hint("Stop the standby server and try again.");
720 2555 : 1 : exit(1);
2556 : : }
2557 : :
2558 : : /*
2559 : : * Start a short-lived standby server with temporary parameters (provided
2560 : : * by command-line options). The goal is to avoid connections during the
2561 : : * transformation steps.
2562 : : */
568 2563 : 8 : pg_log_info("starting the standby server with command-line options");
621 akapila@postgresql.o 2564 : 8 : start_standby_server(&opt, true, false);
2565 : :
2566 : : /* Check if the standby server is ready for logical replication */
382 2567 : 8 : check_subscriber(dbinfos.dbinfo);
2568 : :
2569 : : /* Check if the primary server is ready for logical replication */
2570 : 6 : check_publisher(dbinfos.dbinfo);
2571 : :
2572 : : /*
2573 : : * Stop the target server. The recovery process requires that the server
2574 : : * reaches a consistent state before targeting the recovery stop point.
2575 : : * Make sure a consistent state is reached (stop the target server
2576 : : * guarantees it) *before* creating the replication slots in
2577 : : * setup_publisher().
2578 : : */
720 peter@eisentraut.org 2579 : 4 : pg_log_info("stopping the subscriber");
2580 : 4 : stop_standby_server(subscriber_dir);
2581 : :
2582 : : /* Create the required objects for each database on publisher */
382 akapila@postgresql.o 2583 : 4 : consistent_lsn = setup_publisher(dbinfos.dbinfo);
2584 : :
2585 : : /* Write the required recovery parameters */
2586 : 4 : setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2587 : :
2588 : : /*
2589 : : * Start subscriber so the recovery parameters will take effect. Wait
2590 : : * until accepting connections. We don't want to start logical replication
2591 : : * during setup.
2592 : : */
720 peter@eisentraut.org 2593 : 4 : pg_log_info("starting the subscriber");
621 akapila@postgresql.o 2594 : 4 : start_standby_server(&opt, true, true);
2595 : :
2596 : : /* Waiting the subscriber to be promoted */
382 2597 : 4 : wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
2598 : :
2599 : : /*
2600 : : * Create the subscription for each database on subscriber. It does not
2601 : : * enable it immediately because it needs to adjust the replication start
2602 : : * point to the LSN reported by setup_publisher(). It also cleans up
2603 : : * publications created by this tool and replication to the standby.
2604 : : */
2605 : 4 : setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2606 : :
2607 : : /* Remove primary_slot_name if it exists on primary */
2608 : 4 : drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
2609 : :
2610 : : /* Remove failover replication slots if they exist on subscriber */
2611 : 4 : drop_failover_replication_slots(dbinfos.dbinfo);
2612 : :
2613 : : /* Stop the subscriber */
720 peter@eisentraut.org 2614 : 4 : pg_log_info("stopping the subscriber");
2615 : 4 : stop_standby_server(subscriber_dir);
2616 : :
2617 : : /* Change system identifier from subscriber */
2618 : 4 : modify_subscriber_sysid(&opt);
2619 : :
2620 : 4 : success = true;
2621 : :
2622 : 4 : pg_log_info("Done!");
2623 : :
2624 : 4 : return 0;
2625 : : }
|