Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : : * fashion and write it to a local file.
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/bin/pg_basebackup/pg_recvlogical.c
10 : : *-------------------------------------------------------------------------
11 : : */
12 : :
13 : : #include "postgres_fe.h"
14 : :
15 : : #include <dirent.h>
16 : : #include <limits.h>
17 : : #include <sys/select.h>
18 : : #include <sys/stat.h>
19 : : #include <unistd.h>
20 : :
21 : : #include "common/file_perm.h"
22 : : #include "common/logging.h"
23 : : #include "fe_utils/option_utils.h"
24 : : #include "getopt_long.h"
25 : : #include "libpq-fe.h"
26 : : #include "libpq/pqsignal.h"
27 : : #include "libpq/protocol.h"
28 : : #include "pqexpbuffer.h"
29 : : #include "streamutil.h"
30 : :
31 : : /* Time to sleep between reconnection attempts */
32 : : #define RECONNECT_SLEEP_TIME 5
33 : :
34 : : typedef enum
35 : : {
36 : : STREAM_STOP_NONE,
37 : : STREAM_STOP_END_OF_WAL,
38 : : STREAM_STOP_KEEPALIVE,
39 : : STREAM_STOP_SIGNAL
40 : : } StreamStopReason;
41 : :
42 : : /* Global Options */
43 : : static char *outfile = NULL;
44 : : static int verbose = 0;
45 : : static bool two_phase = false; /* enable-two-phase option */
46 : : static bool failover = false; /* enable-failover option */
47 : : static int noloop = 0;
48 : : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
49 : : static int fsync_interval = 10 * 1000; /* 10 sec = default */
50 : : static XLogRecPtr startpos = InvalidXLogRecPtr;
51 : : static XLogRecPtr endpos = InvalidXLogRecPtr;
52 : : static bool do_create_slot = false;
53 : : static bool slot_exists_ok = false;
54 : : static bool do_start_slot = false;
55 : : static bool do_drop_slot = false;
56 : : static char *replication_slot = NULL;
57 : :
58 : : /* filled pairwise with option, value. value may be NULL */
59 : : static char **options;
60 : : static size_t noptions = 0;
61 : : static const char *plugin = "test_decoding";
62 : :
63 : : /* Global State */
64 : : static int outfd = -1;
65 : : static volatile sig_atomic_t time_to_abort = false;
66 : : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
67 : : static volatile sig_atomic_t output_reopen = false;
68 : : static bool output_isfile;
69 : : static TimestampTz output_last_fsync = -1;
70 : : static bool output_needs_fsync = false;
71 : : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
72 : : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
73 : :
74 : : static void usage(void);
75 : : static void StreamLogicalLog(void);
76 : : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
77 : : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
78 : : StreamStopReason reason,
79 : : XLogRecPtr lsn);
80 : :
81 : : static void
4190 rhaas@postgresql.org 82 :CBC 1 : usage(void)
83 : : {
3823 bruce@momjian.us 84 : 1 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
85 : : progname);
4190 rhaas@postgresql.org 86 : 1 : printf(_("Usage:\n"));
87 : 1 : printf(_(" %s [OPTION]...\n"), progname);
3982 peter_e@gmx.net 88 : 1 : printf(_("\nAction to be performed:\n"));
89 : 1 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
90 : 1 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
91 : 1 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
4190 rhaas@postgresql.org 92 : 1 : printf(_("\nOptions:\n"));
69 peter@eisentraut.org 93 : 1 : printf(_(" --enable-failover enable replication slot synchronization to standby servers when\n"
94 : : " creating a replication slot\n"));
95 : 1 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
3982 peter_e@gmx.net 96 : 1 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
4122 andres@anarazel.de 97 : 1 : printf(_(" -F --fsync-interval=SECS\n"
98 : : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
3643 peter_e@gmx.net 99 : 1 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
3982 100 : 1 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
4190 rhaas@postgresql.org 101 : 1 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
3982 peter_e@gmx.net 102 : 1 : printf(_(" -o, --option=NAME[=VALUE]\n"
103 : : " pass option NAME with optional value VALUE to the\n"
104 : : " output plugin\n"));
105 : 1 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
106 : 1 : printf(_(" -s, --status-interval=SECS\n"
107 : : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
108 : 1 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
69 peter@eisentraut.org 109 : 1 : printf(_(" -t, --enable-two-phase enable decoding of prepared transactions when creating a slot\n"));
110 : 1 : printf(_(" --two-phase (same as --enable-two-phase, deprecated)\n"));
4190 rhaas@postgresql.org 111 : 1 : printf(_(" -v, --verbose output verbose messages\n"));
112 : 1 : printf(_(" -V, --version output version information, then exit\n"));
113 : 1 : printf(_(" -?, --help show this help, then exit\n"));
114 : 1 : printf(_("\nConnection options:\n"));
115 : 1 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
116 : 1 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
117 : 1 : printf(_(" -p, --port=PORT database server port number\n"));
118 : 1 : printf(_(" -U, --username=NAME connect as specified database user\n"));
119 : 1 : printf(_(" -w, --no-password never prompt for password\n"));
120 : 1 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
2017 peter@eisentraut.org 121 : 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
122 : 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
4190 rhaas@postgresql.org 123 : 1 : }
124 : :
125 : : /*
126 : : * Send a Standby Status Update message to server.
127 : : */
128 : : static bool
3117 tgl@sss.pgh.pa.us 129 : 11 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
130 : : {
131 : : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
132 : : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
133 : :
134 : : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
4190 rhaas@postgresql.org 135 : 11 : int len = 0;
136 : :
137 : : /*
138 : : * we normally don't want to send superfluous feedback, but if it's
139 : : * because of a timeout we need to, otherwise wal_sender_timeout will kill
140 : : * us.
141 : : */
142 [ - + ]: 11 : if (!force &&
4190 rhaas@postgresql.org 143 [ # # ]:UBC 0 : last_written_lsn == output_written_lsn &&
1942 noah@leadboat.com 144 [ # # ]: 0 : last_fsync_lsn == output_fsync_lsn)
4190 rhaas@postgresql.org 145 : 0 : return true;
146 : :
4190 rhaas@postgresql.org 147 [ - + ]:CBC 11 : if (verbose)
61 alvherre@kurilemu.de 148 :UNC 0 : pg_log_info("confirming write up to %X/%08X, flush to %X/%08X (slot %s)",
149 : : LSN_FORMAT_ARGS(output_written_lsn),
150 : : LSN_FORMAT_ARGS(output_fsync_lsn),
151 : : replication_slot);
152 : :
31 nathan@postgresql.or 153 :GNC 11 : replybuf[len] = PqReplMsg_StandbyStatusUpdate;
4190 rhaas@postgresql.org 154 :CBC 11 : len += 1;
4141 bruce@momjian.us 155 : 11 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
4190 rhaas@postgresql.org 156 : 11 : len += 8;
2999 tgl@sss.pgh.pa.us 157 : 11 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
4190 rhaas@postgresql.org 158 : 11 : len += 8;
4141 bruce@momjian.us 159 : 11 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
4190 rhaas@postgresql.org 160 : 11 : len += 8;
4141 bruce@momjian.us 161 : 11 : fe_sendint64(now, &replybuf[len]); /* sendTime */
4190 rhaas@postgresql.org 162 : 11 : len += 8;
2999 tgl@sss.pgh.pa.us 163 : 11 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
4190 rhaas@postgresql.org 164 : 11 : len += 1;
165 : :
166 : 11 : startpos = output_written_lsn;
167 : 11 : last_written_lsn = output_written_lsn;
168 : 11 : last_fsync_lsn = output_fsync_lsn;
169 : :
170 [ + - - + ]: 11 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
171 : : {
2350 peter@eisentraut.org 172 :UBC 0 : pg_log_error("could not send feedback packet: %s",
173 : : PQerrorMessage(conn));
4190 rhaas@postgresql.org 174 : 0 : return false;
175 : : }
176 : :
4190 rhaas@postgresql.org 177 :CBC 11 : return true;
178 : : }
179 : :
180 : : static void
2443 peter@eisentraut.org 181 : 11 : disconnect_atexit(void)
182 : : {
4190 rhaas@postgresql.org 183 [ + + ]: 11 : if (conn != NULL)
184 : 5 : PQfinish(conn);
185 : 11 : }
186 : :
187 : : static bool
3117 tgl@sss.pgh.pa.us 188 : 11 : OutputFsync(TimestampTz now)
189 : : {
4190 rhaas@postgresql.org 190 : 11 : output_last_fsync = now;
191 : :
192 : 11 : output_fsync_lsn = output_written_lsn;
193 : :
194 [ - + ]: 11 : if (fsync_interval <= 0)
4190 rhaas@postgresql.org 195 :UBC 0 : return true;
196 : :
4132 heikki.linnakangas@i 197 [ + + ]:CBC 11 : if (!output_needs_fsync)
4190 rhaas@postgresql.org 198 : 7 : return true;
199 : :
4132 heikki.linnakangas@i 200 : 4 : output_needs_fsync = false;
201 : :
202 : : /* can only fsync if it's a regular file */
3714 andres@anarazel.de 203 [ + + ]: 4 : if (!output_isfile)
204 : 2 : return true;
205 : :
206 [ - + ]: 2 : if (fsync(outfd) != 0)
1247 tgl@sss.pgh.pa.us 207 :UBC 0 : pg_fatal("could not fsync file \"%s\": %m", outfile);
208 : :
4190 rhaas@postgresql.org 209 :CBC 2 : return true;
210 : : }
211 : :
212 : : /*
213 : : * Start the log streaming
214 : : */
215 : : static void
3995 andres@anarazel.de 216 : 6 : StreamLogicalLog(void)
217 : : {
218 : : PGresult *res;
4190 rhaas@postgresql.org 219 : 6 : char *copybuf = NULL;
3117 tgl@sss.pgh.pa.us 220 : 6 : TimestampTz last_status = -1;
221 : : int i;
222 : : PQExpBuffer query;
223 : : XLogRecPtr cur_record_lsn;
224 : :
4190 rhaas@postgresql.org 225 : 6 : output_written_lsn = InvalidXLogRecPtr;
226 : 6 : output_fsync_lsn = InvalidXLogRecPtr;
779 michael@paquier.xyz 227 : 6 : cur_record_lsn = InvalidXLogRecPtr;
228 : :
229 : : /*
230 : : * Connect in replication mode to the server
231 : : */
4190 rhaas@postgresql.org 232 [ - + ]: 6 : if (!conn)
4190 rhaas@postgresql.org 233 :UBC 0 : conn = GetConnection();
4190 rhaas@postgresql.org 234 [ - + ]:CBC 6 : if (!conn)
235 : : /* Error message already written in GetConnection() */
4190 rhaas@postgresql.org 236 :UBC 0 : return;
237 : :
238 : : /*
239 : : * Start the replication
240 : : */
4190 rhaas@postgresql.org 241 [ - + ]:CBC 6 : if (verbose)
61 alvherre@kurilemu.de 242 :UNC 0 : pg_log_info("starting log streaming at %X/%08X (slot %s)",
243 : : LSN_FORMAT_ARGS(startpos),
244 : : replication_slot);
245 : :
246 : : /* Initiate the replication stream at specified location */
1385 tgl@sss.pgh.pa.us 247 :CBC 6 : query = createPQExpBuffer();
61 alvherre@kurilemu.de 248 :GNC 6 : appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%08X",
1656 peter@eisentraut.org 249 :CBC 6 : replication_slot, LSN_FORMAT_ARGS(startpos));
250 : :
251 : : /* print options if there are any */
4190 rhaas@postgresql.org 252 [ + + ]: 6 : if (noptions)
253 : 3 : appendPQExpBufferStr(query, " (");
254 : :
255 [ + + ]: 12 : for (i = 0; i < noptions; i++)
256 : : {
257 : : /* separator */
258 [ + + ]: 6 : if (i > 0)
259 : 3 : appendPQExpBufferStr(query, ", ");
260 : :
261 : : /* write option name */
262 : 6 : appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
263 : :
264 : : /* write option value if specified */
265 [ + - ]: 6 : if (options[(i * 2) + 1] != NULL)
266 : 6 : appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
267 : : }
268 : :
269 [ + + ]: 6 : if (noptions)
270 : 3 : appendPQExpBufferChar(query, ')');
271 : :
272 : 6 : res = PQexec(conn, query->data);
273 [ - + ]: 6 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
274 : : {
2350 peter@eisentraut.org 275 :UBC 0 : pg_log_error("could not send replication command \"%s\": %s",
276 : : query->data, PQresultErrorMessage(res));
4190 rhaas@postgresql.org 277 : 0 : PQclear(res);
278 : 0 : goto error;
279 : : }
4190 rhaas@postgresql.org 280 :CBC 6 : PQclear(res);
281 : 6 : resetPQExpBuffer(query);
282 : :
283 [ - + ]: 6 : if (verbose)
2350 peter@eisentraut.org 284 :UBC 0 : pg_log_info("streaming initiated");
285 : :
4190 rhaas@postgresql.org 286 [ + + ]:CBC 68 : while (!time_to_abort)
287 : : {
288 : : int r;
289 : : int bytes_left;
290 : : int bytes_written;
291 : : TimestampTz now;
292 : : int hdr_len;
293 : :
779 michael@paquier.xyz 294 : 67 : cur_record_lsn = InvalidXLogRecPtr;
295 : :
4190 rhaas@postgresql.org 296 [ + + ]: 67 : if (copybuf != NULL)
297 : : {
298 : 35 : PQfreemem(copybuf);
299 : 35 : copybuf = NULL;
300 : : }
301 : :
302 : : /*
303 : : * Potentially send a status message to the primary.
304 : : */
305 : 67 : now = feGetCurrentTimestamp();
306 : :
307 [ + + + + ]: 128 : if (outfd != -1 &&
308 : 61 : feTimestampDifferenceExceeds(output_last_fsync, now,
309 : : fsync_interval))
310 : : {
311 [ - + ]: 6 : if (!OutputFsync(now))
4190 rhaas@postgresql.org 312 :UBC 0 : goto error;
313 : : }
314 : :
4190 rhaas@postgresql.org 315 [ + - + + ]:CBC 134 : if (standby_message_timeout > 0 &&
316 : 67 : feTimestampDifferenceExceeds(last_status, now,
317 : : standby_message_timeout))
318 : : {
319 : : /* Time to send feedback! */
320 [ - + ]: 6 : if (!sendFeedback(conn, now, true, false))
4190 rhaas@postgresql.org 321 :UBC 0 : goto error;
322 : :
4190 rhaas@postgresql.org 323 :CBC 6 : last_status = now;
324 : : }
325 : :
326 : : /* got SIGHUP, close output file */
4132 heikki.linnakangas@i 327 [ + + - + : 67 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
- - ]
328 : : {
4132 heikki.linnakangas@i 329 :UBC 0 : now = feGetCurrentTimestamp();
330 [ # # ]: 0 : if (!OutputFsync(now))
331 : 0 : goto error;
332 : 0 : close(outfd);
333 : 0 : outfd = -1;
334 : : }
4132 heikki.linnakangas@i 335 :CBC 67 : output_reopen = false;
336 : :
337 : : /* open the output file, if not open yet */
4131 338 [ + + ]: 67 : if (outfd == -1)
339 : : {
340 : : struct stat statbuf;
341 : :
342 [ + - ]: 6 : if (strcmp(outfile, "-") == 0)
343 : 6 : outfd = fileno(stdout);
344 : : else
4131 heikki.linnakangas@i 345 :UBC 0 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
346 : : S_IRUSR | S_IWUSR);
4131 heikki.linnakangas@i 347 [ - + ]:CBC 6 : if (outfd == -1)
348 : : {
2350 peter@eisentraut.org 349 :UBC 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
4131 heikki.linnakangas@i 350 : 0 : goto error;
351 : : }
352 : :
3714 andres@anarazel.de 353 [ - + ]:CBC 6 : if (fstat(outfd, &statbuf) != 0)
354 : : {
2350 peter@eisentraut.org 355 :UBC 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
1500 michael@paquier.xyz 356 : 0 : goto error;
357 : : }
358 : :
3714 andres@anarazel.de 359 [ + + + - ]:CBC 6 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
360 : : }
361 : :
4190 rhaas@postgresql.org 362 : 67 : r = PQgetCopyData(conn, ©buf, 1);
363 [ + + ]: 67 : if (r == 0)
364 : 26 : {
365 : : /*
366 : : * In async mode, and no data available. We block on reading but
367 : : * not more than the specified timeout, so that we can send a
368 : : * response back to the client.
369 : : */
370 : : fd_set input_mask;
3117 tgl@sss.pgh.pa.us 371 : 27 : TimestampTz message_target = 0;
372 : 27 : TimestampTz fsync_target = 0;
373 : : struct timeval timeout;
4190 rhaas@postgresql.org 374 : 27 : struct timeval *timeoutptr = NULL;
375 : :
3469 peter_e@gmx.net 376 [ - + ]: 27 : if (PQsocket(conn) < 0)
377 : : {
2350 peter@eisentraut.org 378 :UBC 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
3469 peter_e@gmx.net 379 : 0 : goto error;
380 : : }
381 : :
4190 rhaas@postgresql.org 382 [ + + ]:CBC 459 : FD_ZERO(&input_mask);
383 : 27 : FD_SET(PQsocket(conn), &input_mask);
384 : :
385 : : /* Compute when we need to wakeup to send a keepalive message. */
386 [ + - ]: 27 : if (standby_message_timeout)
387 : 27 : message_target = last_status + (standby_message_timeout - 1) *
388 : : ((int64) 1000);
389 : :
390 : : /* Compute when we need to wakeup to fsync the output file. */
4132 heikki.linnakangas@i 391 [ + - + + ]: 27 : if (fsync_interval > 0 && output_needs_fsync)
4190 rhaas@postgresql.org 392 : 19 : fsync_target = output_last_fsync + (fsync_interval - 1) *
393 : : ((int64) 1000);
394 : :
395 : : /* Now compute when to wakeup. */
396 [ - + - - ]: 27 : if (message_target > 0 || fsync_target > 0)
397 : : {
398 : : TimestampTz targettime;
399 : : long secs;
400 : : int usecs;
401 : :
402 : 27 : targettime = message_target;
403 : :
404 [ + + - + ]: 27 : if (fsync_target > 0 && fsync_target < targettime)
4190 rhaas@postgresql.org 405 :UBC 0 : targettime = fsync_target;
406 : :
4190 rhaas@postgresql.org 407 :CBC 27 : feTimestampDifference(now,
408 : : targettime,
409 : : &secs,
410 : : &usecs);
411 [ - + ]: 27 : if (secs <= 0)
4190 rhaas@postgresql.org 412 :UBC 0 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
413 : : else
4190 rhaas@postgresql.org 414 :CBC 27 : timeout.tv_sec = secs;
415 : 27 : timeout.tv_usec = usecs;
416 : 27 : timeoutptr = &timeout;
417 : : }
418 : :
419 : 27 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
420 [ + - + + : 27 : if (r == 0 || (r < 0 && errno == EINTR))
+ - ]
421 : : {
422 : : /*
423 : : * Got a timeout or signal. Continue the loop and either
424 : : * deliver a status packet to the server or just go back into
425 : : * blocking.
426 : : */
427 : 27 : continue;
428 : : }
429 [ - + ]: 26 : else if (r < 0)
430 : : {
1597 peter@eisentraut.org 431 :UBC 0 : pg_log_error("%s() failed: %m", "select");
4190 rhaas@postgresql.org 432 : 0 : goto error;
433 : : }
434 : :
435 : : /* Else there is actually data on the socket */
4190 rhaas@postgresql.org 436 [ - + ]:CBC 26 : if (PQconsumeInput(conn) == 0)
437 : : {
2350 peter@eisentraut.org 438 :UBC 0 : pg_log_error("could not receive data from WAL stream: %s",
439 : : PQerrorMessage(conn));
4190 rhaas@postgresql.org 440 : 0 : goto error;
441 : : }
4190 rhaas@postgresql.org 442 :CBC 26 : continue;
443 : : }
444 : :
445 : : /* End of copy stream */
446 [ - + ]: 40 : if (r == -1)
447 : 5 : break;
448 : :
449 : : /* Failure while reading the copy stream */
450 [ - + ]: 40 : if (r == -2)
451 : : {
2350 peter@eisentraut.org 452 :UBC 0 : pg_log_error("could not read COPY data: %s",
453 : : PQerrorMessage(conn));
4190 rhaas@postgresql.org 454 : 0 : goto error;
455 : : }
456 : :
457 : : /* Check the message type. */
31 nathan@postgresql.or 458 [ + + ]:GNC 40 : if (copybuf[0] == PqReplMsg_Keepalive)
4190 rhaas@postgresql.org 459 :CBC 6 : {
460 : : int pos;
461 : : bool replyRequested;
462 : : XLogRecPtr walEnd;
3167 simon@2ndQuadrant.co 463 : 7 : bool endposReached = false;
464 : :
465 : : /*
466 : : * Parse the keepalive message, enclosed in the CopyData message.
467 : : * We just check if the server requested a reply, and ignore the
468 : : * rest.
469 : : */
31 nathan@postgresql.or 470 :GNC 7 : pos = 1; /* skip msgtype PqReplMsg_Keepalive */
4190 rhaas@postgresql.org 471 :CBC 7 : walEnd = fe_recvint64(©buf[pos]);
472 : 7 : output_written_lsn = Max(walEnd, output_written_lsn);
473 : :
474 : 7 : pos += 8; /* read walEnd */
475 : :
476 : 7 : pos += 8; /* skip sendTime */
477 : :
478 [ - + ]: 7 : if (r < pos + 1)
479 : : {
2350 peter@eisentraut.org 480 :UBC 0 : pg_log_error("streaming header too small: %d", r);
4190 rhaas@postgresql.org 481 : 0 : goto error;
482 : : }
4190 rhaas@postgresql.org 483 :CBC 7 : replyRequested = copybuf[pos];
484 : :
3167 simon@2ndQuadrant.co 485 [ + + + + ]: 7 : if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
486 : : {
487 : : /*
488 : : * If there's nothing to read on the socket until a keepalive
489 : : * we know that the server has nothing to send us; and if
490 : : * walEnd has passed endpos, we know nothing else can have
491 : : * committed before endpos. So we can bail out now.
492 : : */
493 : 1 : endposReached = true;
494 : : }
495 : :
496 : : /* Send a reply, if necessary */
497 [ + - + + ]: 7 : if (replyRequested || endposReached)
498 : : {
499 [ - + ]: 1 : if (!flushAndSendFeedback(conn, &now))
4190 rhaas@postgresql.org 500 :UBC 0 : goto error;
4190 rhaas@postgresql.org 501 :CBC 1 : last_status = now;
502 : : }
503 : :
3167 simon@2ndQuadrant.co 504 [ + + ]: 7 : if (endposReached)
505 : : {
779 michael@paquier.xyz 506 : 1 : stop_reason = STREAM_STOP_KEEPALIVE;
3167 simon@2ndQuadrant.co 507 : 1 : time_to_abort = true;
508 : 1 : break;
509 : : }
510 : :
4190 rhaas@postgresql.org 511 : 6 : continue;
512 : : }
31 nathan@postgresql.or 513 [ - + ]:GNC 33 : else if (copybuf[0] != PqReplMsg_WALData)
514 : : {
2350 peter@eisentraut.org 515 :UBC 0 : pg_log_error("unrecognized streaming header: \"%c\"",
516 : : copybuf[0]);
4190 rhaas@postgresql.org 517 : 0 : goto error;
518 : : }
519 : :
520 : : /*
521 : : * Read the header of the WALData message, enclosed in the CopyData
522 : : * message. We only need the WAL location field (dataStart), the rest
523 : : * of the header is ignored.
524 : : */
31 nathan@postgresql.or 525 :GNC 33 : hdr_len = 1; /* msgtype PqReplMsg_WALData */
4190 rhaas@postgresql.org 526 :CBC 33 : hdr_len += 8; /* dataStart */
527 : 33 : hdr_len += 8; /* walEnd */
528 : 33 : hdr_len += 8; /* sendTime */
529 [ - + ]: 33 : if (r < hdr_len + 1)
530 : : {
2350 peter@eisentraut.org 531 :UBC 0 : pg_log_error("streaming header too small: %d", r);
4190 rhaas@postgresql.org 532 : 0 : goto error;
533 : : }
534 : :
535 : : /* Extract WAL location for this block */
3167 simon@2ndQuadrant.co 536 :CBC 33 : cur_record_lsn = fe_recvint64(©buf[1]);
537 : :
538 [ + - - + ]: 33 : if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
539 : : {
540 : : /*
541 : : * We've read past our endpoint, so prepare to go away being
542 : : * cautious about what happens to our output data.
543 : : */
3167 simon@2ndQuadrant.co 544 [ # # ]:UBC 0 : if (!flushAndSendFeedback(conn, &now))
545 : 0 : goto error;
779 michael@paquier.xyz 546 : 0 : stop_reason = STREAM_STOP_END_OF_WAL;
3167 simon@2ndQuadrant.co 547 : 0 : time_to_abort = true;
548 : 0 : break;
549 : : }
550 : :
3167 simon@2ndQuadrant.co 551 :CBC 33 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
552 : :
4190 rhaas@postgresql.org 553 : 33 : bytes_left = r - hdr_len;
554 : 33 : bytes_written = 0;
555 : :
556 : : /* signal that a fsync is needed */
4132 heikki.linnakangas@i 557 : 33 : output_needs_fsync = true;
558 : :
4190 rhaas@postgresql.org 559 [ + + ]: 66 : while (bytes_left)
560 : : {
561 : : int ret;
562 : :
563 : 66 : ret = write(outfd,
564 : 33 : copybuf + hdr_len + bytes_written,
565 : : bytes_left);
566 : :
567 [ - + ]: 33 : if (ret < 0)
568 : : {
1389 peter@eisentraut.org 569 :UBC 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
570 : : bytes_left, outfile);
4190 rhaas@postgresql.org 571 : 0 : goto error;
572 : : }
573 : :
574 : : /* Write was successful, advance our position */
4190 rhaas@postgresql.org 575 :CBC 33 : bytes_written += ret;
576 : 33 : bytes_left -= ret;
577 : : }
578 : :
579 [ - + ]: 33 : if (write(outfd, "\n", 1) != 1)
580 : : {
1389 peter@eisentraut.org 581 :UBC 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
582 : : 1, outfile);
4190 rhaas@postgresql.org 583 : 0 : goto error;
584 : : }
585 : :
3167 simon@2ndQuadrant.co 586 [ + - + + ]:CBC 33 : if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
587 : : {
588 : : /* endpos was exactly the record we just processed, we're done */
589 [ - + ]: 4 : if (!flushAndSendFeedback(conn, &now))
3167 simon@2ndQuadrant.co 590 :UBC 0 : goto error;
779 michael@paquier.xyz 591 :CBC 4 : stop_reason = STREAM_STOP_END_OF_WAL;
3167 simon@2ndQuadrant.co 592 : 4 : time_to_abort = true;
593 : 4 : break;
594 : : }
595 : : }
596 : :
597 : : /* Clean up connection state if stream has been aborted */
779 michael@paquier.xyz 598 [ + - ]: 6 : if (time_to_abort)
599 : 6 : prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
600 : :
4190 rhaas@postgresql.org 601 : 6 : res = PQgetResult(conn);
3167 simon@2ndQuadrant.co 602 [ + - ]: 6 : if (PQresultStatus(res) == PGRES_COPY_OUT)
603 : : {
1942 noah@leadboat.com 604 : 6 : PQclear(res);
605 : :
606 : : /*
607 : : * We're doing a client-initiated clean exit and have sent CopyDone to
608 : : * the server. Drain any messages, so we don't miss a last-minute
609 : : * ErrorResponse. The walsender stops generating WALData records once
610 : : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
611 : : * it's too late for sendFeedback(), even if this were to take a long
612 : : * time. Hence, use synchronous-mode PQgetCopyData().
613 : : */
614 : : while (1)
615 : 4 : {
616 : : int r;
617 : :
618 [ + + ]: 10 : if (copybuf != NULL)
619 : : {
620 : 9 : PQfreemem(copybuf);
621 : 9 : copybuf = NULL;
622 : : }
623 : 10 : r = PQgetCopyData(conn, ©buf, 0);
624 [ + + ]: 10 : if (r == -1)
625 : 6 : break;
626 [ - + ]: 4 : if (r == -2)
627 : : {
1942 noah@leadboat.com 628 :UBC 0 : pg_log_error("could not read COPY data: %s",
629 : : PQerrorMessage(conn));
630 : 0 : time_to_abort = false; /* unclean exit */
631 : 0 : goto error;
632 : : }
633 : : }
634 : :
1942 noah@leadboat.com 635 :CBC 6 : res = PQgetResult(conn);
636 : : }
637 [ - + ]: 6 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
638 : : {
2350 peter@eisentraut.org 639 :UBC 0 : pg_log_error("unexpected termination of replication stream: %s",
640 : : PQresultErrorMessage(res));
165 dgustafsson@postgres 641 : 0 : PQclear(res);
4190 rhaas@postgresql.org 642 : 0 : goto error;
643 : : }
4190 rhaas@postgresql.org 644 :CBC 6 : PQclear(res);
645 : :
646 [ + - - + ]: 6 : if (outfd != -1 && strcmp(outfile, "-") != 0)
647 : : {
3117 tgl@sss.pgh.pa.us 648 :UBC 0 : TimestampTz t = feGetCurrentTimestamp();
649 : :
650 : : /* no need to jump to error on failure here, we're finishing anyway */
4190 rhaas@postgresql.org 651 : 0 : OutputFsync(t);
652 : :
653 [ # # ]: 0 : if (close(outfd) != 0)
2350 peter@eisentraut.org 654 : 0 : pg_log_error("could not close file \"%s\": %m", outfile);
655 : : }
4190 rhaas@postgresql.org 656 :CBC 6 : outfd = -1;
657 : 6 : error:
4142 heikki.linnakangas@i 658 [ - + ]: 6 : if (copybuf != NULL)
659 : : {
4142 heikki.linnakangas@i 660 :UBC 0 : PQfreemem(copybuf);
661 : 0 : copybuf = NULL;
662 : : }
4190 rhaas@postgresql.org 663 :CBC 6 : destroyPQExpBuffer(query);
664 : 6 : PQfinish(conn);
665 : 6 : conn = NULL;
666 : : }
667 : :
668 : : /*
669 : : * Unfortunately we can't do sensible signal handling on windows...
670 : : */
671 : : #ifndef WIN32
672 : :
673 : : /*
674 : : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
675 : : * possible moment.
676 : : */
677 : : static void
1088 tgl@sss.pgh.pa.us 678 : 1 : sigexit_handler(SIGNAL_ARGS)
679 : : {
779 michael@paquier.xyz 680 : 1 : stop_reason = STREAM_STOP_SIGNAL;
4190 rhaas@postgresql.org 681 : 1 : time_to_abort = true;
682 : 1 : }
683 : :
684 : : /*
685 : : * Trigger the output file to be reopened.
686 : : */
687 : : static void
1088 tgl@sss.pgh.pa.us 688 :UBC 0 : sighup_handler(SIGNAL_ARGS)
689 : : {
4190 rhaas@postgresql.org 690 : 0 : output_reopen = true;
691 : 0 : }
692 : : #endif
693 : :
694 : :
695 : : int
4190 rhaas@postgresql.org 696 :CBC 19 : main(int argc, char **argv)
697 : : {
698 : : static struct option long_options[] = {
699 : : /* general options */
700 : : {"file", required_argument, NULL, 'f'},
701 : : {"fsync-interval", required_argument, NULL, 'F'},
702 : : {"no-loop", no_argument, NULL, 'n'},
703 : : {"enable-failover", no_argument, NULL, 5},
704 : : {"enable-two-phase", no_argument, NULL, 't'},
705 : : {"two-phase", no_argument, NULL, 't'}, /* deprecated */
706 : : {"verbose", no_argument, NULL, 'v'},
707 : : {"version", no_argument, NULL, 'V'},
708 : : {"help", no_argument, NULL, '?'},
709 : : /* connection options */
710 : : {"dbname", required_argument, NULL, 'd'},
711 : : {"host", required_argument, NULL, 'h'},
712 : : {"port", required_argument, NULL, 'p'},
713 : : {"username", required_argument, NULL, 'U'},
714 : : {"no-password", no_argument, NULL, 'w'},
715 : : {"password", no_argument, NULL, 'W'},
716 : : /* replication options */
717 : : {"startpos", required_argument, NULL, 'I'},
718 : : {"endpos", required_argument, NULL, 'E'},
719 : : {"option", required_argument, NULL, 'o'},
720 : : {"plugin", required_argument, NULL, 'P'},
721 : : {"status-interval", required_argument, NULL, 's'},
722 : : {"slot", required_argument, NULL, 'S'},
723 : : /* action */
724 : : {"create-slot", no_argument, NULL, 1},
725 : : {"start", no_argument, NULL, 2},
726 : : {"drop-slot", no_argument, NULL, 3},
727 : : {"if-not-exists", no_argument, NULL, 4},
728 : : {NULL, 0, NULL, 0}
729 : : };
730 : : int c;
731 : : int option_index;
732 : : uint32 hi,
733 : : lo;
734 : : char *db_name;
735 : :
2350 peter@eisentraut.org 736 : 19 : pg_logging_init(argv[0]);
4190 rhaas@postgresql.org 737 : 19 : progname = get_progname(argv[0]);
3540 alvherre@alvh.no-ip. 738 : 19 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
739 : :
4190 rhaas@postgresql.org 740 [ + + ]: 19 : if (argc > 1)
741 : : {
742 [ + + - + ]: 18 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
743 : : {
744 : 1 : usage();
745 : 1 : exit(0);
746 : : }
747 [ + - ]: 17 : else if (strcmp(argv[1], "-V") == 0 ||
748 [ + + ]: 17 : strcmp(argv[1], "--version") == 0)
749 : : {
750 : 1 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
751 : 1 : exit(0);
752 : : }
753 : : }
754 : :
999 peter@eisentraut.org 755 : 86 : while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
4190 rhaas@postgresql.org 756 [ + + ]: 86 : long_options, &option_index)) != -1)
757 : : {
758 [ + - + + : 70 : switch (c)
- + + - -
- - - - +
+ - - + +
+ + - + ]
759 : : {
760 : : /* general options */
761 : 7 : case 'f':
762 : 7 : outfile = pg_strdup(optarg);
763 : 7 : break;
4122 andres@anarazel.de 764 :UBC 0 : case 'F':
1505 michael@paquier.xyz 765 [ # # ]: 0 : if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
766 : : INT_MAX / 1000,
767 : : &fsync_interval))
4122 andres@anarazel.de 768 : 0 : exit(1);
1505 michael@paquier.xyz 769 : 0 : fsync_interval *= 1000;
4122 andres@anarazel.de 770 : 0 : break;
4190 rhaas@postgresql.org 771 :CBC 6 : case 'n':
772 : 6 : noloop = 1;
773 : 6 : break;
1529 akapila@postgresql.o 774 : 2 : case 't':
775 : 2 : two_phase = true;
776 : 2 : break;
999 peter@eisentraut.org 777 :UBC 0 : case 'v':
778 : 0 : verbose++;
779 : 0 : break;
155 msawada@postgresql.o 780 :CBC 1 : case 5:
781 : 1 : failover = true;
782 : 1 : break;
783 : : /* connection options */
4190 rhaas@postgresql.org 784 : 13 : case 'd':
785 : 13 : dbname = pg_strdup(optarg);
786 : 13 : break;
4190 rhaas@postgresql.org 787 :UBC 0 : case 'h':
788 : 0 : dbhost = pg_strdup(optarg);
789 : 0 : break;
790 : 0 : case 'p':
791 : 0 : dbport = pg_strdup(optarg);
792 : 0 : break;
793 : 0 : case 'U':
794 : 0 : dbuser = pg_strdup(optarg);
795 : 0 : break;
796 : 0 : case 'w':
797 : 0 : dbgetpassword = -1;
798 : 0 : break;
799 : 0 : case 'W':
800 : 0 : dbgetpassword = 1;
801 : 0 : break;
802 : : /* replication options */
4122 andres@anarazel.de 803 : 0 : case 'I':
61 alvherre@kurilemu.de 804 [ # # ]:UNC 0 : if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
1247 tgl@sss.pgh.pa.us 805 :UBC 0 : pg_fatal("could not parse start position \"%s\"", optarg);
4122 andres@anarazel.de 806 : 0 : startpos = ((uint64) hi) << 32 | lo;
807 : 0 : break;
3167 simon@2ndQuadrant.co 808 :CBC 6 : case 'E':
61 alvherre@kurilemu.de 809 [ - + ]:GNC 6 : if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
1247 tgl@sss.pgh.pa.us 810 :UBC 0 : pg_fatal("could not parse end position \"%s\"", optarg);
3167 simon@2ndQuadrant.co 811 :CBC 6 : endpos = ((uint64) hi) << 32 | lo;
812 : 6 : break;
4190 rhaas@postgresql.org 813 : 6 : case 'o':
814 : : {
4141 bruce@momjian.us 815 : 6 : char *data = pg_strdup(optarg);
816 : 6 : char *val = strchr(data, '=');
817 : :
4190 rhaas@postgresql.org 818 [ + - ]: 6 : if (val != NULL)
819 : : {
820 : : /* remove =; separate data from val */
821 : 6 : *val = '\0';
822 : 6 : val++;
823 : : }
824 : :
825 : 6 : noptions += 1;
4141 bruce@momjian.us 826 : 6 : options = pg_realloc(options, sizeof(char *) * noptions * 2);
827 : :
4190 rhaas@postgresql.org 828 : 6 : options[(noptions - 1) * 2] = data;
829 : 6 : options[(noptions - 1) * 2 + 1] = val;
830 : : }
831 : :
832 : 6 : break;
4190 rhaas@postgresql.org 833 :UBC 0 : case 'P':
834 : 0 : plugin = pg_strdup(optarg);
835 : 0 : break;
836 : 0 : case 's':
1505 michael@paquier.xyz 837 [ # # ]: 0 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
838 : : INT_MAX / 1000,
839 : : &standby_message_timeout))
4190 rhaas@postgresql.org 840 : 0 : exit(1);
1505 michael@paquier.xyz 841 : 0 : standby_message_timeout *= 1000;
4190 rhaas@postgresql.org 842 : 0 : break;
4190 rhaas@postgresql.org 843 :CBC 15 : case 'S':
844 : 15 : replication_slot = pg_strdup(optarg);
845 : 15 : break;
846 : : /* action */
847 : 3 : case 1:
848 : 3 : do_create_slot = true;
849 : 3 : break;
850 : 8 : case 2:
851 : 8 : do_start_slot = true;
852 : 8 : break;
853 : 2 : case 3:
854 : 2 : do_drop_slot = true;
855 : 2 : break;
3709 andres@anarazel.de 856 :UBC 0 : case 4:
857 : 0 : slot_exists_ok = true;
858 : 0 : break;
859 : :
4190 rhaas@postgresql.org 860 :CBC 1 : default:
861 : : /* getopt_long already emitted a complaint */
1247 tgl@sss.pgh.pa.us 862 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4190 rhaas@postgresql.org 863 : 1 : exit(1);
864 : : }
865 : : }
866 : :
867 : : /*
868 : : * Any non-option arguments?
869 : : */
870 [ - + ]: 16 : if (optind < argc)
871 : : {
2350 peter@eisentraut.org 872 :UBC 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
873 : : argv[optind]);
1247 tgl@sss.pgh.pa.us 874 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4190 rhaas@postgresql.org 875 : 0 : exit(1);
876 : : }
877 : :
878 : : /*
879 : : * Required arguments
880 : : */
4190 rhaas@postgresql.org 881 [ + + ]:CBC 16 : if (replication_slot == NULL)
882 : : {
2350 peter@eisentraut.org 883 : 1 : pg_log_error("no slot specified");
1247 tgl@sss.pgh.pa.us 884 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4190 rhaas@postgresql.org 885 : 1 : exit(1);
886 : : }
887 : :
888 [ + + + + ]: 15 : if (do_start_slot && outfile == NULL)
889 : : {
2350 peter@eisentraut.org 890 : 1 : pg_log_error("no target file specified");
1247 tgl@sss.pgh.pa.us 891 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4190 rhaas@postgresql.org 892 : 1 : exit(1);
893 : : }
894 : :
895 [ + + + + ]: 14 : if (!do_drop_slot && dbname == NULL)
896 : : {
2350 peter@eisentraut.org 897 : 1 : pg_log_error("no database specified");
1247 tgl@sss.pgh.pa.us 898 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4190 rhaas@postgresql.org 899 : 1 : exit(1);
900 : : }
901 : :
902 [ + + + + : 13 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
+ + ]
903 : : {
2350 peter@eisentraut.org 904 : 1 : pg_log_error("at least one action needs to be specified");
1247 tgl@sss.pgh.pa.us 905 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4190 rhaas@postgresql.org 906 : 1 : exit(1);
907 : : }
908 : :
909 [ + + + - : 12 : if (do_drop_slot && (do_create_slot || do_start_slot))
- + ]
910 : : {
2350 peter@eisentraut.org 911 :UBC 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
1247 tgl@sss.pgh.pa.us 912 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4190 rhaas@postgresql.org 913 : 0 : exit(1);
914 : : }
915 : :
4043 andres@anarazel.de 916 [ - + - - :CBC 12 : if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
- - ]
917 : : {
2350 peter@eisentraut.org 918 :UBC 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
1247 tgl@sss.pgh.pa.us 919 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4190 rhaas@postgresql.org 920 : 0 : exit(1);
921 : : }
922 : :
3167 simon@2ndQuadrant.co 923 [ + + - + ]:CBC 12 : if (endpos != InvalidXLogRecPtr && !do_start_slot)
924 : : {
2350 peter@eisentraut.org 925 :UBC 0 : pg_log_error("--endpos may only be specified with --start");
1247 tgl@sss.pgh.pa.us 926 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3167 simon@2ndQuadrant.co 927 : 0 : exit(1);
928 : : }
929 : :
155 msawada@postgresql.o 930 [ + + ]:CBC 12 : if (!do_create_slot)
931 : : {
932 [ + + ]: 9 : if (two_phase)
933 : : {
69 peter@eisentraut.org 934 : 1 : pg_log_error("%s may only be specified with --create-slot", "--enable-two-phase");
155 msawada@postgresql.o 935 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
936 : 1 : exit(1);
937 : : }
938 : :
939 [ - + ]: 8 : if (failover)
940 : : {
69 peter@eisentraut.org 941 :UBC 0 : pg_log_error("%s may only be specified with --create-slot", "--enable-failover");
155 msawada@postgresql.o 942 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
943 : 0 : exit(1);
944 : : }
945 : : }
946 : :
947 : : /*
948 : : * Obtain a connection to server. Notably, if we need a password, we want
949 : : * to collect it from the user immediately.
950 : : */
3993 andres@anarazel.de 951 :CBC 11 : conn = GetConnection();
952 [ - + ]: 11 : if (!conn)
953 : : /* Error message already written in GetConnection() */
3993 andres@anarazel.de 954 :UBC 0 : exit(1);
2443 peter@eisentraut.org 955 :CBC 11 : atexit(disconnect_atexit);
956 : :
957 : : /*
958 : : * Trap signals. (Don't do this until after the initial password prompt,
959 : : * if one is needed, in GetConnection.)
960 : : */
961 : : #ifndef WIN32
1088 dgustafsson@postgres 962 : 11 : pqsignal(SIGINT, sigexit_handler);
963 : 11 : pqsignal(SIGTERM, sigexit_handler);
1385 tgl@sss.pgh.pa.us 964 : 11 : pqsignal(SIGHUP, sighup_handler);
965 : : #endif
966 : :
967 : : /*
968 : : * Run IDENTIFY_SYSTEM to check the connection type for each action.
969 : : * --create-slot and --start actions require a database-specific
970 : : * replication connection because they handle logical replication slots.
971 : : * --drop-slot can remove replication slots from any replication
972 : : * connection without this restriction.
973 : : */
3993 andres@anarazel.de 974 [ - + ]: 11 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
2443 peter@eisentraut.org 975 :UBC 0 : exit(1);
976 : :
165 fujii@postgresql.org 977 [ + + - + ]:CBC 11 : if (!do_drop_slot && db_name == NULL)
1247 tgl@sss.pgh.pa.us 978 :UBC 0 : pg_fatal("could not establish database-specific replication connection");
979 : :
980 : : /*
981 : : * Set umask so that directories/files are created with the same
982 : : * permissions as directories/files in the source data directory.
983 : : *
984 : : * pg_mode_mask is set to owner-only by default and then updated in
985 : : * GetConnection() where we get the mode from the server-side with
986 : : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
987 : : */
2709 sfrost@snowman.net 988 :CBC 11 : umask(pg_mode_mask);
989 : :
990 : : /* Drop a replication slot. */
4190 rhaas@postgresql.org 991 [ + + ]: 11 : if (do_drop_slot)
992 : : {
993 [ - + ]: 2 : if (verbose)
2350 peter@eisentraut.org 994 :UBC 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
995 : :
3993 andres@anarazel.de 996 [ - + ]:CBC 2 : if (!DropReplicationSlot(conn, replication_slot))
2443 peter@eisentraut.org 997 :UBC 0 : exit(1);
998 : : }
999 : :
1000 : : /* Create a replication slot. */
4190 rhaas@postgresql.org 1001 [ + + ]:CBC 11 : if (do_create_slot)
1002 : : {
1003 [ - + ]: 3 : if (verbose)
2350 peter@eisentraut.org 1004 :UBC 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
1005 : :
2902 peter_e@gmx.net 1006 [ - + ]:CBC 3 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
1007 : : false, false, slot_exists_ok, two_phase,
1008 : : failover))
2443 peter@eisentraut.org 1009 :UBC 0 : exit(1);
3709 andres@anarazel.de 1010 :CBC 3 : startpos = InvalidXLogRecPtr;
1011 : : }
1012 : :
4190 rhaas@postgresql.org 1013 [ + + ]: 11 : if (!do_start_slot)
2443 peter@eisentraut.org 1014 : 5 : exit(0);
1015 : :
1016 : : /* Stream loop */
1017 : : while (true)
1018 : : {
3993 andres@anarazel.de 1019 : 6 : StreamLogicalLog();
4190 rhaas@postgresql.org 1020 [ + - ]: 6 : if (time_to_abort)
1021 : : {
1022 : : /*
1023 : : * We've been Ctrl-C'ed or reached an exit limit condition. That's
1024 : : * not an error, so exit without an errorcode.
1025 : : */
2443 peter@eisentraut.org 1026 : 6 : exit(0);
1027 : : }
4190 rhaas@postgresql.org 1028 [ # # ]:UBC 0 : else if (noloop)
1247 tgl@sss.pgh.pa.us 1029 : 0 : pg_fatal("disconnected");
1030 : : else
1031 : : {
1032 : : /* translator: check source for value for %d */
2350 peter@eisentraut.org 1033 : 0 : pg_log_info("disconnected; waiting %d seconds to try again",
1034 : : RECONNECT_SLEEP_TIME);
4190 rhaas@postgresql.org 1035 : 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1036 : : }
1037 : : }
1038 : : }
1039 : :
1040 : : /*
1041 : : * Fsync our output data, and send a feedback message to the server. Returns
1042 : : * true if successful, false otherwise.
1043 : : *
1044 : : * If successful, *now is updated to the current timestamp just before sending
1045 : : * feedback.
1046 : : */
1047 : : static bool
3167 simon@2ndQuadrant.co 1048 :CBC 5 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1049 : : {
1050 : : /* flush data to disk, so that we send a recent flush pointer */
1051 [ - + ]: 5 : if (!OutputFsync(*now))
3167 simon@2ndQuadrant.co 1052 :UBC 0 : return false;
3167 simon@2ndQuadrant.co 1053 :CBC 5 : *now = feGetCurrentTimestamp();
1054 [ - + ]: 5 : if (!sendFeedback(conn, *now, true, false))
3167 simon@2ndQuadrant.co 1055 :UBC 0 : return false;
1056 : :
3167 simon@2ndQuadrant.co 1057 :CBC 5 : return true;
1058 : : }
1059 : :
1060 : : /*
1061 : : * Try to inform the server about our upcoming demise, but don't wait around or
1062 : : * retry on failure.
1063 : : */
1064 : : static void
779 michael@paquier.xyz 1065 : 6 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
1066 : : XLogRecPtr lsn)
1067 : : {
3167 simon@2ndQuadrant.co 1068 : 6 : (void) PQputCopyEnd(conn, NULL);
1069 : 6 : (void) PQflush(conn);
1070 : :
1071 [ - + ]: 6 : if (verbose)
1072 : : {
779 michael@paquier.xyz 1073 [ # # # # :UBC 0 : switch (reason)
# ]
1074 : : {
1075 : 0 : case STREAM_STOP_SIGNAL:
1076 : 0 : pg_log_info("received interrupt signal, exiting");
1077 : 0 : break;
1078 : 0 : case STREAM_STOP_KEEPALIVE:
61 alvherre@kurilemu.de 1079 :UNC 0 : pg_log_info("end position %X/%08X reached by keepalive",
1080 : : LSN_FORMAT_ARGS(endpos));
779 michael@paquier.xyz 1081 :UBC 0 : break;
1082 : 0 : case STREAM_STOP_END_OF_WAL:
1083 [ # # ]: 0 : Assert(!XLogRecPtrIsInvalid(lsn));
61 alvherre@kurilemu.de 1084 :UNC 0 : pg_log_info("end position %X/%08X reached by WAL record at %X/%08X",
1085 : : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
779 michael@paquier.xyz 1086 :UBC 0 : break;
1087 : 0 : case STREAM_STOP_NONE:
1088 : 0 : Assert(false);
1089 : : break;
1090 : : }
1091 : : }
3167 simon@2ndQuadrant.co 1092 :CBC 6 : }
|