Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_createsubscriber.c
4 : : * Create a new logical replica from a standby server
5 : : *
6 : : * Copyright (c) 2024-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/bin/pg_basebackup/pg_createsubscriber.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : :
14 : : #include "postgres_fe.h"
15 : :
16 : : #include <sys/stat.h>
17 : : #include <sys/time.h>
18 : : #include <sys/wait.h>
19 : : #include <time.h>
20 : :
21 : : #include "common/connect.h"
22 : : #include "common/controldata_utils.h"
23 : : #include "common/file_perm.h"
24 : : #include "common/file_utils.h"
25 : : #include "common/logging.h"
26 : : #include "common/pg_prng.h"
27 : : #include "common/restricted_token.h"
28 : : #include "datatype/timestamp.h"
29 : : #include "fe_utils/recovery_gen.h"
30 : : #include "fe_utils/simple_list.h"
31 : : #include "fe_utils/string_utils.h"
32 : : #include "fe_utils/version.h"
33 : : #include "getopt_long.h"
34 : :
35 : : #define DEFAULT_SUB_PORT "50432"
36 : : #define OBJECTTYPE_PUBLICATIONS 0x0001
37 : :
38 : : /*
39 : : * Configuration files for recovery parameters.
40 : : *
41 : : * The recovery parameters are set in INCLUDED_CONF_FILE, itself loaded by
42 : : * the server through an include_if_exists in postgresql.auto.conf.
43 : : *
44 : : * INCLUDED_CONF_FILE is renamed to INCLUDED_CONF_FILE_DISABLED when exiting,
45 : : * so as the recovery parameters set by this tool never take effect on node
46 : : * restart. The contents of INCLUDED_CONF_FILE_DISABLED can be useful for
47 : : * debugging.
48 : : */
49 : : #define PG_AUTOCONF_FILENAME "postgresql.auto.conf"
50 : : #define INCLUDED_CONF_FILE "pg_createsubscriber.conf"
51 : : #define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"
52 : :
53 : : #define SERVER_LOG_FILE_NAME "pg_createsubscriber_server.log"
54 : : #define INTERNAL_LOG_FILE_NAME "pg_createsubscriber_internal.log"
55 : :
56 : : /* Command-line options */
57 : : struct CreateSubscriberOptions
58 : : {
59 : : char *config_file; /* configuration file */
60 : : char *log_dir; /* log directory name */
61 : : char *pub_conninfo_str; /* publisher connection string */
62 : : char *socket_dir; /* directory for Unix-domain socket, if any */
63 : : char *sub_port; /* subscriber port number */
64 : : const char *sub_username; /* subscriber username */
65 : : bool two_phase; /* enable-two-phase option */
66 : : SimpleStringList database_names; /* list of database names */
67 : : SimpleStringList pub_names; /* list of publication names */
68 : : SimpleStringList sub_names; /* list of subscription names */
69 : : SimpleStringList replslot_names; /* list of replication slot names */
70 : : int recovery_timeout; /* stop recovery after this time */
71 : : bool all_dbs; /* all option */
72 : : SimpleStringList objecttypes_to_clean; /* list of object types to cleanup */
73 : : };
74 : :
75 : : /* per-database publication/subscription info */
76 : : struct LogicalRepInfo
77 : : {
78 : : char *dbname; /* database name */
79 : : char *pubconninfo; /* publisher connection string */
80 : : char *subconninfo; /* subscriber connection string */
81 : : char *pubname; /* publication name */
82 : : char *subname; /* subscription name */
83 : : char *replslotname; /* replication slot name */
84 : :
85 : : bool made_replslot; /* replication slot was created */
86 : : bool made_publication; /* publication was created */
87 : : };
88 : :
89 : : /*
90 : : * Information shared across all the databases (or publications and
91 : : * subscriptions).
92 : : */
93 : : struct LogicalRepInfos
94 : : {
95 : : struct LogicalRepInfo *dbinfo;
96 : : bool two_phase; /* enable-two-phase option */
97 : : uint32 objecttypes_to_clean; /* flags indicating which object types
98 : : * to clean up on subscriber */
99 : : };
100 : :
101 : : static void cleanup_objects_atexit(void);
102 : : static void usage(void);
103 : : static char *get_base_conninfo(const char *conninfo, char **dbname);
104 : : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
105 : : static char *get_exec_path(const char *argv0, const char *progname);
106 : : static void check_data_directory(const char *datadir);
107 : : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
108 : : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
109 : : const char *pub_base_conninfo,
110 : : const char *sub_base_conninfo);
111 : : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
112 : : static void disconnect_database(PGconn *conn, bool exit_on_error);
113 : : static uint64 get_primary_sysid(const char *conninfo);
114 : : static uint64 get_standby_sysid(const char *datadir);
115 : : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
116 : : static bool server_is_in_recovery(PGconn *conn);
117 : : static char *generate_object_name(PGconn *conn);
118 : : static void check_publisher(const struct LogicalRepInfo *dbinfo);
119 : : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
120 : : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
121 : : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
122 : : const char *consistent_lsn);
123 : : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
124 : : const char *lsn);
125 : : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
126 : : const char *slotname);
127 : : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
128 : : static char *create_logical_replication_slot(PGconn *conn,
129 : : struct LogicalRepInfo *dbinfo);
130 : : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
131 : : const char *slot_name);
132 : : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
133 : : static void start_standby_server(const struct CreateSubscriberOptions *opt,
134 : : bool restricted_access,
135 : : bool restrict_logical_worker);
136 : : static void stop_standby_server(const char *datadir);
137 : : static void wait_for_end_recovery(const char *conninfo,
138 : : const struct CreateSubscriberOptions *opt);
139 : : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
140 : : static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
141 : : static void drop_publication(PGconn *conn, const char *pubname,
142 : : const char *dbname);
143 : : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
144 : : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
145 : : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
146 : : const char *lsn);
147 : : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
148 : : static void check_and_drop_existing_subscriptions(PGconn *conn,
149 : : const struct LogicalRepInfo *dbinfo);
150 : : static void drop_existing_subscription(PGconn *conn, const char *subname,
151 : : const char *dbname);
152 : : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
153 : : bool dbnamespecified);
154 : :
155 : : #define WAIT_INTERVAL 1 /* 1 second */
156 : :
157 : : static const char *progname;
158 : :
159 : : static char *primary_slot_name = NULL;
160 : : static bool dry_run = false;
161 : :
162 : : static bool success = false;
163 : :
164 : : static struct LogicalRepInfos dbinfos;
165 : : static int num_dbs = 0; /* number of specified databases */
166 : : static int num_pubs = 0; /* number of specified publications */
167 : : static int num_subs = 0; /* number of specified subscriptions */
168 : : static int num_replslots = 0; /* number of specified replication slots */
169 : :
170 : : static pg_prng_state prng_state;
171 : :
172 : : static char *pg_ctl_path = NULL;
173 : : static char *pg_resetwal_path = NULL;
174 : :
175 : : static char *logdir = NULL; /* Subdirectory of the user specified logdir
176 : : * where the log files are written (if
177 : : * specified) */
178 : :
179 : : /* standby / subscriber data directory */
180 : : static char *subscriber_dir = NULL;
181 : :
182 : : static bool recovery_ended = false;
183 : : static bool standby_running = false;
184 : : static bool recovery_params_set = false;
185 : :
186 : :
187 : : /*
188 : : * Clean up objects created by pg_createsubscriber.
189 : : *
190 : : * Publications and replication slots are created on the primary. Depending
191 : : * on the step where it failed, already-created objects should be removed if
192 : : * possible (sometimes this won't work due to a connection issue).
193 : : * There is no cleanup on the target server *after* its promotion, because any
194 : : * failure at this point means recreating the physical replica and starting
195 : : * again.
196 : : *
197 : : * The recovery configuration is always removed, by renaming the included
198 : : * configuration file out of the way.
199 : : */
200 : : static void
796 peter@eisentraut.org 201 :CBC 10 : cleanup_objects_atexit(void)
202 : : {
203 : : /* Rename the included configuration file, if necessary. */
142 michael@paquier.xyz 204 [ + + ]:GNC 10 : if (recovery_params_set)
205 : : {
206 : : char conf_filename[MAXPGPATH];
207 : : char conf_filename_disabled[MAXPGPATH];
208 : :
209 : 1 : snprintf(conf_filename, MAXPGPATH, "%s/%s", subscriber_dir,
210 : : INCLUDED_CONF_FILE);
211 : 1 : snprintf(conf_filename_disabled, MAXPGPATH, "%s/%s", subscriber_dir,
212 : : INCLUDED_CONF_FILE_DISABLED);
213 : :
214 [ - + ]: 1 : if (durable_rename(conf_filename, conf_filename_disabled) != 0)
215 : : {
216 : : /* durable_rename() has already logged something. */
23 peter@eisentraut.org 217 :UNC 0 : pg_log_warning_hint("A manual removal of the recovery parameters might be required.");
218 : : }
219 : : }
220 : :
796 peter@eisentraut.org 221 [ + + ]:CBC 10 : if (success)
222 : 4 : return;
223 : :
224 : : /*
225 : : * If the server is promoted, there is no way to use the current setup
226 : : * again. Warn the user that a new replication setup should be done before
227 : : * trying again.
228 : : */
229 [ - + ]: 6 : if (recovery_ended)
230 : : {
47 peter@eisentraut.org 231 :UBC 0 : pg_log_warning("failed after the end of recovery");
232 : 0 : pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
233 : : "You must recreate the physical replica before continuing.");
234 : : }
235 : :
796 peter@eisentraut.org 236 [ + + ]:CBC 18 : for (int i = 0; i < num_dbs; i++)
237 : : {
440 michael@paquier.xyz 238 : 12 : struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
239 : :
240 [ + - - + ]: 12 : if (dbinfo->made_publication || dbinfo->made_replslot)
241 : : {
242 : : PGconn *conn;
243 : :
440 michael@paquier.xyz 244 :UBC 0 : conn = connect_database(dbinfo->pubconninfo, false);
796 peter@eisentraut.org 245 [ # # ]: 0 : if (conn != NULL)
246 : : {
440 michael@paquier.xyz 247 [ # # ]: 0 : if (dbinfo->made_publication)
3 fujii@postgresql.org 248 : 0 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname);
440 michael@paquier.xyz 249 [ # # ]: 0 : if (dbinfo->made_replslot)
250 : 0 : drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
796 peter@eisentraut.org 251 : 0 : disconnect_database(conn, false);
252 : : }
253 : : else
254 : : {
255 : : /*
256 : : * If a connection could not be established, inform the user
257 : : * that some objects were left on primary and should be
258 : : * removed before trying again.
259 : : */
440 michael@paquier.xyz 260 [ # # ]: 0 : if (dbinfo->made_publication)
261 : : {
47 peter@eisentraut.org 262 : 0 : pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
263 : : dbinfo->pubname,
264 : : dbinfo->dbname);
265 : 0 : pg_log_warning_hint("Drop this publication before trying again.");
266 : : }
440 michael@paquier.xyz 267 [ # # ]: 0 : if (dbinfo->made_replslot)
268 : : {
47 peter@eisentraut.org 269 : 0 : pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
270 : : dbinfo->replslotname,
271 : : dbinfo->dbname);
272 : 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
273 : : }
274 : : }
275 : : }
276 : : }
277 : :
796 peter@eisentraut.org 278 [ + + ]:CBC 6 : if (standby_running)
279 : 4 : stop_standby_server(subscriber_dir);
280 : : }
281 : :
282 : : static void
283 : 1 : usage(void)
284 : : {
285 : 1 : printf(_("%s creates a new logical replica from a standby server.\n\n"),
286 : : progname);
287 : 1 : printf(_("Usage:\n"));
288 : 1 : printf(_(" %s [OPTION]...\n"), progname);
289 : 1 : printf(_("\nOptions:\n"));
428 akapila@postgresql.o 290 : 1 : printf(_(" -a, --all create subscriptions for all databases except template\n"
291 : : " databases and databases that don't allow connections\n"));
644 peter@eisentraut.org 292 : 1 : printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
709 293 : 1 : printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
66 akapila@postgresql.o 294 :GNC 1 : printf(_(" -l, --logdir=LOGDIR location for the log directory\n"));
709 peter@eisentraut.org 295 :CBC 1 : printf(_(" -n, --dry-run dry run, just show what would be done\n"));
296 : 1 : printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
297 : 1 : printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
667 298 : 1 : printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
709 299 : 1 : printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
458 akapila@postgresql.o 300 : 1 : printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
644 peter@eisentraut.org 301 : 1 : printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
709 302 : 1 : printf(_(" -v, --verbose output verbose messages\n"));
339 303 : 1 : printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
304 : : " databases on the subscriber; accepts: \"%s\"\n"), "publications");
709 305 : 1 : printf(_(" --config-file=FILENAME use specified main server configuration\n"
306 : : " file when running target cluster\n"));
307 : 1 : printf(_(" --publication=NAME publication name\n"));
308 : 1 : printf(_(" --replication-slot=NAME replication slot name\n"));
309 : 1 : printf(_(" --subscription=NAME subscription name\n"));
310 : 1 : printf(_(" -V, --version output version information, then exit\n"));
311 : 1 : printf(_(" -?, --help show this help, then exit\n"));
796 312 : 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
313 : 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
314 : 1 : }
315 : :
316 : : /*
317 : : * Subroutine to append "keyword=value" to a connection string,
318 : : * with proper quoting of the value. (We assume keywords don't need that.)
319 : : */
320 : : static void
699 tgl@sss.pgh.pa.us 321 : 107 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
322 : : {
323 [ + + ]: 107 : if (buf->len > 0)
324 : 79 : appendPQExpBufferChar(buf, ' ');
325 : 107 : appendPQExpBufferStr(buf, keyword);
326 : 107 : appendPQExpBufferChar(buf, '=');
327 : 107 : appendConnStrVal(buf, val);
328 : 107 : }
329 : :
330 : : /*
331 : : * Validate a connection string. Returns a base connection string that is a
332 : : * connection string without a database name.
333 : : *
334 : : * Since we might process multiple databases, each database name will be
335 : : * appended to this base connection string to provide a final connection
336 : : * string. If the second argument (dbname) is not null, returns dbname if the
337 : : * provided connection string contains it.
338 : : *
339 : : * It is the caller's responsibility to free the returned connection string and
340 : : * dbname.
341 : : */
342 : : static char *
796 peter@eisentraut.org 343 : 14 : get_base_conninfo(const char *conninfo, char **dbname)
344 : : {
345 : : PQExpBuffer buf;
346 : : PQconninfoOption *conn_opts;
347 : : PQconninfoOption *conn_opt;
348 : 14 : char *errmsg = NULL;
349 : : char *ret;
350 : :
351 : 14 : conn_opts = PQconninfoParse(conninfo, &errmsg);
352 [ - + ]: 14 : if (conn_opts == NULL)
353 : : {
47 peter@eisentraut.org 354 :UBC 0 : pg_log_error("could not parse connection string: %s", errmsg);
789 tgl@sss.pgh.pa.us 355 : 0 : PQfreemem(errmsg);
796 peter@eisentraut.org 356 : 0 : return NULL;
357 : : }
358 : :
789 tgl@sss.pgh.pa.us 359 :CBC 14 : buf = createPQExpBuffer();
796 peter@eisentraut.org 360 [ + + ]: 742 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
361 : : {
362 [ + + + - ]: 728 : if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
363 : : {
699 tgl@sss.pgh.pa.us 364 [ + + ]: 33 : if (strcmp(conn_opt->keyword, "dbname") == 0)
365 : : {
366 [ + - ]: 9 : if (dbname)
367 : 9 : *dbname = pg_strdup(conn_opt->val);
368 : 9 : continue;
369 : : }
370 : 24 : appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
371 : : }
372 : : }
373 : :
796 peter@eisentraut.org 374 : 14 : ret = pg_strdup(buf->data);
375 : :
376 : 14 : destroyPQExpBuffer(buf);
377 : 14 : PQconninfoFree(conn_opts);
378 : :
379 : 14 : return ret;
380 : : }
381 : :
382 : : /*
383 : : * Build a subscriber connection string. Only a few parameters are supported
384 : : * since it starts a server with restricted access.
385 : : */
386 : : static char *
387 : 14 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
388 : : {
389 : 14 : PQExpBuffer buf = createPQExpBuffer();
390 : : char *ret;
391 : :
699 tgl@sss.pgh.pa.us 392 : 14 : appendConnStrItem(buf, "port", opt->sub_port);
393 : : #if !defined(WIN32)
394 : 14 : appendConnStrItem(buf, "host", opt->socket_dir);
395 : : #endif
796 peter@eisentraut.org 396 [ - + ]: 14 : if (opt->sub_username != NULL)
699 tgl@sss.pgh.pa.us 397 :UBC 0 : appendConnStrItem(buf, "user", opt->sub_username);
699 tgl@sss.pgh.pa.us 398 :CBC 14 : appendConnStrItem(buf, "fallback_application_name", progname);
399 : :
796 peter@eisentraut.org 400 : 14 : ret = pg_strdup(buf->data);
401 : :
402 : 14 : destroyPQExpBuffer(buf);
403 : :
404 : 14 : return ret;
405 : : }
406 : :
407 : : /*
408 : : * Verify if a PostgreSQL binary (progname) is available in the same directory as
409 : : * pg_createsubscriber and it has the same version. It returns the absolute
410 : : * path of the progname.
411 : : */
412 : : static char *
413 : 20 : get_exec_path(const char *argv0, const char *progname)
414 : : {
415 : : char *versionstr;
416 : : char *exec_path;
417 : : int ret;
418 : :
419 : 20 : versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
420 : 20 : exec_path = pg_malloc(MAXPGPATH);
421 : 20 : ret = find_other_exec(argv0, progname, versionstr, exec_path);
422 : :
423 [ - + ]: 20 : if (ret < 0)
424 : : {
425 : : char full_path[MAXPGPATH];
426 : :
796 peter@eisentraut.org 427 [ # # ]:UBC 0 : if (find_my_exec(argv0, full_path) < 0)
428 : 0 : strlcpy(full_path, progname, sizeof(full_path));
429 : :
430 [ # # ]: 0 : if (ret == -1)
47 431 : 0 : pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
432 : : progname, "pg_createsubscriber", full_path);
433 : : else
434 : 0 : pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
435 : : progname, full_path, "pg_createsubscriber");
436 : : }
437 : :
47 peter@eisentraut.org 438 [ + + ]:CBC 20 : pg_log_debug("%s path is: %s", progname, exec_path);
439 : :
796 440 : 20 : return exec_path;
441 : : }
442 : :
443 : : /*
444 : : * Is it a cluster directory? These are preliminary checks. It is far from
445 : : * making an accurate check. If it is not a clone from the publisher, it will
446 : : * eventually fail in a future step.
447 : : */
448 : : static void
449 : 10 : check_data_directory(const char *datadir)
450 : : {
451 : : struct stat statbuf;
452 : : uint32 major_version;
453 : : char *version_str;
454 : :
47 455 : 10 : pg_log_info("checking if directory \"%s\" is a cluster data directory",
456 : : datadir);
457 : :
796 458 [ - + ]: 10 : if (stat(datadir, &statbuf) != 0)
459 : : {
796 peter@eisentraut.org 460 [ # # ]:UBC 0 : if (errno == ENOENT)
47 461 : 0 : pg_fatal("data directory \"%s\" does not exist", datadir);
462 : : else
463 : 0 : pg_fatal("could not access directory \"%s\": %m", datadir);
464 : : }
465 : :
466 : : /*
467 : : * Retrieve the contents of this cluster's PG_VERSION. We require
468 : : * compatibility with the same major version as the one this tool is
469 : : * compiled with.
470 : : */
227 michael@paquier.xyz 471 :GNC 10 : major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
472 [ - + ]: 10 : if (major_version != PG_MAJORVERSION_NUM)
473 : : {
47 peter@eisentraut.org 474 :UNC 0 : pg_log_error("data directory is of wrong version");
475 : 0 : pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
476 : : "PG_VERSION", version_str, PG_MAJORVERSION);
227 michael@paquier.xyz 477 : 0 : exit(1);
478 : : }
796 peter@eisentraut.org 479 :CBC 10 : }
480 : :
481 : : /*
482 : : * Append database name into a base connection string.
483 : : *
484 : : * dbname is the only parameter that changes so it is not included in the base
485 : : * connection string. This function concatenates dbname to build a "real"
486 : : * connection string.
487 : : */
488 : : static char *
489 : 41 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
490 : : {
491 : 41 : PQExpBuffer buf = createPQExpBuffer();
492 : : char *ret;
493 : :
494 [ - + ]: 41 : Assert(conninfo != NULL);
495 : :
496 : 41 : appendPQExpBufferStr(buf, conninfo);
699 tgl@sss.pgh.pa.us 497 : 41 : appendConnStrItem(buf, "dbname", dbname);
498 : :
796 peter@eisentraut.org 499 : 41 : ret = pg_strdup(buf->data);
500 : 41 : destroyPQExpBuffer(buf);
501 : :
502 : 41 : return ret;
503 : : }
504 : :
505 : : /*
506 : : * Store publication and subscription information.
507 : : *
508 : : * If publication, replication slot and subscription names were specified,
509 : : * store it here. Otherwise, a generated name will be assigned to the object in
510 : : * setup_publisher().
511 : : */
512 : : static struct LogicalRepInfo *
513 : 10 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
514 : : const char *pub_base_conninfo,
515 : : const char *sub_base_conninfo)
516 : : {
517 : : struct LogicalRepInfo *dbinfo;
518 : 10 : SimpleStringListCell *pubcell = NULL;
519 : 10 : SimpleStringListCell *subcell = NULL;
520 : 10 : SimpleStringListCell *replslotcell = NULL;
521 : 10 : int i = 0;
522 : :
523 : 10 : dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
524 : :
525 [ + + ]: 10 : if (num_pubs > 0)
526 : 2 : pubcell = opt->pub_names.head;
527 [ + + ]: 10 : if (num_subs > 0)
528 : 1 : subcell = opt->sub_names.head;
529 [ + + ]: 10 : if (num_replslots > 0)
530 : 2 : replslotcell = opt->replslot_names.head;
531 : :
532 [ + + ]: 30 : for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
533 : : {
534 : : char *conninfo;
535 : :
536 : : /* Fill publisher attributes */
537 : 20 : conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
538 : 20 : dbinfo[i].pubconninfo = conninfo;
539 : 20 : dbinfo[i].dbname = cell->val;
540 [ + + ]: 20 : if (num_pubs > 0)
541 : 4 : dbinfo[i].pubname = pubcell->val;
542 : : else
543 : 16 : dbinfo[i].pubname = NULL;
544 [ + + ]: 20 : if (num_replslots > 0)
545 : 3 : dbinfo[i].replslotname = replslotcell->val;
546 : : else
547 : 17 : dbinfo[i].replslotname = NULL;
548 : 20 : dbinfo[i].made_replslot = false;
549 : 20 : dbinfo[i].made_publication = false;
550 : : /* Fill subscriber attributes */
551 : 20 : conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
552 : 20 : dbinfo[i].subconninfo = conninfo;
553 [ + + ]: 20 : if (num_subs > 0)
554 : 2 : dbinfo[i].subname = subcell->val;
555 : : else
556 : 18 : dbinfo[i].subname = NULL;
557 : : /* Other fields will be filled later */
558 : :
47 559 [ + + + - : 20 : pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
+ - ]
560 : : dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
561 : : dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
562 : : dbinfo[i].pubconninfo);
563 [ + + + - : 20 : pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
- + ]
564 : : dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
565 : : dbinfo[i].subconninfo,
566 : : dbinfos.two_phase ? "true" : "false");
567 : :
796 568 [ + + ]: 20 : if (num_pubs > 0)
569 : 4 : pubcell = pubcell->next;
570 [ + + ]: 20 : if (num_subs > 0)
571 : 2 : subcell = subcell->next;
572 [ + + ]: 20 : if (num_replslots > 0)
573 : 3 : replslotcell = replslotcell->next;
574 : :
575 : 20 : i++;
576 : : }
577 : :
578 : 10 : return dbinfo;
579 : : }
580 : :
581 : : /*
582 : : * Open a new connection. If exit_on_error is true, it has an undesired
583 : : * condition and it should exit immediately.
584 : : */
585 : : static PGconn *
586 : 57 : connect_database(const char *conninfo, bool exit_on_error)
587 : : {
588 : : PGconn *conn;
589 : : PGresult *res;
590 : :
591 : 57 : conn = PQconnectdb(conninfo);
592 [ - + ]: 57 : if (PQstatus(conn) != CONNECTION_OK)
593 : : {
47 peter@eisentraut.org 594 :UBC 0 : pg_log_error("connection to database failed: %s",
595 : : PQerrorMessage(conn));
789 tgl@sss.pgh.pa.us 596 : 0 : PQfinish(conn);
597 : :
796 peter@eisentraut.org 598 [ # # ]: 0 : if (exit_on_error)
599 : 0 : exit(1);
600 : 0 : return NULL;
601 : : }
602 : :
603 : : /* Secure search_path */
796 peter@eisentraut.org 604 :CBC 57 : res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
605 [ - + ]: 57 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
606 : : {
47 peter@eisentraut.org 607 :UBC 0 : pg_log_error("could not clear \"search_path\": %s",
608 : : PQresultErrorMessage(res));
789 tgl@sss.pgh.pa.us 609 : 0 : PQclear(res);
610 : 0 : PQfinish(conn);
611 : :
796 peter@eisentraut.org 612 [ # # ]: 0 : if (exit_on_error)
613 : 0 : exit(1);
614 : 0 : return NULL;
615 : : }
796 peter@eisentraut.org 616 :CBC 57 : PQclear(res);
617 : :
618 : 57 : return conn;
619 : : }
620 : :
621 : : /*
622 : : * Close the connection. If exit_on_error is true, it has an undesired
623 : : * condition and it should exit immediately.
624 : : */
625 : : static void
626 : 57 : disconnect_database(PGconn *conn, bool exit_on_error)
627 : : {
628 [ - + ]: 57 : Assert(conn != NULL);
629 : :
630 : 57 : PQfinish(conn);
631 : :
632 [ + + ]: 57 : if (exit_on_error)
633 : 2 : exit(1);
634 : 55 : }
635 : :
636 : : /*
637 : : * Obtain the system identifier using the provided connection. It will be used
638 : : * to compare if a data directory is a clone of another one.
639 : : */
640 : : static uint64
641 : 10 : get_primary_sysid(const char *conninfo)
642 : : {
643 : : PGconn *conn;
644 : : PGresult *res;
645 : : uint64 sysid;
646 : :
47 647 : 10 : pg_log_info("getting system identifier from publisher");
648 : :
796 649 : 10 : conn = connect_database(conninfo, true);
650 : :
651 : 10 : res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
652 [ - + ]: 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
653 : : {
47 peter@eisentraut.org 654 :UBC 0 : pg_log_error("could not get system identifier: %s",
655 : : PQresultErrorMessage(res));
796 656 : 0 : disconnect_database(conn, true);
657 : : }
796 peter@eisentraut.org 658 [ - + ]:CBC 10 : if (PQntuples(res) != 1)
659 : : {
47 peter@eisentraut.org 660 :UBC 0 : pg_log_error("could not get system identifier: got %d rows, expected %d row",
661 : : PQntuples(res), 1);
796 662 : 0 : disconnect_database(conn, true);
663 : : }
664 : :
796 peter@eisentraut.org 665 :CBC 10 : sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
666 : :
47 667 : 10 : pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
668 : :
796 669 : 10 : PQclear(res);
670 : 10 : disconnect_database(conn, false);
671 : :
672 : 10 : return sysid;
673 : : }
674 : :
675 : : /*
676 : : * Obtain the system identifier from control file. It will be used to compare
677 : : * if a data directory is a clone of another one. This routine is used locally
678 : : * and avoids a connection.
679 : : */
680 : : static uint64
681 : 10 : get_standby_sysid(const char *datadir)
682 : : {
683 : : ControlFileData *cf;
684 : : bool crc_ok;
685 : : uint64 sysid;
686 : :
47 687 : 10 : pg_log_info("getting system identifier from subscriber");
688 : :
796 689 : 10 : cf = get_controlfile(datadir, &crc_ok);
690 [ - + ]: 10 : if (!crc_ok)
47 peter@eisentraut.org 691 :UBC 0 : pg_fatal("control file appears to be corrupt");
692 : :
796 peter@eisentraut.org 693 :CBC 10 : sysid = cf->system_identifier;
694 : :
47 695 : 10 : pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
696 : :
796 697 : 10 : pg_free(cf);
698 : :
699 : 10 : return sysid;
700 : : }
701 : :
702 : : /*
703 : : * Modify the system identifier. Since a standby server preserves the system
704 : : * identifier, it makes sense to change it to avoid situations in which WAL
705 : : * files from one of the systems might be used in the other one.
706 : : */
707 : : static void
708 : 4 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
709 : : {
710 : : ControlFileData *cf;
711 : : bool crc_ok;
712 : : struct timeval tv;
713 : :
714 : : char *out_file;
715 : : char *cmd_str;
716 : :
47 717 : 4 : pg_log_info("modifying system identifier of subscriber");
718 : :
796 719 : 4 : cf = get_controlfile(subscriber_dir, &crc_ok);
720 [ - + ]: 4 : if (!crc_ok)
47 peter@eisentraut.org 721 :UBC 0 : pg_fatal("control file appears to be corrupt");
722 : :
723 : : /*
724 : : * Select a new system identifier.
725 : : *
726 : : * XXX this code was extracted from BootStrapXLOG().
727 : : */
796 peter@eisentraut.org 728 :CBC 4 : gettimeofday(&tv, NULL);
729 : 4 : cf->system_identifier = ((uint64) tv.tv_sec) << 32;
730 : 4 : cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
731 : 4 : cf->system_identifier |= getpid() & 0xFFF;
732 : :
211 alvherre@kurilemu.de 733 [ + + ]: 4 : if (dry_run)
47 peter@eisentraut.org 734 : 3 : pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
735 : : cf->system_identifier);
736 : : else
737 : : {
796 738 : 1 : update_controlfile(subscriber_dir, cf, true);
47 739 : 1 : pg_log_info("system identifier is %" PRIu64 " on subscriber",
740 : : cf->system_identifier);
741 : : }
742 : :
211 alvherre@kurilemu.de 743 [ + + ]: 4 : if (dry_run)
47 peter@eisentraut.org 744 : 3 : pg_log_info("dry-run: would run pg_resetwal on the subscriber");
745 : : else
746 : 1 : pg_log_info("running pg_resetwal on the subscriber");
747 : :
748 : : /*
749 : : * Redirecting the output to the logfile if specified. Since the output
750 : : * would be very short, around one line, we do not provide a separate file
751 : : * for it; it's done as a part of the server log.
752 : : */
66 akapila@postgresql.o 753 [ + + ]:GNC 4 : if (opt->log_dir)
754 : 1 : out_file = psprintf("%s/%s", logdir, SERVER_LOG_FILE_NAME);
755 : : else
756 : 3 : out_file = DEVNULL;
757 : :
758 : 4 : cmd_str = psprintf("\"%s\" -D \"%s\" >> \"%s\"", pg_resetwal_path,
759 : : subscriber_dir, out_file);
760 [ + + ]: 4 : if (opt->log_dir)
761 : 1 : pg_free(out_file);
762 : :
47 peter@eisentraut.org 763 [ + + ]:CBC 4 : pg_log_debug("pg_resetwal command is: %s", cmd_str);
764 : :
796 765 [ + + ]: 4 : if (!dry_run)
766 : : {
767 : 1 : int rc = system(cmd_str);
768 : :
769 [ + - ]: 1 : if (rc == 0)
47 770 : 1 : pg_log_info("successfully reset WAL on the subscriber");
771 : : else
47 peter@eisentraut.org 772 :UBC 0 : pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
773 : : }
774 : :
796 peter@eisentraut.org 775 :CBC 4 : pg_free(cf);
66 akapila@postgresql.o 776 :GNC 4 : pg_free(cmd_str);
796 peter@eisentraut.org 777 :CBC 4 : }
778 : :
779 : : /*
780 : : * Generate an object name using a prefix, database oid and a random integer.
781 : : * It is used in case the user does not specify an object name (publication,
782 : : * subscription, replication slot).
783 : : */
784 : : static char *
785 : 8 : generate_object_name(PGconn *conn)
786 : : {
787 : : PGresult *res;
788 : : Oid oid;
789 : : uint32 rand;
790 : : char *objname;
791 : :
792 : 8 : res = PQexec(conn,
793 : : "SELECT oid FROM pg_catalog.pg_database "
794 : : "WHERE datname = pg_catalog.current_database()");
795 [ - + ]: 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
796 : : {
47 peter@eisentraut.org 797 :UBC 0 : pg_log_error("could not obtain database OID: %s",
798 : : PQresultErrorMessage(res));
796 799 : 0 : disconnect_database(conn, true);
800 : : }
801 : :
796 peter@eisentraut.org 802 [ - + ]:CBC 8 : if (PQntuples(res) != 1)
803 : : {
47 peter@eisentraut.org 804 :UBC 0 : pg_log_error("could not obtain database OID: got %d rows, expected %d row",
805 : : PQntuples(res), 1);
796 806 : 0 : disconnect_database(conn, true);
807 : : }
808 : :
809 : : /* Database OID */
796 peter@eisentraut.org 810 :CBC 8 : oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
811 : :
812 : 8 : PQclear(res);
813 : :
814 : : /* Random unsigned integer */
815 : 8 : rand = pg_prng_uint32(&prng_state);
816 : :
817 : : /*
818 : : * Build the object name. The name must not exceed NAMEDATALEN - 1. This
819 : : * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
820 : : * '\0').
821 : : */
822 : 8 : objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
823 : :
824 : 8 : return objname;
825 : : }
826 : :
827 : : /*
828 : : * Does the publication exist in the specified database?
829 : : */
830 : : static bool
164 akapila@postgresql.o 831 :GNC 8 : find_publication(PGconn *conn, const char *pubname, const char *dbname)
832 : : {
833 : 8 : PQExpBuffer str = createPQExpBuffer();
834 : : PGresult *res;
835 : 8 : bool found = false;
836 : 8 : char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
837 : :
838 : 8 : appendPQExpBuffer(str,
839 : : "SELECT 1 FROM pg_catalog.pg_publication "
840 : : "WHERE pubname = %s",
841 : : pubname_esc);
842 : 8 : res = PQexec(conn, str->data);
843 [ - + ]: 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
844 : : {
47 peter@eisentraut.org 845 :UNC 0 : pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
846 : : pubname, dbname, PQerrorMessage(conn));
164 akapila@postgresql.o 847 : 0 : disconnect_database(conn, true);
848 : : }
849 : :
164 akapila@postgresql.o 850 [ + + ]:GNC 8 : if (PQntuples(res) == 1)
851 : 1 : found = true;
852 : :
853 : 8 : PQclear(res);
854 : 8 : PQfreemem(pubname_esc);
855 : 8 : destroyPQExpBuffer(str);
856 : :
857 : 8 : return found;
858 : : }
859 : :
860 : : /*
861 : : * Create the publications and replication slots in preparation for logical
862 : : * replication. Returns the LSN from latest replication slot. It will be the
863 : : * replication start point that is used to adjust the subscriptions (see
864 : : * set_replication_progress).
865 : : */
866 : : static char *
796 peter@eisentraut.org 867 :CBC 4 : setup_publisher(struct LogicalRepInfo *dbinfo)
868 : : {
869 : 4 : char *lsn = NULL;
870 : :
871 : 4 : pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
872 : :
873 [ + + ]: 12 : for (int i = 0; i < num_dbs; i++)
874 : : {
875 : : PGconn *conn;
876 : 8 : char *genname = NULL;
877 : :
878 : 8 : conn = connect_database(dbinfo[i].pubconninfo, true);
879 : :
880 : : /*
881 : : * If an object name was not specified as command-line options, assign
882 : : * a generated object name. The replication slot has a different rule.
883 : : * The subscription name is assigned to the replication slot name if
884 : : * no replication slot is specified. It follows the same rule as
885 : : * CREATE SUBSCRIPTION.
886 : : */
887 [ + + + + : 8 : if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
+ - ]
888 : 8 : genname = generate_object_name(conn);
889 [ + + ]: 8 : if (num_pubs == 0)
890 : 4 : dbinfo[i].pubname = pg_strdup(genname);
891 [ + + ]: 8 : if (num_subs == 0)
892 : 6 : dbinfo[i].subname = pg_strdup(genname);
893 [ + + ]: 8 : if (num_replslots == 0)
894 : 5 : dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
895 : :
164 akapila@postgresql.o 896 [ + + ]:GNC 8 : if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
897 : : {
898 : : /* Reuse existing publication on publisher. */
23 peter@eisentraut.org 899 : 1 : pg_log_info("using existing publication \"%s\" in database \"%s\"",
900 : : dbinfo[i].pubname, dbinfo[i].dbname);
901 : : /* Don't remove pre-existing publication if an error occurs. */
164 akapila@postgresql.o 902 : 1 : dbinfo[i].made_publication = false;
903 : : }
904 : : else
905 : : {
906 : : /*
907 : : * Create publication on publisher. This step should be executed
908 : : * *before* promoting the subscriber to avoid any transactions
909 : : * between consistent LSN and the new publication rows (such
910 : : * transactions wouldn't see the new publication rows resulting in
911 : : * an error).
912 : : */
913 : 7 : create_publication(conn, &dbinfo[i]);
914 : : }
915 : :
916 : : /* Create replication slot on publisher */
796 peter@eisentraut.org 917 [ + + ]:CBC 8 : if (lsn)
918 : 1 : pg_free(lsn);
919 : 8 : lsn = create_logical_replication_slot(conn, &dbinfo[i]);
233 michael@paquier.xyz 920 [ + + - + ]:GNC 8 : if (lsn == NULL && !dry_run)
796 peter@eisentraut.org 921 :UBC 0 : exit(1);
922 : :
923 : : /*
924 : : * Since we are using the LSN returned by the last replication slot as
925 : : * recovery_target_lsn, this LSN is ahead of the current WAL position
926 : : * and the recovery waits until the publisher writes a WAL record to
927 : : * reach the target and ends the recovery. On idle systems, this wait
928 : : * time is unpredictable and could lead to failure in promoting the
929 : : * subscriber. To avoid that, insert a harmless WAL record.
930 : : */
669 akapila@postgresql.o 931 [ + + + + ]:CBC 8 : if (i == num_dbs - 1 && !dry_run)
[ + + + + ]
932 : : {
933 : : PGresult *res;
934 : :
935 : 1 : res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
936 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
937 : : {
47 peter@eisentraut.org 938 :UBC 0 : pg_log_error("could not write an additional WAL record: %s",
939 : : PQresultErrorMessage(res));
669 akapila@postgresql.o 940 : 0 : disconnect_database(conn, true);
941 : : }
669 akapila@postgresql.o 942 :CBC 1 : PQclear(res);
943 : : }
944 : :
796 peter@eisentraut.org 945 : 8 : disconnect_database(conn, false);
946 : : }
947 : :
948 : 4 : return lsn;
949 : : }
950 : :
951 : : /*
952 : : * Is recovery still in progress?
953 : : */
954 : : static bool
955 : 15 : server_is_in_recovery(PGconn *conn)
956 : : {
957 : : PGresult *res;
958 : : int ret;
959 : :
960 : 15 : res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
961 : :
962 [ - + ]: 15 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
963 : : {
47 peter@eisentraut.org 964 :UBC 0 : pg_log_error("could not obtain recovery progress: %s",
965 : : PQresultErrorMessage(res));
796 966 : 0 : disconnect_database(conn, true);
967 : : }
968 : :
969 : :
796 peter@eisentraut.org 970 :CBC 15 : ret = strcmp("t", PQgetvalue(res, 0, 0));
971 : :
972 : 15 : PQclear(res);
973 : :
974 : 15 : return ret == 0;
975 : : }
976 : :
977 : : static void
66 akapila@postgresql.o 978 :GNC 1 : make_output_dirs(const char *log_basedir)
979 : : {
980 : : char timestamp[128];
981 : : struct timeval tval;
982 : : time_t now;
983 : : struct tm tmbuf;
984 : :
985 : : /* Generate timestamp */
986 : 1 : gettimeofday(&tval, NULL);
987 : 1 : now = tval.tv_sec;
988 : :
989 : 1 : strftime(timestamp, sizeof(timestamp), "%Y%m%dT%H%M%S",
990 : 1 : localtime_r(&now, &tmbuf));
991 : :
992 : : /* Append milliseconds */
993 : 1 : snprintf(timestamp + strlen(timestamp),
994 : 1 : sizeof(timestamp) - strlen(timestamp), ".%03u",
995 : 1 : (unsigned int) (tval.tv_usec / 1000));
996 : :
997 : : /* Build timestamp directory path */
47 peter@eisentraut.org 998 : 1 : logdir = psprintf("%s/%s", log_basedir, timestamp);
999 : :
1000 : : /* Create base directory (ignore if exists) */
66 akapila@postgresql.o 1001 [ + - - + ]: 1 : if (mkdir(log_basedir, pg_dir_create_mode) < 0 && errno != EEXIST)
47 peter@eisentraut.org 1002 :UNC 0 : pg_fatal("could not create directory \"%s\": %m", log_basedir);
1003 : :
1004 : : /* Create a timestamp-named subdirectory under the base directory */
66 akapila@postgresql.o 1005 [ - + ]:GNC 1 : if (mkdir(logdir, pg_dir_create_mode) < 0)
47 peter@eisentraut.org 1006 :UNC 0 : pg_fatal("could not create directory \"%s\": %m", logdir);
66 akapila@postgresql.o 1007 :GNC 1 : }
1008 : :
1009 : : /*
1010 : : * Is the primary server ready for logical replication?
1011 : : *
1012 : : * XXX Does it not allow a synchronous replica?
1013 : : */
1014 : : static void
796 peter@eisentraut.org 1015 :CBC 6 : check_publisher(const struct LogicalRepInfo *dbinfo)
1016 : : {
1017 : : PGconn *conn;
1018 : : PGresult *res;
1019 : 6 : bool failed = false;
1020 : :
1021 : : char *wal_level;
1022 : : int max_repslots;
1023 : : int cur_repslots;
1024 : : int max_walsenders;
1025 : : int cur_walsenders;
1026 : : int max_prepared_transactions;
1027 : : char *max_slot_wal_keep_size;
1028 : :
47 1029 : 6 : pg_log_info("checking settings on publisher");
1030 : :
796 1031 : 6 : conn = connect_database(dbinfo[0].pubconninfo, true);
1032 : :
1033 : : /*
1034 : : * If the primary server is in recovery (i.e. cascading replication),
1035 : : * objects (publication) cannot be created because it is read only.
1036 : : */
1037 [ + + ]: 6 : if (server_is_in_recovery(conn))
1038 : : {
47 1039 : 1 : pg_log_error("primary server cannot be in recovery");
796 1040 : 1 : disconnect_database(conn, true);
1041 : : }
1042 : :
1043 : : /*------------------------------------------------------------------------
1044 : : * Logical replication requires a few parameters to be set on publisher.
1045 : : * Since these parameters are not a requirement for physical replication,
1046 : : * we should check it to make sure it won't fail.
1047 : : *
1048 : : * - wal_level >= replica
1049 : : * - max_replication_slots >= current + number of dbs to be converted
1050 : : * - max_wal_senders >= current + number of dbs to be converted
1051 : : * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
1052 : : * -----------------------------------------------------------------------
1053 : : */
1054 : 5 : res = PQexec(conn,
1055 : : "SELECT pg_catalog.current_setting('wal_level'),"
1056 : : " pg_catalog.current_setting('max_replication_slots'),"
1057 : : " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
1058 : : " pg_catalog.current_setting('max_wal_senders'),"
1059 : : " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
1060 : : " pg_catalog.current_setting('max_prepared_transactions'),"
1061 : : " pg_catalog.current_setting('max_slot_wal_keep_size')");
1062 : :
1063 [ - + ]: 5 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1064 : : {
47 peter@eisentraut.org 1065 :UBC 0 : pg_log_error("could not obtain publisher settings: %s",
1066 : : PQresultErrorMessage(res));
796 1067 : 0 : disconnect_database(conn, true);
1068 : : }
1069 : :
796 peter@eisentraut.org 1070 :CBC 5 : wal_level = pg_strdup(PQgetvalue(res, 0, 0));
1071 : 5 : max_repslots = atoi(PQgetvalue(res, 0, 1));
1072 : 5 : cur_repslots = atoi(PQgetvalue(res, 0, 2));
1073 : 5 : max_walsenders = atoi(PQgetvalue(res, 0, 3));
1074 : 5 : cur_walsenders = atoi(PQgetvalue(res, 0, 4));
699 tgl@sss.pgh.pa.us 1075 : 5 : max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
466 akapila@postgresql.o 1076 : 5 : max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
1077 : :
796 peter@eisentraut.org 1078 : 5 : PQclear(res);
1079 : :
47 1080 [ + + ]: 5 : pg_log_debug("publisher: wal_level: %s", wal_level);
1081 [ + + ]: 5 : pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
1082 [ + + ]: 5 : pg_log_debug("publisher: current replication slots: %d", cur_repslots);
1083 [ + + ]: 5 : pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
1084 [ + + ]: 5 : pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
1085 [ + + ]: 5 : pg_log_debug("publisher: max_prepared_transactions: %d",
1086 : : max_prepared_transactions);
1087 [ + + ]: 5 : pg_log_debug("publisher: max_slot_wal_keep_size: %s",
1088 : : max_slot_wal_keep_size);
1089 : :
796 1090 : 5 : disconnect_database(conn, false);
1091 : :
158 msawada@postgresql.o 1092 [ - + ]:GNC 5 : if (strcmp(wal_level, "minimal") == 0)
1093 : : {
47 peter@eisentraut.org 1094 :UNC 0 : pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
796 peter@eisentraut.org 1095 :LBC (1) : failed = true;
1096 : : }
1097 : :
796 peter@eisentraut.org 1098 [ + + ]:CBC 5 : if (max_repslots - cur_repslots < num_dbs)
1099 : : {
47 1100 : 1 : pg_log_error("publisher requires %d replication slots, but only %d remain",
1101 : : num_dbs, max_repslots - cur_repslots);
1102 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1103 : : "max_replication_slots", cur_repslots + num_dbs);
796 1104 : 1 : failed = true;
1105 : : }
1106 : :
1107 [ + + ]: 5 : if (max_walsenders - cur_walsenders < num_dbs)
1108 : : {
47 1109 : 1 : pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
1110 : : num_dbs, max_walsenders - cur_walsenders);
1111 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1112 : : "max_wal_senders", cur_walsenders + num_dbs);
796 1113 : 1 : failed = true;
1114 : : }
1115 : :
458 akapila@postgresql.o 1116 [ - + - - ]: 5 : if (max_prepared_transactions != 0 && !dbinfos.two_phase)
1117 : : {
47 peter@eisentraut.org 1118 :UBC 0 : pg_log_warning("two_phase option will not be enabled for replication slots");
1119 : 0 : pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
1120 : : "Prepared transactions will be replicated at COMMIT PREPARED.");
1121 : 0 : pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
1122 : : }
1123 : :
1124 : : /*
1125 : : * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
1126 : : * is set to a non-default value, it may cause replication failures due to
1127 : : * required WAL files being prematurely removed.
1128 : : */
466 akapila@postgresql.o 1129 [ + + - + ]:CBC 5 : if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
1130 : : {
47 peter@eisentraut.org 1131 :UBC 0 : pg_log_warning("required WAL could be removed from the publisher");
1132 : 0 : pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1133 : : "max_slot_wal_keep_size");
1134 : : }
1135 : :
789 tgl@sss.pgh.pa.us 1136 :CBC 5 : pg_free(wal_level);
1137 : :
796 peter@eisentraut.org 1138 [ + + ]: 5 : if (failed)
1139 : 1 : exit(1);
1140 : 4 : }
1141 : :
1142 : : /*
1143 : : * Is the standby server ready for logical replication?
1144 : : *
1145 : : * XXX Does it not allow a time-delayed replica?
1146 : : *
1147 : : * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1148 : : * is S, it cannot detect there is a replica (server C) because server S starts
1149 : : * accepting only local connections and server C cannot connect to it. Hence,
1150 : : * there is not a reliable way to provide a suitable error saying the server C
1151 : : * will be broken at the end of this process (due to pg_resetwal).
1152 : : */
1153 : : static void
1154 : 8 : check_subscriber(const struct LogicalRepInfo *dbinfo)
1155 : : {
1156 : : PGconn *conn;
1157 : : PGresult *res;
1158 : 8 : bool failed = false;
1159 : :
1160 : : int max_lrworkers;
1161 : : int max_replorigins;
1162 : : int max_wprocs;
1163 : :
47 1164 : 8 : pg_log_info("checking settings on subscriber");
1165 : :
796 1166 : 8 : conn = connect_database(dbinfo[0].subconninfo, true);
1167 : :
1168 : : /* The target server must be a standby */
1169 [ + + ]: 8 : if (!server_is_in_recovery(conn))
1170 : : {
47 1171 : 1 : pg_log_error("target server must be a standby");
796 1172 : 1 : disconnect_database(conn, true);
1173 : : }
1174 : :
1175 : : /*------------------------------------------------------------------------
1176 : : * Logical replication requires a few parameters to be set on subscriber.
1177 : : * Since these parameters are not a requirement for physical replication,
1178 : : * we should check it to make sure it won't fail.
1179 : : *
1180 : : * - max_active_replication_origins >= number of dbs to be converted
1181 : : * - max_logical_replication_workers >= number of dbs to be converted
1182 : : * - max_worker_processes >= 1 + number of dbs to be converted
1183 : : *------------------------------------------------------------------------
1184 : : */
1185 : 7 : res = PQexec(conn,
1186 : : "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1187 : : "'max_logical_replication_workers', "
1188 : : "'max_active_replication_origins', "
1189 : : "'max_worker_processes', "
1190 : : "'primary_slot_name') "
1191 : : "ORDER BY name");
1192 : :
1193 [ - + ]: 7 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1194 : : {
47 peter@eisentraut.org 1195 :UBC 0 : pg_log_error("could not obtain subscriber settings: %s",
1196 : : PQresultErrorMessage(res));
796 1197 : 0 : disconnect_database(conn, true);
1198 : : }
1199 : :
122 msawada@postgresql.o 1200 :GNC 7 : max_replorigins = atoi(PQgetvalue(res, 0, 0));
435 msawada@postgresql.o 1201 :CBC 7 : max_lrworkers = atoi(PQgetvalue(res, 1, 0));
796 peter@eisentraut.org 1202 : 7 : max_wprocs = atoi(PQgetvalue(res, 2, 0));
1203 [ + + ]: 7 : if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1204 : 6 : primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1205 : :
47 1206 [ + + ]: 7 : pg_log_debug("subscriber: max_logical_replication_workers: %d",
1207 : : max_lrworkers);
47 peter@eisentraut.org 1208 [ + + ]:GNC 7 : pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins);
47 peter@eisentraut.org 1209 [ + + ]:CBC 7 : pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
796 1210 [ + + ]: 7 : if (primary_slot_name)
47 1211 [ + + ]: 6 : pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1212 : :
796 1213 : 7 : PQclear(res);
1214 : :
1215 : 7 : disconnect_database(conn, false);
1216 : :
122 msawada@postgresql.o 1217 [ + + ]:GNC 7 : if (max_replorigins < num_dbs)
1218 : : {
47 peter@eisentraut.org 1219 :CBC 1 : pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1220 : : num_dbs, max_replorigins);
1221 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1222 : : "max_active_replication_origins", num_dbs);
796 1223 : 1 : failed = true;
1224 : : }
1225 : :
1226 [ + + ]: 7 : if (max_lrworkers < num_dbs)
1227 : : {
47 1228 : 1 : pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1229 : : num_dbs, max_lrworkers);
1230 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1231 : : "max_logical_replication_workers", num_dbs);
796 1232 : 1 : failed = true;
1233 : : }
1234 : :
1235 [ + + ]: 7 : if (max_wprocs < num_dbs + 1)
1236 : : {
47 1237 : 1 : pg_log_error("subscriber requires %d worker processes, but only %d remain",
1238 : : num_dbs + 1, max_wprocs);
1239 : 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1240 : : "max_worker_processes", num_dbs + 1);
796 1241 : 1 : failed = true;
1242 : : }
1243 : :
1244 [ + + ]: 7 : if (failed)
1245 : 1 : exit(1);
1246 : 6 : }
1247 : :
1248 : : /*
1249 : : * Drop a specified subscription. This is to avoid duplicate subscriptions on
1250 : : * the primary (publisher node) and the newly created subscriber. We
1251 : : * shouldn't drop the associated slot as that would be used by the publisher
1252 : : * node.
1253 : : */
1254 : : static void
211 alvherre@kurilemu.de 1255 :GNC 4 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
1256 : : {
697 akapila@postgresql.o 1257 :CBC 4 : PQExpBuffer query = createPQExpBuffer();
1258 : : PGresult *res;
1259 : : char *subname_esc;
1260 : :
1261 [ - + ]: 4 : Assert(conn != NULL);
1262 : :
19 nathan@postgresql.or 1263 : 4 : subname_esc = PQescapeIdentifier(conn, subname, strlen(subname));
1264 : :
1265 : : /*
1266 : : * Construct a query string. These commands are allowed to be executed
1267 : : * within a transaction.
1268 : : */
697 akapila@postgresql.o 1269 : 4 : appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1270 : : subname_esc);
1271 : 4 : appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1272 : : subname_esc);
19 nathan@postgresql.or 1273 : 4 : appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname_esc);
1274 : :
1275 : 4 : PQfreemem(subname_esc);
1276 : :
211 alvherre@kurilemu.de 1277 [ + + ]: 4 : if (dry_run)
47 peter@eisentraut.org 1278 : 3 : pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
1279 : : subname, dbname);
1280 : : else
1281 : : {
1282 : 1 : pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1283 : : subname, dbname);
1284 : :
697 akapila@postgresql.o 1285 : 1 : res = PQexec(conn, query->data);
1286 : :
1287 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1288 : : {
47 peter@eisentraut.org 1289 :UBC 0 : pg_log_error("could not drop subscription \"%s\": %s",
1290 : : subname, PQresultErrorMessage(res));
697 akapila@postgresql.o 1291 : 0 : disconnect_database(conn, true);
1292 : : }
1293 : :
697 akapila@postgresql.o 1294 :CBC 1 : PQclear(res);
1295 : : }
1296 : :
1297 : 4 : destroyPQExpBuffer(query);
1298 : 4 : }
1299 : :
1300 : : /*
1301 : : * Retrieve and drop the pre-existing subscriptions.
1302 : : */
1303 : : static void
1304 : 8 : check_and_drop_existing_subscriptions(PGconn *conn,
1305 : : const struct LogicalRepInfo *dbinfo)
1306 : : {
1307 : 8 : PQExpBuffer query = createPQExpBuffer();
1308 : : char *dbname;
1309 : : PGresult *res;
1310 : :
1311 [ - + ]: 8 : Assert(conn != NULL);
1312 : :
1313 : 8 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1314 : :
1315 : 8 : appendPQExpBuffer(query,
1316 : : "SELECT s.subname FROM pg_catalog.pg_subscription s "
1317 : : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1318 : : "WHERE d.datname = %s",
1319 : : dbname);
1320 : 8 : res = PQexec(conn, query->data);
1321 : :
1322 [ - + ]: 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1323 : : {
47 peter@eisentraut.org 1324 :UBC 0 : pg_log_error("could not obtain pre-existing subscriptions: %s",
1325 : : PQresultErrorMessage(res));
697 akapila@postgresql.o 1326 : 0 : disconnect_database(conn, true);
1327 : : }
1328 : :
697 akapila@postgresql.o 1329 [ + + ]:CBC 12 : for (int i = 0; i < PQntuples(res); i++)
211 alvherre@kurilemu.de 1330 :GNC 4 : drop_existing_subscription(conn, PQgetvalue(res, i, 0),
1331 : 4 : dbinfo->dbname);
1332 : :
697 akapila@postgresql.o 1333 :CBC 8 : PQclear(res);
1334 : 8 : destroyPQExpBuffer(query);
472 michael@paquier.xyz 1335 : 8 : PQfreemem(dbname);
697 akapila@postgresql.o 1336 : 8 : }
1337 : :
1338 : : /*
1339 : : * Create the subscriptions, adjust the initial location for logical
1340 : : * replication and enable the subscriptions. That's the last step for logical
1341 : : * replication setup.
1342 : : */
1343 : : static void
796 peter@eisentraut.org 1344 : 4 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1345 : : {
1346 [ + + ]: 12 : for (int i = 0; i < num_dbs; i++)
1347 : : {
1348 : : PGconn *conn;
1349 : :
1350 : : /* Connect to subscriber. */
1351 : 8 : conn = connect_database(dbinfo[i].subconninfo, true);
1352 : :
1353 : : /*
1354 : : * We don't need the pre-existing subscriptions on the newly formed
1355 : : * subscriber. They can connect to other publisher nodes and either
1356 : : * get some unwarranted data or can lead to ERRORs in connecting to
1357 : : * such nodes.
1358 : : */
697 akapila@postgresql.o 1359 : 8 : check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1360 : :
1361 : : /* Check and drop the required publications in the given database. */
436 1362 : 8 : check_and_drop_publications(conn, &dbinfo[i]);
1363 : :
796 peter@eisentraut.org 1364 : 8 : create_subscription(conn, &dbinfo[i]);
1365 : :
1366 : : /* Set the replication progress to the correct LSN */
1367 : 8 : set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1368 : :
1369 : : /* Enable subscription */
1370 : 8 : enable_subscription(conn, &dbinfo[i]);
1371 : :
1372 : 8 : disconnect_database(conn, false);
1373 : : }
1374 : 4 : }
1375 : :
1376 : : /*
1377 : : * Write the required recovery parameters.
1378 : : */
1379 : : static void
1380 : 4 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1381 : : {
1382 : : PGconn *conn;
1383 : : PQExpBuffer recoveryconfcontents;
1384 : :
1385 : : /*
1386 : : * Despite of the recovery parameters will be written to the subscriber,
1387 : : * use a publisher connection. The primary_conninfo is generated using the
1388 : : * connection settings.
1389 : : */
1390 : 4 : conn = connect_database(dbinfo[0].pubconninfo, true);
1391 : :
1392 : : /*
1393 : : * Write recovery parameters.
1394 : : *
1395 : : * The subscriber is not running yet. In dry run mode, the recovery
1396 : : * parameters *won't* be written. An invalid LSN is used for printing
1397 : : * purposes. Additional recovery parameters are added here. It avoids
1398 : : * unexpected behavior such as end of recovery as soon as a consistent
1399 : : * state is reached (recovery_target) and failure due to multiple recovery
1400 : : * targets (name, time, xid, LSN).
1401 : : */
1402 : 4 : recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
407 drowley@postgresql.o 1403 : 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1404 : 4 : appendPQExpBufferStr(recoveryconfcontents,
1405 : : "recovery_target_timeline = 'latest'\n");
1406 : :
1407 : : /*
1408 : : * Set recovery_target_inclusive = false to avoid reapplying the
1409 : : * transaction committed at 'lsn' after subscription is enabled. This is
1410 : : * because the provided 'lsn' is also used as the replication start point
1411 : : * for the subscription. So, the server can send the transaction committed
1412 : : * at that 'lsn' after replication is started which can lead to applying
1413 : : * the same transaction twice if we keep recovery_target_inclusive = true.
1414 : : */
1415 : 4 : appendPQExpBufferStr(recoveryconfcontents,
1416 : : "recovery_target_inclusive = false\n");
1417 : 4 : appendPQExpBufferStr(recoveryconfcontents,
1418 : : "recovery_target_action = promote\n");
1419 : 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1420 : 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1421 : 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1422 : :
796 peter@eisentraut.org 1423 [ + + ]: 4 : if (dry_run)
1424 : : {
211 alvherre@kurilemu.de 1425 : 3 : appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
796 peter@eisentraut.org 1426 : 3 : appendPQExpBuffer(recoveryconfcontents,
1427 : : "recovery_target_lsn = '%X/%08X'\n",
796 peter@eisentraut.org 1428 :ECB (3) : LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1429 : : }
1430 : : else
1431 : : {
796 peter@eisentraut.org 1432 :CBC 1 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1433 : : lsn);
1434 : : }
1435 : :
47 1436 [ + + ]: 4 : pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1437 : :
142 michael@paquier.xyz 1438 [ + + ]:GNC 4 : if (!dry_run)
1439 : : {
1440 : : char conf_filename[MAXPGPATH];
1441 : : FILE *fd;
1442 : :
1443 : : /* Write the recovery parameters to INCLUDED_CONF_FILE */
1444 : 1 : snprintf(conf_filename, MAXPGPATH, "%s/%s", datadir,
1445 : : INCLUDED_CONF_FILE);
1446 : 1 : fd = fopen(conf_filename, "w");
1447 [ - + ]: 1 : if (fd == NULL)
47 peter@eisentraut.org 1448 :UNC 0 : pg_fatal("could not open file \"%s\": %m", conf_filename);
1449 : :
142 michael@paquier.xyz 1450 [ - + ]:GNC 1 : if (fwrite(recoveryconfcontents->data, recoveryconfcontents->len, 1, fd) != 1)
47 peter@eisentraut.org 1451 :UNC 0 : pg_fatal("could not write to file \"%s\": %m", conf_filename);
1452 : :
142 michael@paquier.xyz 1453 :GNC 1 : fclose(fd);
1454 : 1 : recovery_params_set = true;
1455 : :
1456 : : /* Include conditionally the recovery parameters. */
1457 : 1 : resetPQExpBuffer(recoveryconfcontents);
1458 : 1 : appendPQExpBufferStr(recoveryconfcontents,
1459 : : "include_if_exists '" INCLUDED_CONF_FILE "'\n");
1460 : 1 : WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1461 : : }
1462 : :
1463 : 4 : disconnect_database(conn, false);
796 peter@eisentraut.org 1464 :CBC 4 : }
1465 : :
1466 : : /*
1467 : : * Drop physical replication slot on primary if the standby was using it. After
1468 : : * the transformation, it has no use.
1469 : : *
1470 : : * XXX we might not fail here. Instead, we provide a warning so the user
1471 : : * eventually drops this replication slot later.
1472 : : */
1473 : : static void
1474 : 4 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1475 : : {
1476 : : PGconn *conn;
1477 : :
1478 : : /* Replication slot does not exist, do nothing */
1479 [ - + ]: 4 : if (!primary_slot_name)
796 peter@eisentraut.org 1480 :UBC 0 : return;
1481 : :
796 peter@eisentraut.org 1482 :CBC 4 : conn = connect_database(dbinfo[0].pubconninfo, false);
1483 [ + - ]: 4 : if (conn != NULL)
1484 : : {
1485 : 4 : drop_replication_slot(conn, &dbinfo[0], slotname);
1486 : 4 : disconnect_database(conn, false);
1487 : : }
1488 : : else
1489 : : {
47 peter@eisentraut.org 1490 :UBC 0 : pg_log_warning("could not drop replication slot \"%s\" on primary",
1491 : : slotname);
1492 : 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1493 : : }
1494 : : }
1495 : :
1496 : : /*
1497 : : * Drop failover replication slots on subscriber. After the transformation,
1498 : : * they have no use.
1499 : : *
1500 : : * XXX We do not fail here. Instead, we provide a warning so the user can drop
1501 : : * them later.
1502 : : */
1503 : : static void
712 peter@eisentraut.org 1504 :CBC 4 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
1505 : : {
1506 : : PGconn *conn;
1507 : : PGresult *res;
1508 : :
1509 : 4 : conn = connect_database(dbinfo[0].subconninfo, false);
1510 [ + - ]: 4 : if (conn != NULL)
1511 : : {
1512 : : /* Get failover replication slot names */
1513 : 4 : res = PQexec(conn,
1514 : : "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1515 : :
1516 [ + - ]: 4 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
1517 : : {
1518 : : /* Remove failover replication slots from subscriber */
1519 [ + + ]: 8 : for (int i = 0; i < PQntuples(res); i++)
1520 : 4 : drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1521 : : }
1522 : : else
1523 : : {
47 peter@eisentraut.org 1524 :UBC 0 : pg_log_warning("could not obtain failover replication slot information: %s",
1525 : : PQresultErrorMessage(res));
1526 : 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1527 : : }
1528 : :
712 peter@eisentraut.org 1529 :CBC 4 : PQclear(res);
1530 : 4 : disconnect_database(conn, false);
1531 : : }
1532 : : else
1533 : : {
47 peter@eisentraut.org 1534 :UBC 0 : pg_log_warning("could not drop failover replication slot");
1535 : 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1536 : : }
712 peter@eisentraut.org 1537 :CBC 4 : }
1538 : :
1539 : : /*
1540 : : * Create a logical replication slot and returns a LSN.
1541 : : *
1542 : : * CreateReplicationSlot() is not used because it does not provide the one-row
1543 : : * result set that contains the LSN.
1544 : : */
1545 : : static char *
796 1546 : 8 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1547 : : {
1548 : 8 : PQExpBuffer str = createPQExpBuffer();
1549 : 8 : PGresult *res = NULL;
1550 : 8 : const char *slot_name = dbinfo->replslotname;
1551 : : char *slot_name_esc;
1552 : 8 : char *lsn = NULL;
1553 : :
1554 [ - + ]: 8 : Assert(conn != NULL);
1555 : :
211 alvherre@kurilemu.de 1556 [ + + ]: 8 : if (dry_run)
47 peter@eisentraut.org 1557 : 6 : pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1558 : : slot_name, dbinfo->dbname);
1559 : : else
1560 : 2 : pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
1561 : : slot_name, dbinfo->dbname);
1562 : :
796 1563 : 8 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1564 : :
1565 : 8 : appendPQExpBuffer(str,
1566 : : "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1567 : : slot_name_esc,
458 akapila@postgresql.o 1568 [ + + ]: 8 : dbinfos.two_phase ? "true" : "false");
1569 : :
472 michael@paquier.xyz 1570 : 8 : PQfreemem(slot_name_esc);
1571 : :
47 peter@eisentraut.org 1572 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
1573 : :
796 1574 [ + + ]: 8 : if (!dry_run)
1575 : : {
1576 : 2 : res = PQexec(conn, str->data);
1577 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1578 : : {
47 peter@eisentraut.org 1579 :UBC 0 : pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1580 : : slot_name, dbinfo->dbname,
1581 : : PQresultErrorMessage(res));
789 tgl@sss.pgh.pa.us 1582 : 0 : PQclear(res);
1583 : 0 : destroyPQExpBuffer(str);
796 peter@eisentraut.org 1584 : 0 : return NULL;
1585 : : }
1586 : :
796 peter@eisentraut.org 1587 :CBC 2 : lsn = pg_strdup(PQgetvalue(res, 0, 0));
1588 : 2 : PQclear(res);
1589 : : }
1590 : :
1591 : : /* For cleanup purposes */
1592 : 8 : dbinfo->made_replslot = true;
1593 : :
1594 : 8 : destroyPQExpBuffer(str);
1595 : :
1596 : 8 : return lsn;
1597 : : }
1598 : :
1599 : : static void
1600 : 8 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1601 : : const char *slot_name)
1602 : : {
1603 : 8 : PQExpBuffer str = createPQExpBuffer();
1604 : : char *slot_name_esc;
1605 : : PGresult *res;
1606 : :
1607 [ - + ]: 8 : Assert(conn != NULL);
1608 : :
211 alvherre@kurilemu.de 1609 [ + + ]: 8 : if (dry_run)
47 peter@eisentraut.org 1610 : 6 : pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1611 : : slot_name, dbinfo->dbname);
1612 : : else
1613 : 2 : pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1614 : : slot_name, dbinfo->dbname);
1615 : :
796 1616 : 8 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1617 : :
1618 : 8 : appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1619 : :
472 michael@paquier.xyz 1620 : 8 : PQfreemem(slot_name_esc);
1621 : :
47 peter@eisentraut.org 1622 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
1623 : :
796 1624 [ + + ]: 8 : if (!dry_run)
1625 : : {
1626 : 2 : res = PQexec(conn, str->data);
1627 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1628 : : {
47 peter@eisentraut.org 1629 :UBC 0 : pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1630 : : slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1631 : : }
1632 : :
796 peter@eisentraut.org 1633 :CBC 2 : PQclear(res);
1634 : : }
1635 : :
1636 : 8 : destroyPQExpBuffer(str);
1637 : 8 : }
1638 : :
1639 : : /*
1640 : : * Reports a suitable message if pg_ctl fails.
1641 : : */
1642 : : static void
1643 : 24 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
1644 : : {
1645 [ - + ]: 24 : if (rc != 0)
1646 : : {
796 peter@eisentraut.org 1647 [ # # ]:UBC 0 : if (WIFEXITED(rc))
1648 : : {
47 1649 : 0 : pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
1650 : : }
796 1651 [ # # ]: 0 : else if (WIFSIGNALED(rc))
1652 : : {
1653 : : #if defined(WIN32)
1654 : : pg_log_error("pg_ctl was terminated by exception 0x%X",
1655 : : WTERMSIG(rc));
1656 : : pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1657 : : #else
47 1658 : 0 : pg_log_error("pg_ctl was terminated by signal %d: %s",
1659 : : WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1660 : : #endif
1661 : : }
1662 : : else
1663 : : {
1664 : 0 : pg_log_error("pg_ctl exited with unrecognized status %d", rc);
1665 : : }
1666 : :
1667 : 0 : pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
796 1668 : 0 : exit(1);
1669 : : }
796 peter@eisentraut.org 1670 :CBC 24 : }
1671 : :
1672 : : static void
697 akapila@postgresql.o 1673 : 12 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1674 : : bool restrict_logical_worker)
1675 : : {
796 peter@eisentraut.org 1676 : 12 : PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1677 : : int rc;
1678 : :
699 tgl@sss.pgh.pa.us 1679 : 12 : appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1680 : 12 : appendShellString(pg_ctl_cmd, subscriber_dir);
407 drowley@postgresql.o 1681 : 12 : appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1682 : :
1683 : : /* Prevent unintended slot invalidation */
408 1684 : 12 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1685 : :
796 peter@eisentraut.org 1686 [ + - ]: 12 : if (restricted_access)
1687 : : {
1688 : 12 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1689 : : #if !defined(WIN32)
1690 : :
1691 : : /*
1692 : : * An empty listen_addresses list means the server does not listen on
1693 : : * any IP interfaces; only Unix-domain sockets can be used to connect
1694 : : * to the server. Prevent external connections to minimize the chance
1695 : : * of failure.
1696 : : */
1697 : 12 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1698 [ + - ]: 12 : if (opt->socket_dir)
1699 : 12 : appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1700 : 12 : opt->socket_dir);
1701 : 12 : appendPQExpBufferChar(pg_ctl_cmd, '"');
1702 : : #endif
1703 : : }
1704 [ - + ]: 12 : if (opt->config_file != NULL)
796 peter@eisentraut.org 1705 :UBC 0 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1706 : 0 : opt->config_file);
1707 : :
1708 : : /* Suppress to start logical replication if requested */
697 akapila@postgresql.o 1709 [ + + ]:CBC 12 : if (restrict_logical_worker)
408 drowley@postgresql.o 1710 : 4 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1711 : :
66 akapila@postgresql.o 1712 [ + + ]:GNC 12 : if (opt->log_dir)
1713 : 2 : appendPQExpBuffer(pg_ctl_cmd, " -l \"%s/%s\"", logdir, SERVER_LOG_FILE_NAME);
1714 : :
47 peter@eisentraut.org 1715 [ + + ]:CBC 12 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
796 1716 : 12 : rc = system(pg_ctl_cmd->data);
1717 : 12 : pg_ctl_status(pg_ctl_cmd->data, rc);
1718 : 12 : standby_running = true;
1719 : 12 : destroyPQExpBuffer(pg_ctl_cmd);
47 1720 : 12 : pg_log_info("server was started");
796 1721 : 12 : }
1722 : :
1723 : : static void
1724 : 12 : stop_standby_server(const char *datadir)
1725 : : {
1726 : : char *pg_ctl_cmd;
1727 : : int rc;
1728 : :
1729 : 12 : pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1730 : : datadir);
47 1731 [ + + ]: 12 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
796 1732 : 12 : rc = system(pg_ctl_cmd);
1733 : 12 : pg_ctl_status(pg_ctl_cmd, rc);
1734 : 12 : standby_running = false;
47 1735 : 12 : pg_log_info("server was stopped");
796 1736 : 12 : }
1737 : :
1738 : : /*
1739 : : * Returns after the server finishes the recovery process.
1740 : : *
1741 : : * If recovery_timeout option is set, terminate abnormally without finishing
1742 : : * the recovery process. By default, it waits forever.
1743 : : *
1744 : : * XXX Is the recovery process still in progress? When recovery process has a
1745 : : * better progress reporting mechanism, it should be added here.
1746 : : */
1747 : : static void
1748 : 4 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1749 : : {
1750 : : PGconn *conn;
208 alvherre@kurilemu.de 1751 :GNC 4 : bool ready = false;
796 peter@eisentraut.org 1752 :CBC 4 : int timer = 0;
1753 : :
47 1754 : 4 : pg_log_info("waiting for the target server to reach the consistent state");
1755 : :
796 1756 : 4 : conn = connect_database(conninfo, true);
1757 : :
1758 : : for (;;)
1759 : : {
1760 : : /* Did the recovery process finish? We're done if so. */
211 alvherre@kurilemu.de 1761 [ + + + - ]: 4 : if (dry_run || !server_is_in_recovery(conn))
1762 : : {
208 alvherre@kurilemu.de 1763 :GNC 4 : ready = true;
796 peter@eisentraut.org 1764 :CBC 4 : recovery_ended = true;
1765 : 4 : break;
1766 : : }
1767 : :
1768 : : /* Bail out after recovery_timeout seconds if this option is set */
796 peter@eisentraut.org 1769 [ # # # # ]:UBC 0 : if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1770 : : {
1771 : 0 : stop_standby_server(subscriber_dir);
47 1772 : 0 : pg_log_error("recovery timed out");
796 1773 : 0 : disconnect_database(conn, true);
1774 : : }
1775 : :
1776 : : /* Keep waiting */
207 alvherre@kurilemu.de 1777 :UNC 0 : pg_usleep(WAIT_INTERVAL * USECS_PER_SEC);
796 peter@eisentraut.org 1778 :UBC 0 : timer += WAIT_INTERVAL;
1779 : : }
1780 : :
796 peter@eisentraut.org 1781 :CBC 4 : disconnect_database(conn, false);
1782 : :
208 alvherre@kurilemu.de 1783 [ - + ]:GNC 4 : if (!ready)
47 peter@eisentraut.org 1784 :UBC 0 : pg_fatal("server did not end recovery");
1785 : :
47 peter@eisentraut.org 1786 :CBC 4 : pg_log_info("target server reached the consistent state");
1787 : 4 : pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
796 1788 : 4 : }
1789 : :
1790 : : /*
1791 : : * Create a publication that includes all tables in the database.
1792 : : */
1793 : : static void
1794 : 7 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1795 : : {
1796 : 7 : PQExpBuffer str = createPQExpBuffer();
1797 : : PGresult *res;
1798 : : char *ipubname_esc;
1799 : : char *spubname_esc;
1800 : :
1801 [ - + ]: 7 : Assert(conn != NULL);
1802 : :
1803 : 7 : ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1804 : 7 : spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1805 : :
1806 : : /* Check if the publication already exists */
1807 : 7 : appendPQExpBuffer(str,
1808 : : "SELECT 1 FROM pg_catalog.pg_publication "
1809 : : "WHERE pubname = %s",
1810 : : spubname_esc);
1811 : 7 : res = PQexec(conn, str->data);
1812 [ - + ]: 7 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1813 : : {
47 peter@eisentraut.org 1814 :UBC 0 : pg_log_error("could not obtain publication information: %s",
1815 : : PQresultErrorMessage(res));
796 1816 : 0 : disconnect_database(conn, true);
1817 : : }
1818 : :
796 peter@eisentraut.org 1819 [ - + ]:CBC 7 : if (PQntuples(res) == 1)
1820 : : {
1821 : : /*
1822 : : * Unfortunately, if it reaches this code path, it will always fail
1823 : : * (unless you decide to change the existing publication name). That's
1824 : : * bad but it is very unlikely that the user will choose a name with
1825 : : * pg_createsubscriber_ prefix followed by the exact database oid and
1826 : : * a random number.
1827 : : */
47 peter@eisentraut.org 1828 :UBC 0 : pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1829 : 0 : pg_log_error_hint("Consider renaming this publication before continuing.");
796 1830 : 0 : disconnect_database(conn, true);
1831 : : }
1832 : :
796 peter@eisentraut.org 1833 :CBC 7 : PQclear(res);
1834 : 7 : resetPQExpBuffer(str);
1835 : :
211 alvherre@kurilemu.de 1836 [ + + ]: 7 : if (dry_run)
47 peter@eisentraut.org 1837 : 6 : pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
1838 : : dbinfo->pubname, dbinfo->dbname);
1839 : : else
1840 : 1 : pg_log_info("creating publication \"%s\" in database \"%s\"",
1841 : : dbinfo->pubname, dbinfo->dbname);
1842 : :
796 1843 : 7 : appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1844 : : ipubname_esc);
1845 : :
47 1846 [ + + ]: 7 : pg_log_debug("command is: %s", str->data);
1847 : :
796 1848 [ + + ]: 7 : if (!dry_run)
1849 : : {
1850 : 1 : res = PQexec(conn, str->data);
1851 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1852 : : {
47 peter@eisentraut.org 1853 :UBC 0 : pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1854 : : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
796 1855 : 0 : disconnect_database(conn, true);
1856 : : }
796 peter@eisentraut.org 1857 :CBC 1 : PQclear(res);
1858 : : }
1859 : :
1860 : : /* For cleanup purposes */
1861 : 7 : dbinfo->made_publication = true;
1862 : :
472 michael@paquier.xyz 1863 : 7 : PQfreemem(ipubname_esc);
1864 : 7 : PQfreemem(spubname_esc);
796 peter@eisentraut.org 1865 : 7 : destroyPQExpBuffer(str);
1866 : 7 : }
1867 : :
1868 : : /*
1869 : : * Drop the specified publication in the given database.
1870 : : */
1871 : : static void
3 fujii@postgresql.org 1872 : 10 : drop_publication(PGconn *conn, const char *pubname, const char *dbname)
1873 : : {
796 peter@eisentraut.org 1874 : 10 : PQExpBuffer str = createPQExpBuffer();
1875 : : PGresult *res;
1876 : : char *pubname_esc;
1877 : :
1878 [ - + ]: 10 : Assert(conn != NULL);
1879 : :
436 akapila@postgresql.o 1880 : 10 : pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1881 : :
211 alvherre@kurilemu.de 1882 [ + + ]: 10 : if (dry_run)
47 peter@eisentraut.org 1883 : 6 : pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
1884 : : pubname, dbname);
1885 : : else
1886 : 4 : pg_log_info("dropping publication \"%s\" in database \"%s\"",
1887 : : pubname, dbname);
1888 : :
796 1889 : 10 : appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1890 : :
472 michael@paquier.xyz 1891 : 10 : PQfreemem(pubname_esc);
1892 : :
47 peter@eisentraut.org 1893 [ + + ]: 10 : pg_log_debug("command is: %s", str->data);
1894 : :
796 1895 [ + + ]: 10 : if (!dry_run)
1896 : : {
1897 : 4 : res = PQexec(conn, str->data);
1898 [ - + ]: 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1899 : : {
47 peter@eisentraut.org 1900 :UBC 0 : pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1901 : : pubname, dbname, PQresultErrorMessage(res));
1902 : :
1903 : : /*
1904 : : * Don't disconnect and exit here. This routine is used by primary
1905 : : * (cleanup publication / replication slot due to an error) and
1906 : : * subscriber (remove the replicated publications). In both cases,
1907 : : * it can continue and provide instructions for the user to remove
1908 : : * it later if cleanup fails.
1909 : : */
1910 : : }
796 peter@eisentraut.org 1911 :CBC 4 : PQclear(res);
1912 : : }
1913 : :
1914 : 10 : destroyPQExpBuffer(str);
1915 : 10 : }
1916 : :
1917 : : /*
1918 : : * Retrieve and drop the publications.
1919 : : *
1920 : : * Publications copied during physical replication remain on the subscriber
1921 : : * after promotion. If --clean=publications is specified, drop all existing
1922 : : * publications in the subscriber database. Otherwise, only drop publications
1923 : : * that were created by pg_createsubscriber during this operation.
1924 : : */
1925 : : static void
436 akapila@postgresql.o 1926 : 8 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
1927 : : {
1928 : : PGresult *res;
339 peter@eisentraut.org 1929 : 8 : bool drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
1930 : :
436 akapila@postgresql.o 1931 [ - + ]: 8 : Assert(conn != NULL);
1932 : :
1933 [ + + ]: 8 : if (drop_all_pubs)
1934 : : {
47 peter@eisentraut.org 1935 : 2 : pg_log_info("dropping all existing publications in database \"%s\"",
1936 : : dbinfo->dbname);
1937 : :
1938 : : /* Fetch all publication names */
436 akapila@postgresql.o 1939 : 2 : res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1940 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1941 : : {
47 peter@eisentraut.org 1942 :UBC 0 : pg_log_error("could not obtain publication information: %s",
1943 : : PQresultErrorMessage(res));
436 akapila@postgresql.o 1944 : 0 : PQclear(res);
1945 : 0 : disconnect_database(conn, true);
1946 : : }
1947 : :
1948 : : /* Drop each publication */
228 msawada@postgresql.o 1949 [ + + ]:CBC 6 : for (int i = 0; i < PQntuples(res); i++)
3 fujii@postgresql.org 1950 : 4 : drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname);
1951 : :
436 akapila@postgresql.o 1952 : 2 : PQclear(res);
1953 : : }
1954 : : else
1955 : : {
1956 : : /* Drop publication only if it was created by this tool */
164 akapila@postgresql.o 1957 [ + - ]:GNC 6 : if (dbinfo->made_publication)
1958 : : {
3 fujii@postgresql.org 1959 : 6 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname);
1960 : : }
1961 : : else
1962 : : {
164 akapila@postgresql.o 1963 [ # # ]:UNC 0 : if (dry_run)
47 peter@eisentraut.org 1964 : 0 : pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
1965 : : dbinfo->pubname, dbinfo->dbname);
1966 : : else
23 1967 : 0 : pg_log_info("preserving existing publication \"%s\" in database \"%s\"",
1968 : : dbinfo->pubname, dbinfo->dbname);
1969 : : }
1970 : : }
436 akapila@postgresql.o 1971 :CBC 8 : }
1972 : :
1973 : : /*
1974 : : * Create a subscription with some predefined options.
1975 : : *
1976 : : * A replication slot was already created in a previous step. Let's use it. It
1977 : : * is not required to copy data. The subscription will be created but it will
1978 : : * not be enabled now. That's because the replication progress must be set and
1979 : : * the replication origin name (one of the function arguments) contains the
1980 : : * subscription OID in its name. Once the subscription is created,
1981 : : * set_replication_progress() can obtain the chosen origin name and set up its
1982 : : * initial location.
1983 : : */
1984 : : static void
796 peter@eisentraut.org 1985 : 8 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1986 : : {
1987 : 8 : PQExpBuffer str = createPQExpBuffer();
1988 : : PGresult *res;
1989 : : char *pubname_esc;
1990 : : char *subname_esc;
1991 : : char *pubconninfo_esc;
1992 : : char *replslotname_esc;
1993 : :
1994 [ - + ]: 8 : Assert(conn != NULL);
1995 : :
1996 : 8 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1997 : 8 : subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1998 : 8 : pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1999 : 8 : replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
2000 : :
211 alvherre@kurilemu.de 2001 [ + + ]: 8 : if (dry_run)
47 peter@eisentraut.org 2002 : 6 : pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
2003 : : dbinfo->subname, dbinfo->dbname);
2004 : : else
2005 : 2 : pg_log_info("creating subscription \"%s\" in database \"%s\"",
2006 : : dbinfo->subname, dbinfo->dbname);
2007 : :
796 2008 : 8 : appendPQExpBuffer(str,
2009 : : "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
2010 : : "WITH (create_slot = false, enabled = false, "
2011 : : "slot_name = %s, copy_data = false, two_phase = %s)",
2012 : : subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
458 akapila@postgresql.o 2013 [ + + ]: 8 : dbinfos.two_phase ? "true" : "false");
2014 : :
472 michael@paquier.xyz 2015 : 8 : PQfreemem(pubname_esc);
2016 : 8 : PQfreemem(subname_esc);
2017 : 8 : PQfreemem(pubconninfo_esc);
2018 : 8 : PQfreemem(replslotname_esc);
2019 : :
47 peter@eisentraut.org 2020 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
2021 : :
796 2022 [ + + ]: 8 : if (!dry_run)
2023 : : {
2024 : 2 : res = PQexec(conn, str->data);
2025 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2026 : : {
47 peter@eisentraut.org 2027 :UBC 0 : pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
2028 : : dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
796 2029 : 0 : disconnect_database(conn, true);
2030 : : }
796 peter@eisentraut.org 2031 :CBC 2 : PQclear(res);
2032 : : }
2033 : :
2034 : 8 : destroyPQExpBuffer(str);
2035 : 8 : }
2036 : :
2037 : : /*
2038 : : * Sets the replication progress to the consistent LSN.
2039 : : *
2040 : : * The subscriber caught up to the consistent LSN provided by the last
2041 : : * replication slot that was created. The goal is to set up the initial
2042 : : * location for the logical replication that is the exact LSN that the
2043 : : * subscriber was promoted. Once the subscription is enabled it will start
2044 : : * streaming from that location onwards. In dry run mode, the subscription OID
2045 : : * and LSN are set to invalid values for printing purposes.
2046 : : */
2047 : : static void
2048 : 8 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
2049 : : {
2050 : 8 : PQExpBuffer str = createPQExpBuffer();
2051 : : PGresult *res;
2052 : : Oid suboid;
2053 : : char *subname;
2054 : : char *dbname;
2055 : : char *originname;
2056 : : char *lsnstr;
2057 : :
2058 [ - + ]: 8 : Assert(conn != NULL);
2059 : :
2060 : 8 : subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
2061 : 8 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
2062 : :
2063 : 8 : appendPQExpBuffer(str,
2064 : : "SELECT s.oid FROM pg_catalog.pg_subscription s "
2065 : : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
2066 : : "WHERE s.subname = %s AND d.datname = %s",
2067 : : subname, dbname);
2068 : :
2069 : 8 : res = PQexec(conn, str->data);
2070 [ - + ]: 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2071 : : {
47 peter@eisentraut.org 2072 :UBC 0 : pg_log_error("could not obtain subscription OID: %s",
2073 : : PQresultErrorMessage(res));
796 2074 : 0 : disconnect_database(conn, true);
2075 : : }
2076 : :
796 peter@eisentraut.org 2077 [ + + - + ]:CBC 8 : if (PQntuples(res) != 1 && !dry_run)
2078 : : {
47 peter@eisentraut.org 2079 :UBC 0 : pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
2080 : : PQntuples(res), 1);
796 2081 : 0 : disconnect_database(conn, true);
2082 : : }
2083 : :
796 peter@eisentraut.org 2084 [ + + ]:CBC 8 : if (dry_run)
2085 : : {
2086 : 6 : suboid = InvalidOid;
327 alvherre@kurilemu.de 2087 :GNC 6 : lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
2088 : : }
2089 : : else
2090 : : {
796 peter@eisentraut.org 2091 :CBC 2 : suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2092 : 2 : lsnstr = psprintf("%s", lsn);
2093 : : }
2094 : :
2095 : 8 : PQclear(res);
2096 : :
2097 : : /*
2098 : : * The origin name is defined as pg_%u. %u is the subscription OID. See
2099 : : * ApplyWorkerMain().
2100 : : */
2101 : 8 : originname = psprintf("pg_%u", suboid);
2102 : :
211 alvherre@kurilemu.de 2103 [ + + ]: 8 : if (dry_run)
47 peter@eisentraut.org 2104 : 6 : pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2105 : : originname, lsnstr, dbinfo->dbname);
2106 : : else
2107 : 2 : pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2108 : : originname, lsnstr, dbinfo->dbname);
2109 : :
796 2110 : 8 : resetPQExpBuffer(str);
2111 : 8 : appendPQExpBuffer(str,
2112 : : "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
2113 : : originname, lsnstr);
2114 : :
47 2115 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
2116 : :
796 2117 [ + + ]: 8 : if (!dry_run)
2118 : : {
2119 : 2 : res = PQexec(conn, str->data);
2120 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2121 : : {
47 peter@eisentraut.org 2122 :UBC 0 : pg_log_error("could not set replication progress for subscription \"%s\": %s",
2123 : : dbinfo->subname, PQresultErrorMessage(res));
796 2124 : 0 : disconnect_database(conn, true);
2125 : : }
796 peter@eisentraut.org 2126 :CBC 2 : PQclear(res);
2127 : : }
2128 : :
472 michael@paquier.xyz 2129 : 8 : PQfreemem(subname);
2130 : 8 : PQfreemem(dbname);
796 peter@eisentraut.org 2131 : 8 : pg_free(originname);
2132 : 8 : pg_free(lsnstr);
2133 : 8 : destroyPQExpBuffer(str);
2134 : 8 : }
2135 : :
2136 : : /*
2137 : : * Enables the subscription.
2138 : : *
2139 : : * The subscription was created in a previous step but it was disabled. After
2140 : : * adjusting the initial logical replication location, enable the subscription.
2141 : : */
2142 : : static void
2143 : 8 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
2144 : : {
2145 : 8 : PQExpBuffer str = createPQExpBuffer();
2146 : : PGresult *res;
2147 : : char *subname;
2148 : :
2149 [ - + ]: 8 : Assert(conn != NULL);
2150 : :
2151 : 8 : subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2152 : :
211 alvherre@kurilemu.de 2153 [ + + ]: 8 : if (dry_run)
47 peter@eisentraut.org 2154 : 6 : pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
2155 : : dbinfo->subname, dbinfo->dbname);
2156 : : else
2157 : 2 : pg_log_info("enabling subscription \"%s\" in database \"%s\"",
2158 : : dbinfo->subname, dbinfo->dbname);
2159 : :
796 2160 : 8 : appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
2161 : :
47 2162 [ + + ]: 8 : pg_log_debug("command is: %s", str->data);
2163 : :
796 2164 [ + + ]: 8 : if (!dry_run)
2165 : : {
2166 : 2 : res = PQexec(conn, str->data);
2167 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2168 : : {
47 peter@eisentraut.org 2169 :UBC 0 : pg_log_error("could not enable subscription \"%s\": %s",
2170 : : dbinfo->subname, PQresultErrorMessage(res));
796 2171 : 0 : disconnect_database(conn, true);
2172 : : }
2173 : :
796 peter@eisentraut.org 2174 :CBC 2 : PQclear(res);
2175 : : }
2176 : :
472 michael@paquier.xyz 2177 : 8 : PQfreemem(subname);
796 peter@eisentraut.org 2178 : 8 : destroyPQExpBuffer(str);
2179 : 8 : }
2180 : :
2181 : : /*
2182 : : * Fetch a list of all connectable non-template databases from the source server
2183 : : * and form a list such that they appear as if the user has specified multiple
2184 : : * --database options, one for each source database.
2185 : : */
2186 : : static void
428 akapila@postgresql.o 2187 : 1 : get_publisher_databases(struct CreateSubscriberOptions *opt,
2188 : : bool dbnamespecified)
2189 : : {
2190 : : PGconn *conn;
2191 : : PGresult *res;
2192 : :
2193 : : /* If a database name was specified, just connect to it. */
2194 [ - + ]: 1 : if (dbnamespecified)
428 akapila@postgresql.o 2195 :UBC 0 : conn = connect_database(opt->pub_conninfo_str, true);
2196 : : else
2197 : : {
2198 : : /* Otherwise, try postgres first and then template1. */
2199 : : char *conninfo;
2200 : :
428 akapila@postgresql.o 2201 :CBC 1 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2202 : 1 : conn = connect_database(conninfo, false);
2203 : 1 : pg_free(conninfo);
2204 [ - + ]: 1 : if (!conn)
2205 : : {
428 akapila@postgresql.o 2206 :UBC 0 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2207 : 0 : conn = connect_database(conninfo, true);
2208 : 0 : pg_free(conninfo);
2209 : : }
2210 : : }
2211 : :
428 akapila@postgresql.o 2212 :CBC 1 : res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2213 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2214 : : {
47 peter@eisentraut.org 2215 :UBC 0 : pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
428 akapila@postgresql.o 2216 : 0 : PQclear(res);
2217 : 0 : disconnect_database(conn, true);
2218 : : }
2219 : :
428 akapila@postgresql.o 2220 [ + + ]:CBC 4 : for (int i = 0; i < PQntuples(res); i++)
2221 : : {
2222 : 3 : const char *dbname = PQgetvalue(res, i, 0);
2223 : :
2224 : 3 : simple_string_list_append(&opt->database_names, dbname);
2225 : :
2226 : : /* Increment num_dbs to reflect multiple --database options */
2227 : 3 : num_dbs++;
2228 : : }
2229 : :
2230 : 1 : PQclear(res);
2231 : 1 : disconnect_database(conn, false);
2232 : 1 : }
2233 : :
2234 : : int
796 peter@eisentraut.org 2235 : 23 : main(int argc, char **argv)
2236 : : {
2237 : : static struct option long_options[] =
2238 : : {
2239 : : {"all", no_argument, NULL, 'a'},
2240 : : {"database", required_argument, NULL, 'd'},
2241 : : {"pgdata", required_argument, NULL, 'D'},
2242 : : {"logdir", required_argument, NULL, 'l'},
2243 : : {"dry-run", no_argument, NULL, 'n'},
2244 : : {"subscriber-port", required_argument, NULL, 'p'},
2245 : : {"publisher-server", required_argument, NULL, 'P'},
2246 : : {"socketdir", required_argument, NULL, 's'},
2247 : : {"recovery-timeout", required_argument, NULL, 't'},
2248 : : {"enable-two-phase", no_argument, NULL, 'T'},
2249 : : {"subscriber-username", required_argument, NULL, 'U'},
2250 : : {"verbose", no_argument, NULL, 'v'},
2251 : : {"version", no_argument, NULL, 'V'},
2252 : : {"help", no_argument, NULL, '?'},
2253 : : {"config-file", required_argument, NULL, 1},
2254 : : {"publication", required_argument, NULL, 2},
2255 : : {"replication-slot", required_argument, NULL, 3},
2256 : : {"subscription", required_argument, NULL, 4},
2257 : : {"clean", required_argument, NULL, 5},
2258 : : {NULL, 0, NULL, 0}
2259 : : };
2260 : :
2261 : 23 : struct CreateSubscriberOptions opt = {0};
2262 : :
2263 : : int c;
2264 : : int option_index;
2265 : :
2266 : : char *pub_base_conninfo;
2267 : : char *sub_base_conninfo;
2268 : 23 : char *dbname_conninfo = NULL;
2269 : :
2270 : : uint64 pub_sysid;
2271 : : uint64 sub_sysid;
2272 : : struct stat statbuf;
2273 : :
2274 : : char *consistent_lsn;
2275 : :
2276 : : char pidfile[MAXPGPATH];
2277 : :
2278 : 23 : pg_logging_init(argv[0]);
2279 : 23 : pg_logging_set_level(PG_LOG_WARNING);
2280 : 23 : progname = get_progname(argv[0]);
666 alvherre@alvh.no-ip. 2281 : 23 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2282 : :
796 peter@eisentraut.org 2283 [ + + ]: 23 : if (argc > 1)
2284 : : {
2285 [ + + - + ]: 22 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2286 : : {
2287 : 1 : usage();
2288 : 1 : exit(0);
2289 : : }
2290 [ + - ]: 21 : else if (strcmp(argv[1], "-V") == 0
2291 [ + + ]: 21 : || strcmp(argv[1], "--version") == 0)
2292 : : {
2293 : 1 : puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2294 : 1 : exit(0);
2295 : : }
2296 : : }
2297 : :
2298 : : /* Default settings */
2299 : 21 : subscriber_dir = NULL;
2300 : 21 : opt.config_file = NULL;
66 akapila@postgresql.o 2301 :GNC 21 : opt.log_dir = NULL;
796 peter@eisentraut.org 2302 :CBC 21 : opt.pub_conninfo_str = NULL;
2303 : 21 : opt.socket_dir = NULL;
2304 : 21 : opt.sub_port = DEFAULT_SUB_PORT;
2305 : 21 : opt.sub_username = NULL;
458 akapila@postgresql.o 2306 : 21 : opt.two_phase = false;
796 peter@eisentraut.org 2307 : 21 : opt.database_names = (SimpleStringList)
2308 : : {
2309 : : 0
2310 : : };
2311 : 21 : opt.recovery_timeout = 0;
428 akapila@postgresql.o 2312 : 21 : opt.all_dbs = false;
2313 : :
2314 : : /*
2315 : : * Don't allow it to be run as root. It uses pg_ctl which does not allow
2316 : : * it either.
2317 : : */
2318 : : #ifndef WIN32
796 peter@eisentraut.org 2319 [ - + ]: 21 : if (geteuid() == 0)
2320 : : {
47 peter@eisentraut.org 2321 :UBC 0 : pg_log_error("cannot be executed by \"root\"");
2322 : 0 : pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2323 : : progname);
796 2324 : 0 : exit(1);
2325 : : }
2326 : : #endif
2327 : :
796 peter@eisentraut.org 2328 :CBC 21 : get_restricted_token();
2329 : :
66 akapila@postgresql.o 2330 :GNC 163 : while ((c = getopt_long(argc, argv, "ad:D:l:np:P:s:t:TU:v",
796 peter@eisentraut.org 2331 [ + + ]:CBC 163 : long_options, &option_index)) != -1)
2332 : : {
2333 : 145 : switch (c)
[ + + + +
+ + + + +
- + - + +
+ + + ]
[ + + + +
+ + + + +
+ - + - +
+ + + + ]
2334 : : {
428 akapila@postgresql.o 2335 : 3 : case 'a':
2336 : 3 : opt.all_dbs = true;
2337 : 3 : break;
796 peter@eisentraut.org 2338 : 25 : case 'd':
2339 [ + + ]: 25 : if (!simple_string_list_member(&opt.database_names, optarg))
2340 : : {
2341 : 24 : simple_string_list_append(&opt.database_names, optarg);
2342 : 24 : num_dbs++;
2343 : : }
2344 : : else
47 2345 : 1 : pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
796 2346 : 24 : break;
2347 : 19 : case 'D':
2348 : 19 : subscriber_dir = pg_strdup(optarg);
2349 : 19 : canonicalize_path(subscriber_dir);
2350 : 19 : break;
66 akapila@postgresql.o 2351 :GNC 1 : case 'l':
2352 : 1 : opt.log_dir = pg_strdup(optarg);
2353 : 1 : canonicalize_path(opt.log_dir);
2354 : 1 : break;
796 peter@eisentraut.org 2355 :CBC 9 : case 'n':
2356 : 9 : dry_run = true;
2357 : 9 : break;
2358 : 12 : case 'p':
2359 : 12 : opt.sub_port = pg_strdup(optarg);
2360 : 12 : break;
2361 : 18 : case 'P':
2362 : 18 : opt.pub_conninfo_str = pg_strdup(optarg);
2363 : 18 : break;
2364 : 12 : case 's':
2365 : 12 : opt.socket_dir = pg_strdup(optarg);
2366 : 12 : canonicalize_path(opt.socket_dir);
2367 : 12 : break;
2368 : 3 : case 't':
2369 : 3 : opt.recovery_timeout = atoi(optarg);
2370 : 3 : break;
458 akapila@postgresql.o 2371 : 1 : case 'T':
2372 : 1 : opt.two_phase = true;
2373 : 1 : break;
796 peter@eisentraut.org 2374 :UBC 0 : case 'U':
2375 : 0 : opt.sub_username = pg_strdup(optarg);
2376 : 0 : break;
796 peter@eisentraut.org 2377 :CBC 19 : case 'v':
2378 : 19 : pg_logging_increase_verbosity();
2379 : 19 : break;
796 peter@eisentraut.org 2380 :UBC 0 : case 1:
2381 : 0 : opt.config_file = pg_strdup(optarg);
2382 : 0 : break;
796 peter@eisentraut.org 2383 :CBC 12 : case 2:
2384 [ + + ]: 12 : if (!simple_string_list_member(&opt.pub_names, optarg))
2385 : : {
2386 : 11 : simple_string_list_append(&opt.pub_names, optarg);
2387 : 11 : num_pubs++;
2388 : : }
2389 : : else
47 2390 : 1 : pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
796 2391 : 11 : break;
2392 : 4 : case 3:
2393 [ + - ]: 4 : if (!simple_string_list_member(&opt.replslot_names, optarg))
2394 : : {
2395 : 4 : simple_string_list_append(&opt.replslot_names, optarg);
2396 : 4 : num_replslots++;
2397 : : }
2398 : : else
47 peter@eisentraut.org 2399 :UBC 0 : pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
796 peter@eisentraut.org 2400 :CBC 4 : break;
2401 : 5 : case 4:
2402 [ + - ]: 5 : if (!simple_string_list_member(&opt.sub_names, optarg))
2403 : : {
2404 : 5 : simple_string_list_append(&opt.sub_names, optarg);
2405 : 5 : num_subs++;
2406 : : }
2407 : : else
47 peter@eisentraut.org 2408 :UBC 0 : pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
796 peter@eisentraut.org 2409 :CBC 5 : break;
339 2410 : 1 : case 5:
2411 [ + - ]: 1 : if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
2412 : 1 : simple_string_list_append(&opt.objecttypes_to_clean, optarg);
2413 : : else
47 peter@eisentraut.org 2414 :UBC 0 : pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
339 peter@eisentraut.org 2415 :CBC 1 : break;
796 2416 : 1 : default:
2417 : : /* getopt_long already emitted a complaint */
47 2418 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
796 2419 : 1 : exit(1);
2420 : : }
2421 : : }
2422 : :
2423 : : /* Validate that --all is not used with incompatible options */
428 akapila@postgresql.o 2424 [ + + ]: 18 : if (opt.all_dbs)
2425 : : {
2426 : 3 : char *bad_switch = NULL;
2427 : :
2428 [ + + ]: 3 : if (num_dbs > 0)
2429 : 1 : bad_switch = "--database";
2430 [ + + ]: 2 : else if (num_pubs > 0)
2431 : 1 : bad_switch = "--publication";
2432 [ - + ]: 1 : else if (num_replslots > 0)
428 akapila@postgresql.o 2433 :UBC 0 : bad_switch = "--replication-slot";
428 akapila@postgresql.o 2434 [ - + ]:CBC 1 : else if (num_subs > 0)
428 akapila@postgresql.o 2435 :UBC 0 : bad_switch = "--subscription";
2436 : :
428 akapila@postgresql.o 2437 [ + + ]:CBC 3 : if (bad_switch)
2438 : : {
47 peter@eisentraut.org 2439 : 2 : pg_log_error("options %s and %s cannot be used together",
2440 : : bad_switch, "-a/--all");
2441 : 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
428 akapila@postgresql.o 2442 : 2 : exit(1);
2443 : : }
2444 : : }
2445 : :
2446 : : /* Any non-option arguments? */
796 peter@eisentraut.org 2447 [ - + ]: 16 : if (optind < argc)
2448 : : {
47 peter@eisentraut.org 2449 :UBC 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
2450 : : argv[optind]);
2451 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
796 2452 : 0 : exit(1);
2453 : : }
2454 : :
2455 : : /* Required arguments */
796 peter@eisentraut.org 2456 [ + + ]:CBC 16 : if (subscriber_dir == NULL)
2457 : : {
47 2458 : 1 : pg_log_error("no subscriber data directory specified");
2459 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
796 2460 : 1 : exit(1);
2461 : : }
2462 : :
2463 : : /* If socket directory is not provided, use the current directory */
2464 [ + + ]: 15 : if (opt.socket_dir == NULL)
2465 : : {
2466 : : char cwd[MAXPGPATH];
2467 : :
2468 [ - + ]: 5 : if (!getcwd(cwd, MAXPGPATH))
47 peter@eisentraut.org 2469 :UBC 0 : pg_fatal("could not determine current directory");
796 peter@eisentraut.org 2470 :CBC 5 : opt.socket_dir = pg_strdup(cwd);
2471 : 5 : canonicalize_path(opt.socket_dir);
2472 : : }
2473 : :
2474 : : /*
2475 : : * Parse connection string. Build a base connection string that might be
2476 : : * reused by multiple databases.
2477 : : */
2478 [ + + ]: 15 : if (opt.pub_conninfo_str == NULL)
2479 : : {
2480 : : /*
2481 : : * TODO use primary_conninfo (if available) from subscriber and
2482 : : * extract publisher connection string. Assume that there are
2483 : : * identical entries for physical and logical replication. If there is
2484 : : * not, we would fail anyway.
2485 : : */
47 2486 : 1 : pg_log_error("no publisher connection string specified");
2487 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
796 2488 : 1 : exit(1);
2489 : : }
2490 : :
66 akapila@postgresql.o 2491 [ + + ]:GNC 14 : if (opt.log_dir != NULL)
2492 : : {
2493 : : char *internal_log_file;
2494 : : FILE *internal_log_file_fp;
2495 : :
2496 : 1 : umask(PG_MODE_MASK_OWNER);
2497 : :
2498 : : /*
2499 : : * Set mask based on PGDATA permissions, needed for the creation of
2500 : : * the output directories with correct permissions, similar with
2501 : : * pg_ctl and pg_upgrade.
2502 : : *
2503 : : * Don't error here if the data directory cannot be stat'd. Upcoming
2504 : : * checks for the data directory would raise the fatal error later.
2505 : : */
2506 [ + - ]: 1 : if (GetDataDirectoryCreatePerm(subscriber_dir))
2507 : 1 : umask(pg_mode_mask);
2508 : :
2509 : 1 : make_output_dirs(opt.log_dir);
2510 : 1 : internal_log_file = psprintf("%s/%s", logdir, INTERNAL_LOG_FILE_NAME);
2511 : :
47 peter@eisentraut.org 2512 : 1 : internal_log_file_fp = fopen(internal_log_file, "a");
2513 [ - + ]: 1 : if (!internal_log_file_fp)
47 peter@eisentraut.org 2514 :UNC 0 : pg_fatal("could not open log file \"%s\": %m", internal_log_file);
2515 : :
66 akapila@postgresql.o 2516 :GNC 1 : pg_free(internal_log_file);
2517 : :
47 peter@eisentraut.org 2518 : 1 : pg_logging_set_logfile(internal_log_file_fp);
2519 : : }
2520 : :
193 alvherre@kurilemu.de 2521 [ + + ]: 14 : if (dry_run)
47 peter@eisentraut.org 2522 : 8 : pg_log_info("Executing in dry-run mode.\n"
2523 : : "The target directory will not be modified.");
2524 : :
47 peter@eisentraut.org 2525 :CBC 14 : pg_log_info("validating publisher connection string");
796 2526 : 14 : pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2527 : : &dbname_conninfo);
2528 [ - + ]: 14 : if (pub_base_conninfo == NULL)
796 peter@eisentraut.org 2529 :UBC 0 : exit(1);
2530 : :
47 peter@eisentraut.org 2531 :CBC 14 : pg_log_info("validating subscriber connection string");
796 2532 : 14 : sub_base_conninfo = get_sub_conninfo(&opt);
2533 : :
2534 : : /*
2535 : : * Fetch all databases from the source (publisher) and treat them as if
2536 : : * the user specified has multiple --database options, one for each source
2537 : : * database.
2538 : : */
428 akapila@postgresql.o 2539 [ + + ]: 14 : if (opt.all_dbs)
2540 : : {
2541 : 1 : bool dbnamespecified = (dbname_conninfo != NULL);
2542 : :
2543 : 1 : get_publisher_databases(&opt, dbnamespecified);
2544 : : }
2545 : :
796 peter@eisentraut.org 2546 [ + + ]: 14 : if (opt.database_names.head == NULL)
2547 : : {
47 2548 : 2 : pg_log_info("no database was specified");
2549 : :
2550 : : /*
2551 : : * Try to obtain the dbname from the publisher conninfo. If dbname
2552 : : * parameter is not available, error out.
2553 : : */
796 2554 [ + + ]: 2 : if (dbname_conninfo)
2555 : : {
2556 : 1 : simple_string_list_append(&opt.database_names, dbname_conninfo);
2557 : 1 : num_dbs++;
2558 : :
47 2559 : 1 : pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2560 : : dbname_conninfo);
2561 : : }
2562 : : else
2563 : : {
2564 : 1 : pg_log_error("no database name specified");
2565 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.",
2566 : : progname);
796 2567 : 1 : exit(1);
2568 : : }
2569 : : }
2570 : :
2571 : : /* Number of object names must match number of databases */
2572 [ + + + + ]: 13 : if (num_pubs > 0 && num_pubs != num_dbs)
2573 : : {
47 2574 : 1 : pg_log_error("wrong number of publication names specified");
2575 : 1 : pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2576 : : num_pubs, num_dbs);
796 2577 : 1 : exit(1);
2578 : : }
2579 [ + + + + ]: 12 : if (num_subs > 0 && num_subs != num_dbs)
2580 : : {
47 2581 : 1 : pg_log_error("wrong number of subscription names specified");
2582 : 1 : pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2583 : : num_subs, num_dbs);
796 2584 : 1 : exit(1);
2585 : : }
2586 [ + + + + ]: 11 : if (num_replslots > 0 && num_replslots != num_dbs)
2587 : : {
47 2588 : 1 : pg_log_error("wrong number of replication slot names specified");
2589 : 1 : pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2590 : : num_replslots, num_dbs);
796 2591 : 1 : exit(1);
2592 : : }
2593 : :
2594 : : /* Verify the object types specified for removal from the subscriber */
339 2595 [ + + ]: 11 : for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2596 : : {
436 akapila@postgresql.o 2597 [ + - ]: 1 : if (pg_strcasecmp(cell->val, "publications") == 0)
339 peter@eisentraut.org 2598 : 1 : dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
2599 : : else
2600 : : {
23 peter@eisentraut.org 2601 :UNC 0 : pg_log_error("invalid object type \"%s\" specified for option %s",
2602 : : cell->val, "--clean");
47 peter@eisentraut.org 2603 :UBC 0 : pg_log_error_hint("The valid value is: \"%s\"", "publications");
436 akapila@postgresql.o 2604 : 0 : exit(1);
2605 : : }
2606 : : }
2607 : :
2608 : : /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
796 peter@eisentraut.org 2609 :CBC 10 : pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2610 : 10 : pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2611 : :
2612 : : /* Rudimentary check for a data directory */
2613 : 10 : check_data_directory(subscriber_dir);
2614 : :
458 akapila@postgresql.o 2615 : 10 : dbinfos.two_phase = opt.two_phase;
2616 : :
2617 : : /*
2618 : : * Store database information for publisher and subscriber. It should be
2619 : : * called before atexit() because its return is used in the
2620 : : * cleanup_objects_atexit().
2621 : : */
2622 : 10 : dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2623 : :
2624 : : /* Register a function to clean up objects in case of failure */
796 peter@eisentraut.org 2625 : 10 : atexit(cleanup_objects_atexit);
2626 : :
2627 : : /*
2628 : : * Check if the subscriber data directory has the same system identifier
2629 : : * than the publisher data directory.
2630 : : */
458 akapila@postgresql.o 2631 : 10 : pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
796 peter@eisentraut.org 2632 : 10 : sub_sysid = get_standby_sysid(subscriber_dir);
2633 [ + + ]: 10 : if (pub_sysid != sub_sysid)
47 2634 : 1 : pg_fatal("subscriber data directory is not a copy of the source database cluster");
2635 : :
2636 : : /* Subscriber PID file */
796 2637 : 9 : snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2638 : :
2639 : : /*
2640 : : * The standby server must not be running. If the server is started under
2641 : : * service manager and pg_createsubscriber stops it, the service manager
2642 : : * might react to this action and start the server again. Therefore,
2643 : : * refuse to proceed if the server is running to avoid possible failures.
2644 : : */
2645 [ + + ]: 9 : if (stat(pidfile, &statbuf) == 0)
2646 : : {
47 2647 : 1 : pg_log_error("standby server is running");
2648 : 1 : pg_log_error_hint("Stop the standby server and try again.");
796 2649 : 1 : exit(1);
2650 : : }
2651 : :
2652 : : /*
2653 : : * Start a short-lived standby server with temporary parameters (provided
2654 : : * by command-line options). The goal is to avoid connections during the
2655 : : * transformation steps.
2656 : : */
47 2657 : 8 : pg_log_info("starting the standby server with command-line options");
697 akapila@postgresql.o 2658 : 8 : start_standby_server(&opt, true, false);
2659 : :
2660 : : /* Check if the standby server is ready for logical replication */
458 2661 : 8 : check_subscriber(dbinfos.dbinfo);
2662 : :
2663 : : /* Check if the primary server is ready for logical replication */
2664 : 6 : check_publisher(dbinfos.dbinfo);
2665 : :
2666 : : /*
2667 : : * Stop the target server. The recovery process requires that the server
2668 : : * reaches a consistent state before targeting the recovery stop point.
2669 : : * Make sure a consistent state is reached (stop the target server
2670 : : * guarantees it) *before* creating the replication slots in
2671 : : * setup_publisher().
2672 : : */
47 peter@eisentraut.org 2673 : 4 : pg_log_info("stopping the subscriber");
796 2674 : 4 : stop_standby_server(subscriber_dir);
2675 : :
2676 : : /* Create the required objects for each database on publisher */
458 akapila@postgresql.o 2677 : 4 : consistent_lsn = setup_publisher(dbinfos.dbinfo);
2678 : :
2679 : : /* Write the required recovery parameters */
2680 : 4 : setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2681 : :
2682 : : /*
2683 : : * Start subscriber so the recovery parameters will take effect. Wait
2684 : : * until accepting connections. We don't want to start logical replication
2685 : : * during setup.
2686 : : */
47 peter@eisentraut.org 2687 : 4 : pg_log_info("starting the subscriber");
697 akapila@postgresql.o 2688 : 4 : start_standby_server(&opt, true, true);
2689 : :
2690 : : /* Waiting the subscriber to be promoted */
458 2691 : 4 : wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
2692 : :
2693 : : /*
2694 : : * Create the subscription for each database on subscriber. It does not
2695 : : * enable it immediately because it needs to adjust the replication start
2696 : : * point to the LSN reported by setup_publisher(). It also cleans up
2697 : : * publications created by this tool and replication to the standby.
2698 : : */
2699 : 4 : setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2700 : :
2701 : : /* Remove primary_slot_name if it exists on primary */
2702 : 4 : drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
2703 : :
2704 : : /* Remove failover replication slots if they exist on subscriber */
2705 : 4 : drop_failover_replication_slots(dbinfos.dbinfo);
2706 : :
2707 : : /* Stop the subscriber */
47 peter@eisentraut.org 2708 : 4 : pg_log_info("stopping the subscriber");
796 2709 : 4 : stop_standby_server(subscriber_dir);
2710 : :
2711 : : /* Change system identifier from subscriber */
2712 : 4 : modify_subscriber_sysid(&opt);
2713 : :
2714 : 4 : success = true;
2715 : :
47 2716 : 4 : pg_log_info("Done!");
2717 : :
796 2718 : 4 : return 0;
2719 : : }
|