Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : : * fashion and write it to a local file.
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/bin/pg_basebackup/pg_recvlogical.c
10 : : *-------------------------------------------------------------------------
11 : : */
12 : :
13 : : #include "postgres_fe.h"
14 : :
15 : : #include <dirent.h>
16 : : #include <limits.h>
17 : : #include <sys/select.h>
18 : : #include <sys/stat.h>
19 : : #include <unistd.h>
20 : :
21 : : #include "common/file_perm.h"
22 : : #include "common/logging.h"
23 : : #include "fe_utils/option_utils.h"
24 : : #include "getopt_long.h"
25 : : #include "libpq-fe.h"
26 : : #include "libpq/pqsignal.h"
27 : : #include "libpq/protocol.h"
28 : : #include "pqexpbuffer.h"
29 : : #include "streamutil.h"
30 : :
31 : : /* Time to sleep between reconnection attempts */
32 : : #define RECONNECT_SLEEP_TIME 5
33 : :
34 : : typedef enum
35 : : {
36 : : STREAM_STOP_NONE,
37 : : STREAM_STOP_END_OF_WAL,
38 : : STREAM_STOP_KEEPALIVE,
39 : : STREAM_STOP_SIGNAL
40 : : } StreamStopReason;
41 : :
42 : : /* Global Options */
43 : : static char *outfile = NULL;
44 : : static int verbose = 0;
45 : : static bool two_phase = false; /* enable-two-phase option */
46 : : static bool failover = false; /* enable-failover option */
47 : : static int noloop = 0;
48 : : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
49 : : static int fsync_interval = 10 * 1000; /* 10 sec = default */
50 : : static XLogRecPtr startpos = InvalidXLogRecPtr;
51 : : static XLogRecPtr endpos = InvalidXLogRecPtr;
52 : : static bool do_create_slot = false;
53 : : static bool slot_exists_ok = false;
54 : : static bool do_start_slot = false;
55 : : static bool do_drop_slot = false;
56 : : static char *replication_slot = NULL;
57 : :
58 : : /* filled pairwise with option, value. value may be NULL */
59 : : static char **options;
60 : : static size_t noptions = 0;
61 : : static const char *plugin = "test_decoding";
62 : :
63 : : /* Global State */
64 : : static int outfd = -1;
65 : : static volatile sig_atomic_t time_to_abort = false;
66 : : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
67 : : static volatile sig_atomic_t output_reopen = false;
68 : : static bool output_isfile;
69 : : static TimestampTz output_last_fsync = -1;
70 : : static bool output_needs_fsync = false;
71 : : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
72 : : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
73 : :
74 : : static void usage(void);
75 : : static void StreamLogicalLog(void);
76 : : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
77 : : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
78 : : StreamStopReason reason,
79 : : XLogRecPtr lsn);
80 : :
81 : : static void
4380 rhaas@postgresql.org 82 :CBC 1 : usage(void)
83 : : {
4013 bruce@momjian.us 84 : 1 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
85 : : progname);
4380 rhaas@postgresql.org 86 : 1 : printf(_("Usage:\n"));
87 : 1 : printf(_(" %s [OPTION]...\n"), progname);
4172 peter_e@gmx.net 88 : 1 : printf(_("\nAction to be performed:\n"));
89 : 1 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
90 : 1 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
91 : 1 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
4380 rhaas@postgresql.org 92 : 1 : printf(_("\nOptions:\n"));
259 peter@eisentraut.org 93 : 1 : printf(_(" --enable-failover enable replication slot synchronization to standby servers when\n"
94 : : " creating a replication slot\n"));
95 : 1 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
4172 peter_e@gmx.net 96 : 1 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
4312 andres@anarazel.de 97 : 1 : printf(_(" -F --fsync-interval=SECS\n"
98 : : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
3833 peter_e@gmx.net 99 : 1 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
4172 100 : 1 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
4380 rhaas@postgresql.org 101 : 1 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
4172 peter_e@gmx.net 102 : 1 : printf(_(" -o, --option=NAME[=VALUE]\n"
103 : : " pass option NAME with optional value VALUE to the\n"
104 : : " output plugin\n"));
105 : 1 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
106 : 1 : printf(_(" -s, --status-interval=SECS\n"
107 : : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
108 : 1 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
259 peter@eisentraut.org 109 : 1 : printf(_(" -t, --enable-two-phase enable decoding of prepared transactions when creating a slot\n"));
110 : 1 : printf(_(" --two-phase (same as --enable-two-phase, deprecated)\n"));
4380 rhaas@postgresql.org 111 : 1 : printf(_(" -v, --verbose output verbose messages\n"));
112 : 1 : printf(_(" -V, --version output version information, then exit\n"));
113 : 1 : printf(_(" -?, --help show this help, then exit\n"));
114 : 1 : printf(_("\nConnection options:\n"));
115 : 1 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
116 : 1 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
117 : 1 : printf(_(" -p, --port=PORT database server port number\n"));
118 : 1 : printf(_(" -U, --username=NAME connect as specified database user\n"));
119 : 1 : printf(_(" -w, --no-password never prompt for password\n"));
120 : 1 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
2207 peter@eisentraut.org 121 : 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
122 : 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
4380 rhaas@postgresql.org 123 : 1 : }
124 : :
125 : : /*
126 : : * Send a Standby Status Update message to server.
127 : : */
128 : : static bool
3307 tgl@sss.pgh.pa.us 129 : 27 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
130 : : {
131 : : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
132 : : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
133 : :
134 : : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
4380 rhaas@postgresql.org 135 : 27 : int len = 0;
136 : :
137 : : /*
138 : : * we normally don't want to send superfluous feedback, but if it's
139 : : * because of a timeout we need to, otherwise wal_sender_timeout will kill
140 : : * us.
141 : : */
142 [ - + ]: 27 : if (!force &&
4380 rhaas@postgresql.org 143 [ # # ]:UBC 0 : last_written_lsn == output_written_lsn &&
2132 noah@leadboat.com 144 [ # # ]: 0 : last_fsync_lsn == output_fsync_lsn)
4380 rhaas@postgresql.org 145 : 0 : return true;
146 : :
4380 rhaas@postgresql.org 147 [ + + ]:CBC 27 : if (verbose)
251 alvherre@kurilemu.de 148 :GNC 2 : pg_log_info("confirming write up to %X/%08X, flush to %X/%08X (slot %s)",
149 : : LSN_FORMAT_ARGS(output_written_lsn),
150 : : LSN_FORMAT_ARGS(output_fsync_lsn),
151 : : replication_slot);
152 : :
221 nathan@postgresql.or 153 : 27 : replybuf[len] = PqReplMsg_StandbyStatusUpdate;
4380 rhaas@postgresql.org 154 :CBC 27 : len += 1;
4331 bruce@momjian.us 155 : 27 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
4380 rhaas@postgresql.org 156 : 27 : len += 8;
3189 tgl@sss.pgh.pa.us 157 : 27 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
4380 rhaas@postgresql.org 158 : 27 : len += 8;
4331 bruce@momjian.us 159 : 27 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
4380 rhaas@postgresql.org 160 : 27 : len += 8;
4331 bruce@momjian.us 161 : 27 : fe_sendint64(now, &replybuf[len]); /* sendTime */
4380 rhaas@postgresql.org 162 : 27 : len += 8;
3189 tgl@sss.pgh.pa.us 163 : 27 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
4380 rhaas@postgresql.org 164 : 27 : len += 1;
165 : :
166 : 27 : startpos = output_written_lsn;
167 : 27 : last_written_lsn = output_written_lsn;
168 : 27 : last_fsync_lsn = output_fsync_lsn;
169 : :
170 [ + - - + ]: 27 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
171 : : {
2540 peter@eisentraut.org 172 :UBC 0 : pg_log_error("could not send feedback packet: %s",
173 : : PQerrorMessage(conn));
4380 rhaas@postgresql.org 174 : 0 : return false;
175 : : }
176 : :
4380 rhaas@postgresql.org 177 :CBC 27 : return true;
178 : : }
179 : :
180 : : static void
2633 peter@eisentraut.org 181 : 56 : disconnect_atexit(void)
182 : : {
4380 rhaas@postgresql.org 183 [ + + ]: 56 : if (conn != NULL)
184 : 32 : PQfinish(conn);
185 : 56 : }
186 : :
187 : : static void
3307 tgl@sss.pgh.pa.us 188 : 37 : OutputFsync(TimestampTz now)
189 : : {
4380 rhaas@postgresql.org 190 : 37 : output_last_fsync = now;
191 : :
192 : 37 : output_fsync_lsn = output_written_lsn;
193 : :
194 : : /*
195 : : * Save the last flushed position as the replication start point. On
196 : : * reconnect, replication resumes from there to avoid re-sending flushed
197 : : * data.
198 : : */
58 fujii@postgresql.org 199 :GNC 37 : startpos = output_fsync_lsn;
200 : :
4380 rhaas@postgresql.org 201 [ - + ]:CBC 37 : if (fsync_interval <= 0)
58 fujii@postgresql.org 202 :UNC 0 : return;
203 : :
4322 heikki.linnakangas@i 204 [ + + ]:CBC 37 : if (!output_needs_fsync)
58 fujii@postgresql.org 205 :GNC 25 : return;
206 : :
4322 heikki.linnakangas@i 207 :CBC 12 : output_needs_fsync = false;
208 : :
209 : : /* can only fsync if it's a regular file */
3904 andres@anarazel.de 210 [ + + ]: 12 : if (!output_isfile)
58 fujii@postgresql.org 211 :GNC 8 : return;
212 : :
3904 andres@anarazel.de 213 [ - + ]:CBC 4 : if (fsync(outfd) != 0)
1437 tgl@sss.pgh.pa.us 214 :UBC 0 : pg_fatal("could not fsync file \"%s\": %m", outfile);
215 : : }
216 : :
217 : : /*
218 : : * Start the log streaming
219 : : */
220 : : static void
4185 andres@anarazel.de 221 :CBC 25 : StreamLogicalLog(void)
222 : : {
223 : : PGresult *res;
4380 rhaas@postgresql.org 224 : 25 : char *copybuf = NULL;
3307 tgl@sss.pgh.pa.us 225 : 25 : TimestampTz last_status = -1;
226 : : int i;
227 : : PQExpBuffer query;
228 : : XLogRecPtr cur_record_lsn;
229 : :
969 michael@paquier.xyz 230 : 25 : cur_record_lsn = InvalidXLogRecPtr;
231 : :
232 : : /*
233 : : * Connect in replication mode to the server
234 : : */
4380 rhaas@postgresql.org 235 [ + + ]: 25 : if (!conn)
4380 rhaas@postgresql.org 236 :GBC 1 : conn = GetConnection();
4380 rhaas@postgresql.org 237 [ - + ]:CBC 25 : if (!conn)
238 : : /* Error message already written in GetConnection() */
4380 rhaas@postgresql.org 239 :UBC 0 : return;
240 : :
241 : : /*
242 : : * Start the replication
243 : : */
4380 rhaas@postgresql.org 244 [ + + ]:CBC 25 : if (verbose)
251 alvherre@kurilemu.de 245 :GNC 2 : pg_log_info("starting log streaming at %X/%08X (slot %s)",
246 : : LSN_FORMAT_ARGS(startpos),
247 : : replication_slot);
248 : :
249 : : /* Initiate the replication stream at specified location */
1575 tgl@sss.pgh.pa.us 250 :CBC 25 : query = createPQExpBuffer();
251 alvherre@kurilemu.de 251 :GNC 25 : appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%08X",
1846 peter@eisentraut.org 252 :CBC 25 : replication_slot, LSN_FORMAT_ARGS(startpos));
253 : :
254 : : /* print options if there are any */
4380 rhaas@postgresql.org 255 [ + + ]: 25 : if (noptions)
256 : 20 : appendPQExpBufferStr(query, " (");
257 : :
258 [ + + ]: 65 : for (i = 0; i < noptions; i++)
259 : : {
260 : : /* separator */
261 [ + + ]: 40 : if (i > 0)
262 : 20 : appendPQExpBufferStr(query, ", ");
263 : :
264 : : /* write option name */
265 : 40 : appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
266 : :
267 : : /* write option value if specified */
268 [ + - ]: 40 : if (options[(i * 2) + 1] != NULL)
269 : 40 : appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
270 : : }
271 : :
272 [ + + ]: 25 : if (noptions)
273 : 20 : appendPQExpBufferChar(query, ')');
274 : :
275 : 25 : res = PQexec(conn, query->data);
276 [ + + ]: 25 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
277 : : {
2540 peter@eisentraut.org 278 : 6 : pg_log_error("could not send replication command \"%s\": %s",
279 : : query->data, PQresultErrorMessage(res));
4380 rhaas@postgresql.org 280 : 6 : PQclear(res);
281 : 6 : goto error;
282 : : }
283 : 19 : PQclear(res);
284 : 19 : resetPQExpBuffer(query);
285 : :
286 [ + + ]: 19 : if (verbose)
2540 peter@eisentraut.org 287 :GBC 2 : pg_log_info("streaming initiated");
288 : :
4380 rhaas@postgresql.org 289 [ + + ]:CBC 333 : while (!time_to_abort)
290 : : {
291 : : int r;
292 : : int bytes_left;
293 : : int bytes_written;
294 : : TimestampTz now;
295 : : int hdr_len;
296 : :
969 michael@paquier.xyz 297 : 331 : cur_record_lsn = InvalidXLogRecPtr;
298 : :
4380 rhaas@postgresql.org 299 [ + + ]: 331 : if (copybuf != NULL)
300 : : {
301 : 203 : PQfreemem(copybuf);
302 : 203 : copybuf = NULL;
303 : : }
304 : :
305 : : /*
306 : : * Potentially send a status message to the primary.
307 : : */
308 : 331 : now = feGetCurrentTimestamp();
309 : :
310 [ + + + + ]: 644 : if (outfd != -1 &&
311 : 313 : feTimestampDifferenceExceeds(output_last_fsync, now,
312 : : fsync_interval))
58 fujii@postgresql.org 313 :GNC 19 : OutputFsync(now);
314 : :
4380 rhaas@postgresql.org 315 [ + - + + ]:CBC 662 : if (standby_message_timeout > 0 &&
316 : 331 : feTimestampDifferenceExceeds(last_status, now,
317 : : standby_message_timeout))
318 : : {
319 : : /* Time to send feedback! */
320 [ - + ]: 19 : if (!sendFeedback(conn, now, true, false))
4380 rhaas@postgresql.org 321 :GBC 2 : goto error;
322 : :
4380 rhaas@postgresql.org 323 :CBC 19 : last_status = now;
324 : : }
325 : :
326 : : /* got SIGHUP, close output file */
4322 heikki.linnakangas@i 327 [ + + - + : 331 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
- - ]
328 : : {
4322 heikki.linnakangas@i 329 :UBC 0 : now = feGetCurrentTimestamp();
58 fujii@postgresql.org 330 :UNC 0 : OutputFsync(now);
4322 heikki.linnakangas@i 331 :UBC 0 : close(outfd);
332 : 0 : outfd = -1;
333 : : }
4322 heikki.linnakangas@i 334 :CBC 331 : output_reopen = false;
335 : :
336 : : /* open the output file, if not open yet */
4321 337 [ + + ]: 331 : if (outfd == -1)
338 : : {
339 : : struct stat statbuf;
340 : :
341 [ + + ]: 18 : if (strcmp(outfile, "-") == 0)
342 : 17 : outfd = fileno(stdout);
343 : : else
4321 heikki.linnakangas@i 344 :GBC 1 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
345 : : S_IRUSR | S_IWUSR);
4321 heikki.linnakangas@i 346 [ - + ]:CBC 18 : if (outfd == -1)
347 : : {
2540 peter@eisentraut.org 348 :UBC 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
4321 heikki.linnakangas@i 349 : 0 : goto error;
350 : : }
351 : :
3904 andres@anarazel.de 352 [ - + ]:CBC 18 : if (fstat(outfd, &statbuf) != 0)
353 : : {
2540 peter@eisentraut.org 354 :UBC 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
1690 michael@paquier.xyz 355 : 0 : goto error;
356 : : }
357 : :
3904 andres@anarazel.de 358 [ + + + - ]:CBC 18 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
359 : : }
360 : :
4380 rhaas@postgresql.org 361 : 331 : r = PQgetCopyData(conn, ©buf, 1);
362 [ + + ]: 331 : if (r == 0)
363 : 109 : {
364 : : /*
365 : : * In async mode, and no data available. We block on reading but
366 : : * not more than the specified timeout, so that we can send a
367 : : * response back to the client.
368 : : */
369 : : fd_set input_mask;
3307 tgl@sss.pgh.pa.us 370 : 113 : TimestampTz message_target = 0;
371 : 113 : TimestampTz fsync_target = 0;
372 : : struct timeval timeout;
4380 rhaas@postgresql.org 373 : 113 : struct timeval *timeoutptr = NULL;
374 : :
3659 peter_e@gmx.net 375 [ - + ]: 113 : if (PQsocket(conn) < 0)
376 : : {
2540 peter@eisentraut.org 377 :UBC 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
3659 peter_e@gmx.net 378 :CBC 2 : goto error;
379 : : }
380 : :
4380 rhaas@postgresql.org 381 [ + + ]: 1921 : FD_ZERO(&input_mask);
382 : 113 : FD_SET(PQsocket(conn), &input_mask);
383 : :
384 : : /* Compute when we need to wakeup to send a keepalive message. */
385 [ + - ]: 113 : if (standby_message_timeout)
386 : 113 : message_target = last_status + (standby_message_timeout - 1) *
387 : : ((int64) 1000);
388 : :
389 : : /* Compute when we need to wakeup to fsync the output file. */
4322 heikki.linnakangas@i 390 [ + - + + ]: 113 : if (fsync_interval > 0 && output_needs_fsync)
4380 rhaas@postgresql.org 391 : 47 : fsync_target = output_last_fsync + (fsync_interval - 1) *
392 : : ((int64) 1000);
393 : :
394 : : /* Now compute when to wakeup. */
395 [ - + - - ]: 113 : if (message_target > 0 || fsync_target > 0)
396 : : {
397 : : TimestampTz targettime;
398 : : long secs;
399 : : int usecs;
400 : :
401 : 113 : targettime = message_target;
402 : :
403 [ + + + + ]: 113 : if (fsync_target > 0 && fsync_target < targettime)
4380 rhaas@postgresql.org 404 :GBC 2 : targettime = fsync_target;
405 : :
4380 rhaas@postgresql.org 406 :CBC 113 : feTimestampDifference(now,
407 : : targettime,
408 : : &secs,
409 : : &usecs);
410 [ + + ]: 113 : if (secs <= 0)
4380 rhaas@postgresql.org 411 :GBC 2 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
412 : : else
4380 rhaas@postgresql.org 413 :CBC 111 : timeout.tv_sec = secs;
414 : 113 : timeout.tv_usec = usecs;
415 : 113 : timeoutptr = &timeout;
416 : : }
417 : :
418 : 113 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
419 [ + - + + : 113 : if (r == 0 || (r < 0 && errno == EINTR))
+ - ]
420 : : {
421 : : /*
422 : : * Got a timeout or signal. Continue the loop and either
423 : : * deliver a status packet to the server or just go back into
424 : : * blocking.
425 : : */
426 : 111 : continue;
427 : : }
428 [ - + ]: 111 : else if (r < 0)
429 : : {
1787 peter@eisentraut.org 430 :UBC 0 : pg_log_error("%s() failed: %m", "select");
4380 rhaas@postgresql.org 431 : 0 : goto error;
432 : : }
433 : :
434 : : /* Else there is actually data on the socket */
4380 rhaas@postgresql.org 435 [ + + ]:CBC 111 : if (PQconsumeInput(conn) == 0)
436 : : {
2540 peter@eisentraut.org 437 : 2 : pg_log_error("could not receive data from WAL stream: %s",
438 : : PQerrorMessage(conn));
4380 rhaas@postgresql.org 439 : 2 : goto error;
440 : : }
441 : 109 : continue;
442 : : }
443 : :
444 : : /* End of copy stream */
445 [ + + ]: 218 : if (r == -1)
446 : 15 : break;
447 : :
448 : : /* Failure while reading the copy stream */
449 [ - + ]: 210 : if (r == -2)
450 : : {
2540 peter@eisentraut.org 451 :UBC 0 : pg_log_error("could not read COPY data: %s",
452 : : PQerrorMessage(conn));
4380 rhaas@postgresql.org 453 : 0 : goto error;
454 : : }
455 : :
456 : : /* Check the message type. */
221 nathan@postgresql.or 457 [ + + ]:GNC 210 : if (copybuf[0] == PqReplMsg_Keepalive)
4380 rhaas@postgresql.org 458 :CBC 111 : {
459 : : int pos;
460 : : bool replyRequested;
461 : : XLogRecPtr walEnd;
3357 simon@2ndQuadrant.co 462 : 113 : bool endposReached = false;
463 : :
464 : : /*
465 : : * Parse the keepalive message, enclosed in the CopyData message.
466 : : * We just check if the server requested a reply, and ignore the
467 : : * rest.
468 : : */
221 nathan@postgresql.or 469 :GNC 113 : pos = 1; /* skip msgtype PqReplMsg_Keepalive */
4380 rhaas@postgresql.org 470 :CBC 113 : walEnd = fe_recvint64(©buf[pos]);
471 : 113 : output_written_lsn = Max(walEnd, output_written_lsn);
472 : :
473 : 113 : pos += 8; /* read walEnd */
474 : :
475 : 113 : pos += 8; /* skip sendTime */
476 : :
477 [ - + ]: 113 : if (r < pos + 1)
478 : : {
2540 peter@eisentraut.org 479 :UBC 0 : pg_log_error("streaming header too small: %d", r);
4380 rhaas@postgresql.org 480 : 0 : goto error;
481 : : }
4380 rhaas@postgresql.org 482 :CBC 113 : replyRequested = copybuf[pos];
483 : :
129 alvherre@kurilemu.de 484 [ + + + + ]:GNC 113 : if (XLogRecPtrIsValid(endpos) && walEnd >= endpos)
485 : : {
486 : : /*
487 : : * If there's nothing to read on the socket until a keepalive
488 : : * we know that the server has nothing to send us; and if
489 : : * walEnd has passed endpos, we know nothing else can have
490 : : * committed before endpos. So we can bail out now.
491 : : */
3357 simon@2ndQuadrant.co 492 :CBC 2 : endposReached = true;
493 : : }
494 : :
495 : : /* Send a reply, if necessary */
496 [ + + + + ]: 113 : if (replyRequested || endposReached)
497 : : {
498 [ - + ]: 3 : if (!flushAndSendFeedback(conn, &now))
4380 rhaas@postgresql.org 499 :UBC 0 : goto error;
4380 rhaas@postgresql.org 500 :CBC 3 : last_status = now;
501 : : }
502 : :
3357 simon@2ndQuadrant.co 503 [ + + ]: 113 : if (endposReached)
504 : : {
969 michael@paquier.xyz 505 : 2 : stop_reason = STREAM_STOP_KEEPALIVE;
3357 simon@2ndQuadrant.co 506 : 2 : time_to_abort = true;
507 : 2 : break;
508 : : }
509 : :
4380 rhaas@postgresql.org 510 : 111 : continue;
511 : : }
221 nathan@postgresql.or 512 [ - + ]:GNC 97 : else if (copybuf[0] != PqReplMsg_WALData)
513 : : {
2540 peter@eisentraut.org 514 :UBC 0 : pg_log_error("unrecognized streaming header: \"%c\"",
515 : : copybuf[0]);
4380 rhaas@postgresql.org 516 : 0 : goto error;
517 : : }
518 : :
519 : : /*
520 : : * Read the header of the WALData message, enclosed in the CopyData
521 : : * message. We only need the WAL location field (dataStart), the rest
522 : : * of the header is ignored.
523 : : */
221 nathan@postgresql.or 524 :GNC 97 : hdr_len = 1; /* msgtype PqReplMsg_WALData */
4380 rhaas@postgresql.org 525 :CBC 97 : hdr_len += 8; /* dataStart */
526 : 97 : hdr_len += 8; /* walEnd */
527 : 97 : hdr_len += 8; /* sendTime */
528 [ - + ]: 97 : if (r < hdr_len + 1)
529 : : {
2540 peter@eisentraut.org 530 :UBC 0 : pg_log_error("streaming header too small: %d", r);
4380 rhaas@postgresql.org 531 : 0 : goto error;
532 : : }
533 : :
534 : : /* Extract WAL location for this block */
3357 simon@2ndQuadrant.co 535 :CBC 97 : cur_record_lsn = fe_recvint64(©buf[1]);
536 : :
129 alvherre@kurilemu.de 537 [ + + - + ]:GNC 97 : if (XLogRecPtrIsValid(endpos) && cur_record_lsn > endpos)
538 : : {
539 : : /*
540 : : * We've read past our endpoint, so prepare to go away being
541 : : * cautious about what happens to our output data.
542 : : */
3357 simon@2ndQuadrant.co 543 [ # # ]:UBC 0 : if (!flushAndSendFeedback(conn, &now))
544 : 0 : goto error;
969 michael@paquier.xyz 545 : 0 : stop_reason = STREAM_STOP_END_OF_WAL;
3357 simon@2ndQuadrant.co 546 : 0 : time_to_abort = true;
547 : 0 : break;
548 : : }
549 : :
3357 simon@2ndQuadrant.co 550 :CBC 97 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
551 : :
4380 rhaas@postgresql.org 552 : 97 : bytes_left = r - hdr_len;
553 : 97 : bytes_written = 0;
554 : :
555 : : /* signal that a fsync is needed */
4322 heikki.linnakangas@i 556 : 97 : output_needs_fsync = true;
557 : :
4380 rhaas@postgresql.org 558 [ + + ]: 194 : while (bytes_left)
559 : : {
560 : : int ret;
561 : :
562 : 194 : ret = write(outfd,
563 : 97 : copybuf + hdr_len + bytes_written,
564 : : bytes_left);
565 : :
566 [ - + ]: 97 : if (ret < 0)
567 : : {
1579 peter@eisentraut.org 568 :UBC 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
569 : : bytes_left, outfile);
4380 rhaas@postgresql.org 570 : 0 : goto error;
571 : : }
572 : :
573 : : /* Write was successful, advance our position */
4380 rhaas@postgresql.org 574 :CBC 97 : bytes_written += ret;
575 : 97 : bytes_left -= ret;
576 : : }
577 : :
578 [ - + ]: 97 : if (write(outfd, "\n", 1) != 1)
579 : : {
1579 peter@eisentraut.org 580 :UBC 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
581 : : 1, outfile);
4380 rhaas@postgresql.org 582 : 0 : goto error;
583 : : }
584 : :
129 alvherre@kurilemu.de 585 [ + + + + ]:GNC 97 : if (XLogRecPtrIsValid(endpos) && cur_record_lsn == endpos)
586 : : {
587 : : /* endpos was exactly the record we just processed, we're done */
3357 simon@2ndQuadrant.co 588 [ - + ]:CBC 5 : if (!flushAndSendFeedback(conn, &now))
3357 simon@2ndQuadrant.co 589 :UBC 0 : goto error;
969 michael@paquier.xyz 590 :CBC 5 : stop_reason = STREAM_STOP_END_OF_WAL;
3357 simon@2ndQuadrant.co 591 : 5 : time_to_abort = true;
592 : 5 : break;
593 : : }
594 : : }
595 : :
596 : : /* Clean up connection state if stream has been aborted */
969 michael@paquier.xyz 597 [ + + ]: 17 : if (time_to_abort)
598 : 9 : prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
599 : :
4380 rhaas@postgresql.org 600 : 17 : res = PQgetResult(conn);
3357 simon@2ndQuadrant.co 601 [ + + ]: 17 : if (PQresultStatus(res) == PGRES_COPY_OUT)
602 : : {
2132 noah@leadboat.com 603 : 9 : PQclear(res);
604 : :
605 : : /*
606 : : * We're doing a client-initiated clean exit and have sent CopyDone to
607 : : * the server. Drain any messages, so we don't miss a last-minute
608 : : * ErrorResponse. The walsender stops generating WALData records once
609 : : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
610 : : * it's too late for sendFeedback(), even if this were to take a long
611 : : * time. Hence, use synchronous-mode PQgetCopyData().
612 : : */
613 : : while (1)
614 : 149 : {
615 : : int r;
616 : :
617 [ + + ]: 158 : if (copybuf != NULL)
618 : : {
619 : 156 : PQfreemem(copybuf);
620 : 156 : copybuf = NULL;
621 : : }
622 : 158 : r = PQgetCopyData(conn, ©buf, 0);
623 [ + + ]: 158 : if (r == -1)
624 : 9 : break;
625 [ - + ]: 149 : if (r == -2)
626 : : {
2132 noah@leadboat.com 627 :UBC 0 : pg_log_error("could not read COPY data: %s",
628 : : PQerrorMessage(conn));
629 : 0 : time_to_abort = false; /* unclean exit */
630 : 0 : goto error;
631 : : }
632 : : }
633 : :
2132 noah@leadboat.com 634 :CBC 9 : res = PQgetResult(conn);
635 : : }
636 [ + + ]: 17 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
637 : : {
2540 peter@eisentraut.org 638 : 7 : pg_log_error("unexpected termination of replication stream: %s",
639 : : PQresultErrorMessage(res));
355 dgustafsson@postgres 640 : 7 : PQclear(res);
4380 rhaas@postgresql.org 641 : 7 : goto error;
642 : : }
643 : 10 : PQclear(res);
644 : :
645 [ + - + + ]: 10 : if (outfd != -1 && strcmp(outfile, "-") != 0)
646 : : {
3307 tgl@sss.pgh.pa.us 647 :GBC 1 : TimestampTz t = feGetCurrentTimestamp();
648 : :
4380 rhaas@postgresql.org 649 : 1 : OutputFsync(t);
650 [ - + ]: 1 : if (close(outfd) != 0)
2540 peter@eisentraut.org 651 :UBC 0 : pg_log_error("could not close file \"%s\": %m", outfile);
652 : : }
4380 rhaas@postgresql.org 653 :CBC 10 : outfd = -1;
654 : 25 : error:
4332 heikki.linnakangas@i 655 [ - + ]: 25 : if (copybuf != NULL)
656 : : {
4332 heikki.linnakangas@i 657 :UBC 0 : PQfreemem(copybuf);
658 : 0 : copybuf = NULL;
659 : : }
4380 rhaas@postgresql.org 660 :CBC 25 : destroyPQExpBuffer(query);
661 : 25 : PQfinish(conn);
662 : 25 : conn = NULL;
663 : : }
664 : :
665 : : /*
666 : : * Unfortunately we can't do sensible signal handling on windows...
667 : : */
668 : : #ifndef WIN32
669 : :
670 : : /*
671 : : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
672 : : * possible moment.
673 : : */
674 : : static void
1278 tgl@sss.pgh.pa.us 675 : 2 : sigexit_handler(SIGNAL_ARGS)
676 : : {
969 michael@paquier.xyz 677 : 2 : stop_reason = STREAM_STOP_SIGNAL;
4380 rhaas@postgresql.org 678 : 2 : time_to_abort = true;
679 : 2 : }
680 : :
681 : : /*
682 : : * Trigger the output file to be reopened.
683 : : */
684 : : static void
1278 tgl@sss.pgh.pa.us 685 :UBC 0 : sighup_handler(SIGNAL_ARGS)
686 : : {
4380 rhaas@postgresql.org 687 : 0 : output_reopen = true;
688 : 0 : }
689 : : #endif
690 : :
691 : :
692 : : int
4380 rhaas@postgresql.org 693 :CBC 64 : main(int argc, char **argv)
694 : : {
695 : : static struct option long_options[] = {
696 : : /* general options */
697 : : {"file", required_argument, NULL, 'f'},
698 : : {"fsync-interval", required_argument, NULL, 'F'},
699 : : {"no-loop", no_argument, NULL, 'n'},
700 : : {"enable-failover", no_argument, NULL, 5},
701 : : {"enable-two-phase", no_argument, NULL, 't'},
702 : : {"two-phase", no_argument, NULL, 't'}, /* deprecated */
703 : : {"verbose", no_argument, NULL, 'v'},
704 : : {"version", no_argument, NULL, 'V'},
705 : : {"help", no_argument, NULL, '?'},
706 : : /* connection options */
707 : : {"dbname", required_argument, NULL, 'd'},
708 : : {"host", required_argument, NULL, 'h'},
709 : : {"port", required_argument, NULL, 'p'},
710 : : {"username", required_argument, NULL, 'U'},
711 : : {"no-password", no_argument, NULL, 'w'},
712 : : {"password", no_argument, NULL, 'W'},
713 : : /* replication options */
714 : : {"startpos", required_argument, NULL, 'I'},
715 : : {"endpos", required_argument, NULL, 'E'},
716 : : {"option", required_argument, NULL, 'o'},
717 : : {"plugin", required_argument, NULL, 'P'},
718 : : {"status-interval", required_argument, NULL, 's'},
719 : : {"slot", required_argument, NULL, 'S'},
720 : : /* action */
721 : : {"create-slot", no_argument, NULL, 1},
722 : : {"start", no_argument, NULL, 2},
723 : : {"drop-slot", no_argument, NULL, 3},
724 : : {"if-not-exists", no_argument, NULL, 4},
725 : : {NULL, 0, NULL, 0}
726 : : };
727 : : int c;
728 : : int option_index;
729 : : uint32 hi,
730 : : lo;
731 : : char *db_name;
732 : :
2540 peter@eisentraut.org 733 : 64 : pg_logging_init(argv[0]);
4380 rhaas@postgresql.org 734 : 64 : progname = get_progname(argv[0]);
3730 alvherre@alvh.no-ip. 735 : 64 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
736 : :
4380 rhaas@postgresql.org 737 [ + + ]: 64 : if (argc > 1)
738 : : {
739 [ + + - + ]: 63 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
740 : : {
741 : 1 : usage();
742 : 1 : exit(0);
743 : : }
744 [ + - ]: 62 : else if (strcmp(argv[1], "-V") == 0 ||
745 [ + + ]: 62 : strcmp(argv[1], "--version") == 0)
746 : : {
747 : 1 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
748 : 1 : exit(0);
749 : : }
750 : : }
751 : :
1189 peter@eisentraut.org 752 : 365 : while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
4380 rhaas@postgresql.org 753 [ + + ]: 365 : long_options, &option_index)) != -1)
754 : : {
755 [ + + + + : 304 : switch (c)
+ + + - -
- - - - +
+ + + + +
+ + - + ]
756 : : {
757 : : /* general options */
758 : 25 : case 'f':
759 : 25 : outfile = pg_strdup(optarg);
760 : 25 : break;
4312 andres@anarazel.de 761 :GBC 1 : case 'F':
1695 michael@paquier.xyz 762 [ - + ]: 1 : if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
763 : : INT_MAX / 1000,
764 : : &fsync_interval))
4312 andres@anarazel.de 765 :UBC 0 : exit(1);
1695 michael@paquier.xyz 766 :GBC 1 : fsync_interval *= 1000;
4312 andres@anarazel.de 767 : 1 : break;
4380 rhaas@postgresql.org 768 :CBC 23 : case 'n':
769 : 23 : noloop = 1;
770 : 23 : break;
1719 akapila@postgresql.o 771 : 2 : case 't':
772 : 2 : two_phase = true;
773 : 2 : break;
1189 peter@eisentraut.org 774 :GBC 1 : case 'v':
775 : 1 : verbose++;
776 : 1 : break;
345 msawada@postgresql.o 777 :CBC 1 : case 5:
778 : 1 : failover = true;
779 : 1 : break;
780 : : /* connection options */
4380 rhaas@postgresql.org 781 : 58 : case 'd':
782 : 58 : dbname = pg_strdup(optarg);
783 : 58 : break;
4380 rhaas@postgresql.org 784 :UBC 0 : case 'h':
785 : 0 : dbhost = pg_strdup(optarg);
786 : 0 : break;
787 : 0 : case 'p':
788 : 0 : dbport = pg_strdup(optarg);
789 : 0 : break;
790 : 0 : case 'U':
791 : 0 : dbuser = pg_strdup(optarg);
792 : 0 : break;
793 : 0 : case 'w':
794 : 0 : dbgetpassword = -1;
795 : 0 : break;
796 : 0 : case 'W':
797 : 0 : dbgetpassword = 1;
798 : 0 : break;
799 : : /* replication options */
4312 andres@anarazel.de 800 : 0 : case 'I':
251 alvherre@kurilemu.de 801 [ # # ]:UNC 0 : if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
1437 tgl@sss.pgh.pa.us 802 :UBC 0 : pg_fatal("could not parse start position \"%s\"", optarg);
4312 andres@anarazel.de 803 : 0 : startpos = ((uint64) hi) << 32 | lo;
804 : 0 : break;
3357 simon@2ndQuadrant.co 805 :CBC 8 : case 'E':
251 alvherre@kurilemu.de 806 [ - + ]:GNC 8 : if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
1437 tgl@sss.pgh.pa.us 807 :UBC 0 : pg_fatal("could not parse end position \"%s\"", optarg);
3357 simon@2ndQuadrant.co 808 :CBC 8 : endpos = ((uint64) hi) << 32 | lo;
809 : 8 : break;
4380 rhaas@postgresql.org 810 : 40 : case 'o':
811 : : {
4331 bruce@momjian.us 812 : 40 : char *data = pg_strdup(optarg);
813 : 40 : char *val = strchr(data, '=');
814 : :
4380 rhaas@postgresql.org 815 [ + - ]: 40 : if (val != NULL)
816 : : {
817 : : /* remove =; separate data from val */
818 : 40 : *val = '\0';
819 : 40 : val++;
820 : : }
821 : :
822 : 40 : noptions += 1;
16 michael@paquier.xyz 823 :GNC 40 : options = pg_realloc_array(options, char *, noptions * 2);
824 : :
4380 rhaas@postgresql.org 825 :CBC 40 : options[(noptions - 1) * 2] = data;
826 : 40 : options[(noptions - 1) * 2 + 1] = val;
827 : : }
828 : :
829 : 40 : break;
830 : 25 : case 'P':
831 : 25 : plugin = pg_strdup(optarg);
832 : 25 : break;
4380 rhaas@postgresql.org 833 :GBC 1 : case 's':
1695 michael@paquier.xyz 834 [ - + ]: 1 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
835 : : INT_MAX / 1000,
836 : : &standby_message_timeout))
4380 rhaas@postgresql.org 837 :UBC 0 : exit(1);
1695 michael@paquier.xyz 838 :GBC 1 : standby_message_timeout *= 1000;
4380 rhaas@postgresql.org 839 : 1 : break;
4380 rhaas@postgresql.org 840 :CBC 60 : case 'S':
841 : 60 : replication_slot = pg_strdup(optarg);
842 : 60 : break;
843 : : /* action */
844 : 29 : case 1:
845 : 29 : do_create_slot = true;
846 : 29 : break;
847 : 26 : case 2:
848 : 26 : do_start_slot = true;
849 : 26 : break;
850 : 3 : case 3:
851 : 3 : do_drop_slot = true;
852 : 3 : break;
3899 andres@anarazel.de 853 :UBC 0 : case 4:
854 : 0 : slot_exists_ok = true;
855 : 0 : break;
856 : :
4380 rhaas@postgresql.org 857 :CBC 1 : default:
858 : : /* getopt_long already emitted a complaint */
1437 tgl@sss.pgh.pa.us 859 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4380 rhaas@postgresql.org 860 : 1 : exit(1);
861 : : }
862 : : }
863 : :
864 : : /*
865 : : * Any non-option arguments?
866 : : */
867 [ - + ]: 61 : if (optind < argc)
868 : : {
2540 peter@eisentraut.org 869 :UBC 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
870 : : argv[optind]);
1437 tgl@sss.pgh.pa.us 871 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4380 rhaas@postgresql.org 872 : 0 : exit(1);
873 : : }
874 : :
875 : : /*
876 : : * Required arguments
877 : : */
4380 rhaas@postgresql.org 878 [ + + ]:CBC 61 : if (replication_slot == NULL)
879 : : {
2540 peter@eisentraut.org 880 : 1 : pg_log_error("no slot specified");
1437 tgl@sss.pgh.pa.us 881 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4380 rhaas@postgresql.org 882 : 1 : exit(1);
883 : : }
884 : :
885 [ + + + + ]: 60 : if (do_start_slot && outfile == NULL)
886 : : {
2540 peter@eisentraut.org 887 : 1 : pg_log_error("no target file specified");
1437 tgl@sss.pgh.pa.us 888 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4380 rhaas@postgresql.org 889 : 1 : exit(1);
890 : : }
891 : :
892 [ + + + + ]: 59 : if (!do_drop_slot && dbname == NULL)
893 : : {
2540 peter@eisentraut.org 894 : 1 : pg_log_error("no database specified");
1437 tgl@sss.pgh.pa.us 895 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4380 rhaas@postgresql.org 896 : 1 : exit(1);
897 : : }
898 : :
899 [ + + + + : 58 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
+ + ]
900 : : {
2540 peter@eisentraut.org 901 : 1 : pg_log_error("at least one action needs to be specified");
1437 tgl@sss.pgh.pa.us 902 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4380 rhaas@postgresql.org 903 : 1 : exit(1);
904 : : }
905 : :
906 [ + + + - : 57 : if (do_drop_slot && (do_create_slot || do_start_slot))
- + ]
907 : : {
2540 peter@eisentraut.org 908 :UBC 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
1437 tgl@sss.pgh.pa.us 909 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4380 rhaas@postgresql.org 910 : 0 : exit(1);
911 : : }
912 : :
129 alvherre@kurilemu.de 913 [ - + - - :GNC 57 : if (XLogRecPtrIsValid(startpos) && (do_create_slot || do_drop_slot))
- - ]
914 : : {
2540 peter@eisentraut.org 915 :UBC 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
1437 tgl@sss.pgh.pa.us 916 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4380 rhaas@postgresql.org 917 : 0 : exit(1);
918 : : }
919 : :
129 alvherre@kurilemu.de 920 [ + + - + ]:GNC 57 : if (XLogRecPtrIsValid(endpos) && !do_start_slot)
921 : : {
2540 peter@eisentraut.org 922 :UBC 0 : pg_log_error("--endpos may only be specified with --start");
1437 tgl@sss.pgh.pa.us 923 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3357 simon@2ndQuadrant.co 924 : 0 : exit(1);
925 : : }
926 : :
345 msawada@postgresql.o 927 [ + + ]:CBC 57 : if (!do_create_slot)
928 : : {
929 [ + + ]: 28 : if (two_phase)
930 : : {
259 peter@eisentraut.org 931 : 1 : pg_log_error("%s may only be specified with --create-slot", "--enable-two-phase");
345 msawada@postgresql.o 932 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
933 : 1 : exit(1);
934 : : }
935 : :
936 [ - + ]: 27 : if (failover)
937 : : {
259 peter@eisentraut.org 938 :UBC 0 : pg_log_error("%s may only be specified with --create-slot", "--enable-failover");
345 msawada@postgresql.o 939 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
940 : 0 : exit(1);
941 : : }
942 : : }
943 : :
944 : : /*
945 : : * Obtain a connection to server. Notably, if we need a password, we want
946 : : * to collect it from the user immediately.
947 : : */
4183 andres@anarazel.de 948 :CBC 56 : conn = GetConnection();
949 [ - + ]: 56 : if (!conn)
950 : : /* Error message already written in GetConnection() */
4183 andres@anarazel.de 951 :UBC 0 : exit(1);
2633 peter@eisentraut.org 952 :CBC 56 : atexit(disconnect_atexit);
953 : :
954 : : /*
955 : : * Trap signals. (Don't do this until after the initial password prompt,
956 : : * if one is needed, in GetConnection.)
957 : : */
958 : : #ifndef WIN32
1278 dgustafsson@postgres 959 : 56 : pqsignal(SIGINT, sigexit_handler);
960 : 56 : pqsignal(SIGTERM, sigexit_handler);
1575 tgl@sss.pgh.pa.us 961 : 56 : pqsignal(SIGHUP, sighup_handler);
962 : : #endif
963 : :
964 : : /*
965 : : * Run IDENTIFY_SYSTEM to check the connection type for each action.
966 : : * --create-slot and --start actions require a database-specific
967 : : * replication connection because they handle logical replication slots.
968 : : * --drop-slot can remove replication slots from any replication
969 : : * connection without this restriction.
970 : : */
4183 andres@anarazel.de 971 [ - + ]: 56 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
2633 peter@eisentraut.org 972 :UBC 0 : exit(1);
973 : :
355 fujii@postgresql.org 974 [ + + - + ]:CBC 56 : if (!do_drop_slot && db_name == NULL)
1437 tgl@sss.pgh.pa.us 975 :UBC 0 : pg_fatal("could not establish database-specific replication connection");
976 : :
977 : : /*
978 : : * Set umask so that directories/files are created with the same
979 : : * permissions as directories/files in the source data directory.
980 : : *
981 : : * pg_mode_mask is set to owner-only by default and then updated in
982 : : * GetConnection() where we get the mode from the server-side with
983 : : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
984 : : */
2899 sfrost@snowman.net 985 :CBC 56 : umask(pg_mode_mask);
986 : :
987 : : /* Drop a replication slot. */
4380 rhaas@postgresql.org 988 [ + + ]: 56 : if (do_drop_slot)
989 : : {
990 [ - + ]: 3 : if (verbose)
2540 peter@eisentraut.org 991 :UBC 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
992 : :
4183 andres@anarazel.de 993 [ - + ]:CBC 3 : if (!DropReplicationSlot(conn, replication_slot))
2633 peter@eisentraut.org 994 :UBC 0 : exit(1);
995 : : }
996 : :
997 : : /* Create a replication slot. */
4380 rhaas@postgresql.org 998 [ + + ]:CBC 56 : if (do_create_slot)
999 : : {
1000 [ - + ]: 29 : if (verbose)
2540 peter@eisentraut.org 1001 :UBC 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
1002 : :
3092 peter_e@gmx.net 1003 [ - + ]:CBC 29 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
1004 : : false, false, slot_exists_ok, two_phase,
1005 : : failover))
2633 peter@eisentraut.org 1006 :UBC 0 : exit(1);
3899 andres@anarazel.de 1007 :CBC 29 : startpos = InvalidXLogRecPtr;
1008 : : }
1009 : :
4380 rhaas@postgresql.org 1010 [ + + ]: 56 : if (!do_start_slot)
2633 peter@eisentraut.org 1011 : 32 : exit(0);
1012 : :
1013 : : /* Stream loop */
1014 : : while (true)
1015 : : {
4183 andres@anarazel.de 1016 : 25 : StreamLogicalLog();
4380 rhaas@postgresql.org 1017 [ + + ]: 25 : if (time_to_abort)
1018 : : {
1019 : : /*
1020 : : * We've been Ctrl-C'ed or reached an exit limit condition. That's
1021 : : * not an error, so exit without an errorcode.
1022 : : */
2633 peter@eisentraut.org 1023 : 9 : exit(0);
1024 : : }
1025 : :
1026 : : /*
1027 : : * Ensure all written data is flushed to disk before exiting or
1028 : : * starting a new replication.
1029 : : */
58 fujii@postgresql.org 1030 [ + + ]:GNC 16 : if (outfd != -1)
1031 : 9 : OutputFsync(feGetCurrentTimestamp());
1032 : :
1033 [ + + ]: 16 : if (noloop)
1034 : : {
1437 tgl@sss.pgh.pa.us 1035 :CBC 15 : pg_fatal("disconnected");
1036 : : }
1037 : : else
1038 : : {
1039 : : /* translator: check source for value for %d */
2540 peter@eisentraut.org 1040 :GBC 1 : pg_log_info("disconnected; waiting %d seconds to try again",
1041 : : RECONNECT_SLEEP_TIME);
4380 rhaas@postgresql.org 1042 : 1 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1043 : : }
1044 : : }
1045 : : }
1046 : :
1047 : : /*
1048 : : * Fsync our output data, and send a feedback message to the server. Returns
1049 : : * true if successful, false otherwise.
1050 : : *
1051 : : * If successful, *now is updated to the current timestamp just before sending
1052 : : * feedback.
1053 : : */
1054 : : static bool
3357 simon@2ndQuadrant.co 1055 :CBC 8 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1056 : : {
1057 : : /* flush data to disk, so that we send a recent flush pointer */
58 fujii@postgresql.org 1058 :GNC 8 : OutputFsync(*now);
3357 simon@2ndQuadrant.co 1059 :CBC 8 : *now = feGetCurrentTimestamp();
1060 [ - + ]: 8 : if (!sendFeedback(conn, *now, true, false))
3357 simon@2ndQuadrant.co 1061 :UBC 0 : return false;
1062 : :
3357 simon@2ndQuadrant.co 1063 :CBC 8 : return true;
1064 : : }
1065 : :
1066 : : /*
1067 : : * Try to inform the server about our upcoming demise, but don't wait around or
1068 : : * retry on failure.
1069 : : */
1070 : : static void
969 michael@paquier.xyz 1071 : 9 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
1072 : : XLogRecPtr lsn)
1073 : : {
3357 simon@2ndQuadrant.co 1074 : 9 : (void) PQputCopyEnd(conn, NULL);
1075 : 9 : (void) PQflush(conn);
1076 : :
1077 [ + + ]: 9 : if (verbose)
1078 : : {
969 michael@paquier.xyz 1079 [ + - - - :GBC 1 : switch (reason)
- ]
1080 : : {
1081 : 1 : case STREAM_STOP_SIGNAL:
1082 : 1 : pg_log_info("received interrupt signal, exiting");
1083 : 1 : break;
969 michael@paquier.xyz 1084 :UBC 0 : case STREAM_STOP_KEEPALIVE:
251 alvherre@kurilemu.de 1085 :UNC 0 : pg_log_info("end position %X/%08X reached by keepalive",
1086 : : LSN_FORMAT_ARGS(endpos));
969 michael@paquier.xyz 1087 :UBC 0 : break;
1088 : 0 : case STREAM_STOP_END_OF_WAL:
129 alvherre@kurilemu.de 1089 [ # # ]:UNC 0 : Assert(XLogRecPtrIsValid(lsn));
251 1090 : 0 : pg_log_info("end position %X/%08X reached by WAL record at %X/%08X",
1091 : : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
969 michael@paquier.xyz 1092 :UBC 0 : break;
1093 : 0 : case STREAM_STOP_NONE:
1094 : 0 : Assert(false);
1095 : : break;
1096 : : }
1097 : : }
3357 simon@2ndQuadrant.co 1098 :CBC 9 : }
|