Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * receivelog.c - receive WAL files using the streaming
4 : : * replication protocol.
5 : : *
6 : : * Author: Magnus Hagander <magnus@hagander.net>
7 : : *
8 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
9 : : *
10 : : * IDENTIFICATION
11 : : * src/bin/pg_basebackup/receivelog.c
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres_fe.h"
16 : :
17 : : #include <sys/select.h>
18 : : #include <sys/stat.h>
19 : : #include <unistd.h>
20 : :
21 : : #include "access/xlog_internal.h"
22 : : #include "common/logging.h"
23 : : #include "libpq-fe.h"
24 : : #include "libpq/protocol.h"
25 : : #include "receivelog.h"
26 : : #include "streamutil.h"
27 : :
28 : : /* currently open WAL file */
29 : : static Walfile *walfile = NULL;
30 : : static bool reportFlushPosition = false;
31 : : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
32 : :
33 : : static bool still_sending = true; /* feedback still needs to be sent? */
34 : :
35 : : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
36 : : XLogRecPtr *stoppos);
37 : : static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
38 : : static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
39 : : char **buffer);
40 : : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
41 : : int len, XLogRecPtr blockpos, TimestampTz *last_status);
42 : : static bool ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
43 : : XLogRecPtr *blockpos);
44 : : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
45 : : XLogRecPtr blockpos, XLogRecPtr *stoppos);
46 : : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
47 : : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
48 : : TimestampTz last_status);
49 : :
50 : : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
51 : : uint32 *timeline);
52 : :
53 : : static bool
3240 magnus@hagander.net 54 :CBC 6 : mark_file_as_archived(StreamCtl *stream, const char *fname)
55 : : {
56 : : Walfile *f;
57 : : static char tmppath[MAXPGPATH];
58 : :
59 : 6 : snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
60 : : fname);
61 : :
1083 rhaas@postgresql.org 62 : 6 : f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
63 : : NULL, 0);
3240 magnus@hagander.net 64 [ - + ]: 6 : if (f == NULL)
65 : : {
2350 peter@eisentraut.org 66 :UBC 0 : pg_log_error("could not create archive status file \"%s\": %s",
67 : : tmppath, GetLastWalMethodError(stream->walmethod));
3899 andres@anarazel.de 68 : 0 : return false;
69 : : }
70 : :
1083 rhaas@postgresql.org 71 [ - + ]:CBC 6 : if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
72 : : {
1389 tgl@sss.pgh.pa.us 73 :UBC 0 : pg_log_error("could not close archive status file \"%s\": %s",
74 : : tmppath, GetLastWalMethodError(stream->walmethod));
75 : 0 : return false;
76 : : }
77 : :
3899 andres@anarazel.de 78 :CBC 6 : return true;
79 : : }
80 : :
81 : : /*
82 : : * Open a new WAL file in the specified directory.
83 : : *
84 : : * Returns true if OK; on failure, returns false after printing an error msg.
85 : : * On success, 'walfile' is set to the opened WAL file.
86 : : *
87 : : * The file will be padded to 16Mb with zeroes.
88 : : */
89 : : static bool
3466 magnus@hagander.net 90 : 142 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
91 : : {
92 : : Walfile *f;
93 : : char *fn;
94 : : ssize_t size;
95 : : XLogSegNo segno;
96 : : char walfile_name[MAXPGPATH];
97 : :
2909 andres@anarazel.de 98 : 142 : XLByteToSeg(startpoint, segno, WalSegSz);
1083 rhaas@postgresql.org 99 : 142 : XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
100 : :
101 : : /* Note that this considers the compression used if necessary */
102 : 142 : fn = stream->walmethod->ops->get_file_name(stream->walmethod,
103 : : walfile_name,
104 : 142 : stream->partial_suffix);
105 : :
106 : : /*
107 : : * When streaming to files, if an existing file exists we verify that it's
108 : : * either empty (just created), or a complete WalSegSz segment (in which
109 : : * case it has been created and padded). Anything else indicates a corrupt
110 : : * file. Compressed files have no need for padding, so just ignore this
111 : : * case.
112 : : *
113 : : * When streaming to tar, no file with this name will exist before, so we
114 : : * never have to verify a size.
115 : : */
116 [ + + - + ]: 277 : if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
117 : 135 : stream->walmethod->ops->existsfile(stream->walmethod, fn))
118 : : {
1083 rhaas@postgresql.org 119 :UBC 0 : size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
3240 magnus@hagander.net 120 [ # # ]: 0 : if (size < 0)
121 : : {
2350 peter@eisentraut.org 122 : 0 : pg_log_error("could not get size of write-ahead log file \"%s\": %s",
123 : : fn, GetLastWalMethodError(stream->walmethod));
1503 michael@paquier.xyz 124 : 0 : pg_free(fn);
3240 magnus@hagander.net 125 : 0 : return false;
126 : : }
2909 andres@anarazel.de 127 [ # # ]: 0 : if (size == WalSegSz)
128 : : {
129 : : /* Already padded file. Open it for use */
1083 rhaas@postgresql.org 130 : 0 : f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
3240 magnus@hagander.net 131 [ # # ]: 0 : if (f == NULL)
132 : : {
2350 peter@eisentraut.org 133 : 0 : pg_log_error("could not open existing write-ahead log file \"%s\": %s",
134 : : fn, GetLastWalMethodError(stream->walmethod));
1503 michael@paquier.xyz 135 : 0 : pg_free(fn);
3261 tgl@sss.pgh.pa.us 136 : 0 : return false;
137 : : }
138 : :
139 : : /* fsync file in case of a previous crash */
1083 rhaas@postgresql.org 140 [ # # ]: 0 : if (stream->walmethod->ops->sync(f) != 0)
141 : : {
1247 tgl@sss.pgh.pa.us 142 : 0 : pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
143 : : fn, GetLastWalMethodError(stream->walmethod));
1083 rhaas@postgresql.org 144 : 0 : stream->walmethod->ops->close(f, CLOSE_UNLINK);
2231 peter@eisentraut.org 145 : 0 : exit(1);
146 : : }
147 : :
3240 magnus@hagander.net 148 : 0 : walfile = f;
1503 michael@paquier.xyz 149 : 0 : pg_free(fn);
3240 magnus@hagander.net 150 : 0 : return true;
151 : : }
152 [ # # ]: 0 : if (size != 0)
153 : : {
154 : : /* if write didn't set errno, assume problem is no disk space */
3261 tgl@sss.pgh.pa.us 155 [ # # ]: 0 : if (errno == 0)
156 : 0 : errno = ENOSPC;
1490 peter@eisentraut.org 157 : 0 : pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
158 : : "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
159 : : size),
160 : : fn, size, WalSegSz);
1503 michael@paquier.xyz 161 : 0 : pg_free(fn);
4615 heikki.linnakangas@i 162 : 0 : return false;
163 : : }
164 : : /* File existed and was empty, so fall through and open */
165 : : }
166 : :
167 : : /* No file existed, so create one */
168 : :
1083 rhaas@postgresql.org 169 :CBC 142 : f = stream->walmethod->ops->open_for_write(stream->walmethod,
170 : : walfile_name,
171 : 142 : stream->partial_suffix,
172 : : WalSegSz);
3240 magnus@hagander.net 173 [ - + ]: 142 : if (f == NULL)
174 : : {
2350 peter@eisentraut.org 175 :UBC 0 : pg_log_error("could not open write-ahead log file \"%s\": %s",
176 : : fn, GetLastWalMethodError(stream->walmethod));
1503 michael@paquier.xyz 177 : 0 : pg_free(fn);
4615 heikki.linnakangas@i 178 : 0 : return false;
179 : : }
180 : :
1503 michael@paquier.xyz 181 :CBC 142 : pg_free(fn);
4615 heikki.linnakangas@i 182 : 142 : walfile = f;
183 : 142 : return true;
184 : : }
185 : :
186 : : /*
187 : : * Close the current WAL file (if open), and rename it to the correct
188 : : * filename if it's complete. On failure, prints an error message to stderr
189 : : * and returns false, otherwise returns true.
190 : : */
191 : : static bool
3466 magnus@hagander.net 192 : 146 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
193 : : {
194 : : char *fn;
195 : : pgoff_t currpos;
196 : : int r;
197 : : char walfile_name[MAXPGPATH];
198 : :
3240 199 [ + + ]: 146 : if (walfile == NULL)
4615 heikki.linnakangas@i 200 : 4 : return true;
201 : :
1083 rhaas@postgresql.org 202 : 142 : strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
203 : 142 : currpos = walfile->currpos;
204 : :
205 : : /* Note that this considers the compression used if necessary */
206 : 142 : fn = stream->walmethod->ops->get_file_name(stream->walmethod,
207 : : walfile_name,
208 : 142 : stream->partial_suffix);
209 : :
3240 magnus@hagander.net 210 [ + + ]: 142 : if (stream->partial_suffix)
211 : : {
2909 andres@anarazel.de 212 [ + + ]: 12 : if (currpos == WalSegSz)
1083 rhaas@postgresql.org 213 : 6 : r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
214 : : else
215 : : {
1450 michael@paquier.xyz 216 : 6 : pg_log_info("not renaming \"%s\", segment is not complete", fn);
1083 rhaas@postgresql.org 217 : 6 : r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
218 : : }
219 : : }
220 : : else
221 : 130 : r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
222 : :
3240 magnus@hagander.net 223 : 142 : walfile = NULL;
224 : :
225 [ - + ]: 142 : if (r != 0)
226 : : {
2350 peter@eisentraut.org 227 :UBC 0 : pg_log_error("could not close file \"%s\": %s",
228 : : fn, GetLastWalMethodError(stream->walmethod));
229 : :
1450 michael@paquier.xyz 230 : 0 : pg_free(fn);
5056 magnus@hagander.net 231 : 0 : return false;
232 : : }
233 : :
1450 michael@paquier.xyz 234 :CBC 142 : pg_free(fn);
235 : :
236 : : /*
237 : : * Mark file as archived if requested by the caller - pg_basebackup needs
238 : : * to do so as files can otherwise get archived again after promotion of a
239 : : * new node. This is in line with walreceiver.c always doing a
240 : : * XLogArchiveForceDone() after a complete segment.
241 : : */
2909 andres@anarazel.de 242 [ + + + + ]: 142 : if (currpos == WalSegSz && stream->mark_done)
243 : : {
244 : : /* writes error message if failed */
1083 rhaas@postgresql.org 245 [ - + ]: 4 : if (!mark_file_as_archived(stream, walfile_name))
3899 andres@anarazel.de 246 :UBC 0 : return false;
247 : : }
248 : :
4236 rhaas@postgresql.org 249 :CBC 142 : lastFlushPosition = pos;
5056 magnus@hagander.net 250 : 142 : return true;
251 : : }
252 : :
253 : :
254 : : /*
255 : : * Check if a timeline history file exists.
256 : : */
257 : : static bool
3466 258 : 137 : existsTimeLineHistoryFile(StreamCtl *stream)
259 : : {
260 : : char histfname[MAXFNAMELEN];
261 : :
262 : : /*
263 : : * Timeline 1 never has a history file. We treat that as if it existed,
264 : : * since we never need to stream it.
265 : : */
266 [ + + ]: 137 : if (stream->timeline == 1)
4615 heikki.linnakangas@i 267 : 134 : return true;
268 : :
3466 magnus@hagander.net 269 : 3 : TLHistoryFileName(histfname, stream->timeline);
270 : :
1083 rhaas@postgresql.org 271 : 3 : return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
272 : : }
273 : :
274 : : static bool
3466 magnus@hagander.net 275 : 3 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
276 : : {
4615 heikki.linnakangas@i 277 : 3 : int size = strlen(content);
278 : : char histfname[MAXFNAMELEN];
279 : : Walfile *f;
280 : :
281 : : /*
282 : : * Check that the server's idea of how timeline history files should be
283 : : * named matches ours.
284 : : */
3466 magnus@hagander.net 285 : 3 : TLHistoryFileName(histfname, stream->timeline);
4615 heikki.linnakangas@i 286 [ - + ]: 3 : if (strcmp(histfname, filename) != 0)
287 : : {
2350 peter@eisentraut.org 288 :UBC 0 : pg_log_error("server reported unexpected history file name for timeline %u: %s",
289 : : stream->timeline, filename);
4615 heikki.linnakangas@i 290 : 0 : return false;
291 : : }
292 : :
1083 rhaas@postgresql.org 293 :CBC 3 : f = stream->walmethod->ops->open_for_write(stream->walmethod,
294 : : histfname, ".tmp", 0);
3240 magnus@hagander.net 295 [ - + ]: 3 : if (f == NULL)
296 : : {
2350 peter@eisentraut.org 297 :UBC 0 : pg_log_error("could not create timeline history file \"%s\": %s",
298 : : histfname, GetLastWalMethodError(stream->walmethod));
4615 heikki.linnakangas@i 299 : 0 : return false;
300 : : }
301 : :
1083 rhaas@postgresql.org 302 [ - + ]:CBC 3 : if ((int) stream->walmethod->ops->write(f, content, size) != size)
303 : : {
2350 peter@eisentraut.org 304 :UBC 0 : pg_log_error("could not write timeline history file \"%s\": %s",
305 : : histfname, GetLastWalMethodError(stream->walmethod));
306 : :
307 : : /*
308 : : * If we fail to make the file, delete it to release disk space
309 : : */
1083 rhaas@postgresql.org 310 : 0 : stream->walmethod->ops->close(f, CLOSE_UNLINK);
311 : :
4615 heikki.linnakangas@i 312 : 0 : return false;
313 : : }
314 : :
1083 rhaas@postgresql.org 315 [ - + ]:CBC 3 : if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
316 : : {
2350 peter@eisentraut.org 317 :UBC 0 : pg_log_error("could not close file \"%s\": %s",
318 : : histfname, GetLastWalMethodError(stream->walmethod));
4615 heikki.linnakangas@i 319 : 0 : return false;
320 : : }
321 : :
322 : : /* Maintain archive_status, check close_walfile() for details. */
3466 magnus@hagander.net 323 [ + + ]:CBC 3 : if (stream->mark_done)
324 : : {
325 : : /* writes error message if failed */
3240 326 [ - + ]: 2 : if (!mark_file_as_archived(stream, histfname))
3899 andres@anarazel.de 327 :UBC 0 : return false;
328 : : }
329 : :
4615 heikki.linnakangas@i 330 :CBC 3 : return true;
331 : : }
332 : :
333 : : /*
334 : : * Send a Standby Status Update message to server.
335 : : */
336 : : static bool
3117 tgl@sss.pgh.pa.us 337 : 136 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
338 : : {
339 : : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
4483 bruce@momjian.us 340 : 136 : int len = 0;
341 : :
31 nathan@postgresql.or 342 :GNC 136 : replybuf[len] = PqReplMsg_StandbyStatusUpdate;
4686 heikki.linnakangas@i 343 :CBC 136 : len += 1;
2999 tgl@sss.pgh.pa.us 344 : 136 : fe_sendint64(blockpos, &replybuf[len]); /* write */
4686 heikki.linnakangas@i 345 : 136 : len += 8;
4236 rhaas@postgresql.org 346 [ + + ]: 136 : if (reportFlushPosition)
2999 tgl@sss.pgh.pa.us 347 : 132 : fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
348 : : else
349 : 4 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
4686 heikki.linnakangas@i 350 : 136 : len += 8;
4141 bruce@momjian.us 351 : 136 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
4686 heikki.linnakangas@i 352 : 136 : len += 8;
4141 bruce@momjian.us 353 : 136 : fe_sendint64(now, &replybuf[len]); /* sendTime */
4686 heikki.linnakangas@i 354 : 136 : len += 8;
2999 tgl@sss.pgh.pa.us 355 : 136 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
4686 heikki.linnakangas@i 356 : 136 : len += 1;
357 : :
358 [ + - - + ]: 136 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
359 : : {
2350 peter@eisentraut.org 360 :UBC 0 : pg_log_error("could not send feedback packet: %s",
361 : : PQerrorMessage(conn));
4686 heikki.linnakangas@i 362 : 0 : return false;
363 : : }
364 : :
4686 heikki.linnakangas@i 365 :CBC 136 : return true;
366 : : }
367 : :
368 : : /*
369 : : * Check that the server version we're connected to is supported by
370 : : * ReceiveXlogStream().
371 : : *
372 : : * If it's not, an error message is printed to stderr, and false is returned.
373 : : */
374 : : bool
4551 375 : 293 : CheckServerVersionForStreaming(PGconn *conn)
376 : : {
377 : : int minServerMajor,
378 : : maxServerMajor;
379 : : int serverMajor;
380 : :
381 : : /*
382 : : * The message format used in streaming replication changed in 9.3, so we
383 : : * cannot stream from older servers. And we don't support servers newer
384 : : * than the client; it might work, but we don't know, so err on the safe
385 : : * side.
386 : : */
387 : 293 : minServerMajor = 903;
388 : 293 : maxServerMajor = PG_VERSION_NUM / 100;
389 : 293 : serverMajor = PQserverVersion(conn) / 100;
4141 simon@2ndQuadrant.co 390 [ - + ]: 293 : if (serverMajor < minServerMajor)
391 : : {
4551 heikki.linnakangas@i 392 :UBC 0 : const char *serverver = PQparameterStatus(conn, "server_version");
393 : :
2350 peter@eisentraut.org 394 [ # # ]: 0 : pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
395 : : serverver ? serverver : "'unknown'",
396 : : "9.3");
4141 simon@2ndQuadrant.co 397 : 0 : return false;
398 : : }
4141 simon@2ndQuadrant.co 399 [ - + ]:CBC 293 : else if (serverMajor > maxServerMajor)
400 : : {
4141 simon@2ndQuadrant.co 401 :UBC 0 : const char *serverver = PQparameterStatus(conn, "server_version");
402 : :
2350 peter@eisentraut.org 403 [ # # ]: 0 : pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
404 : : serverver ? serverver : "'unknown'",
405 : : PG_VERSION);
4551 heikki.linnakangas@i 406 : 0 : return false;
407 : : }
4551 heikki.linnakangas@i 408 :CBC 293 : return true;
409 : : }
410 : :
411 : : /*
412 : : * Receive a log stream starting at the specified position.
413 : : *
414 : : * Individual parameters are passed through the StreamCtl structure.
415 : : *
416 : : * If sysidentifier is specified, validate that both the system
417 : : * identifier and the timeline matches the specified ones
418 : : * (by sending an extra IDENTIFY_SYSTEM command)
419 : : *
420 : : * All received segments will be written to the directory
421 : : * specified by basedir. This will also fetch any missing timeline history
422 : : * files.
423 : : *
424 : : * The stream_stop callback will be called every time data
425 : : * is received, and whenever a segment is completed. If it returns
426 : : * true, the streaming will stop and the function
427 : : * return. As long as it returns false, streaming will continue
428 : : * indefinitely.
429 : : *
430 : : * If stream_stop() checks for external input, stop_socket should be set to
431 : : * the FD it checks. This will allow such input to be detected promptly
432 : : * rather than after standby_message_timeout (which might be indefinite).
433 : : * Note that signals will interrupt waits for input as well, but that is
434 : : * race-y since a signal received while busy won't interrupt the wait.
435 : : *
436 : : * standby_message_timeout controls how often we send a message
437 : : * back to the primary letting it know our progress, in milliseconds.
438 : : * Zero means no messages are sent.
439 : : * This message will only contain the write location, and never
440 : : * flush or replay.
441 : : *
442 : : * If 'partial_suffix' is not NULL, files are initially created with the
443 : : * given suffix, and the suffix is removed once the file is finished. That
444 : : * allows you to tell the difference between partial and completed files,
445 : : * so that you can continue later where you left.
446 : : *
447 : : * If 'synchronous' is true, the received WAL is flushed as soon as written,
448 : : * otherwise only when the WAL file is closed.
449 : : *
450 : : * Note: The WAL location *must* be at a log segment start!
451 : : */
452 : : bool
3466 magnus@hagander.net 453 : 136 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
454 : : {
455 : : char query[128];
456 : : char slotcmd[128];
457 : : PGresult *res;
458 : : XLogRecPtr stoppos;
459 : :
460 : : /*
461 : : * The caller should've checked the server version already, but doesn't do
462 : : * any harm to check it here too.
463 : : */
4551 heikki.linnakangas@i 464 [ - + ]: 136 : if (!CheckServerVersionForStreaming(conn))
4617 heikki.linnakangas@i 465 :UBC 0 : return false;
466 : :
467 : : /*
468 : : * Decide whether we want to report the flush position. If we report the
469 : : * flush position, the primary will know what WAL we'll possibly
470 : : * re-request, and it can then remove older WAL safely. We must always do
471 : : * that when we are using slots.
472 : : *
473 : : * Reporting the flush position makes one eligible as a synchronous
474 : : * replica. People shouldn't include generic names in
475 : : * synchronous_standby_names, but we've protected them against it so far,
476 : : * so let's continue to do so unless specifically requested.
477 : : */
3155 magnus@hagander.net 478 [ + + ]:CBC 136 : if (stream->replication_slot != NULL)
479 : : {
4236 rhaas@postgresql.org 480 : 131 : reportFlushPosition = true;
3155 magnus@hagander.net 481 : 131 : sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
482 : : }
483 : : else
484 : : {
3295 simon@2ndQuadrant.co 485 [ + + ]: 5 : if (stream->synchronous)
486 : 1 : reportFlushPosition = true;
487 : : else
488 : 4 : reportFlushPosition = false;
4236 rhaas@postgresql.org 489 : 5 : slotcmd[0] = 0;
490 : : }
491 : :
3466 magnus@hagander.net 492 [ + - ]: 136 : if (stream->sysidentifier != NULL)
493 : : {
1467 michael@paquier.xyz 494 : 136 : char *sysidentifier = NULL;
495 : : TimeLineID servertli;
496 : :
497 : : /*
498 : : * Get the server system identifier and timeline, and validate them.
499 : : */
500 [ - + ]: 136 : if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
501 : : {
1467 michael@paquier.xyz 502 :UBC 0 : pg_free(sysidentifier);
5064 magnus@hagander.net 503 : 0 : return false;
504 : : }
505 : :
1467 michael@paquier.xyz 506 [ - + ]:CBC 136 : if (strcmp(stream->sysidentifier, sysidentifier) != 0)
507 : : {
2350 peter@eisentraut.org 508 :UBC 0 : pg_log_error("system identifier does not match between base backup and streaming connection");
1467 michael@paquier.xyz 509 : 0 : pg_free(sysidentifier);
5064 magnus@hagander.net 510 : 0 : return false;
511 : : }
1467 michael@paquier.xyz 512 :CBC 136 : pg_free(sysidentifier);
513 : :
514 [ - + ]: 136 : if (stream->timeline > servertli)
515 : : {
2350 peter@eisentraut.org 516 :UBC 0 : pg_log_error("starting timeline %u is not present in the server",
517 : : stream->timeline);
5064 magnus@hagander.net 518 : 0 : return false;
519 : : }
520 : : }
521 : :
522 : : /*
523 : : * initialize flush position to starting point, it's the caller's
524 : : * responsibility that that's sane.
525 : : */
3466 magnus@hagander.net 526 :CBC 136 : lastFlushPosition = stream->startpos;
527 : :
528 : : while (1)
529 : : {
530 : : /*
531 : : * Fetch the timeline history file for this timeline, if we don't have
532 : : * it already. When streaming log to tar, this will always return
533 : : * false, as we are never streaming into an existing file and
534 : : * therefore there can be no pre-existing timeline history file.
535 : : */
536 [ + + ]: 137 : if (!existsTimeLineHistoryFile(stream))
537 : : {
538 : 3 : snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
4615 heikki.linnakangas@i 539 : 3 : res = PQexec(conn, query);
540 [ - + ]: 3 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
541 : : {
542 : : /* FIXME: we might send it ok, but get an error */
2350 peter@eisentraut.org 543 :UBC 0 : pg_log_error("could not send replication command \"%s\": %s",
544 : : "TIMELINE_HISTORY", PQresultErrorMessage(res));
4615 heikki.linnakangas@i 545 : 0 : PQclear(res);
546 : 0 : return false;
547 : : }
548 : :
549 : : /*
550 : : * The response to TIMELINE_HISTORY is a single row result set
551 : : * with two fields: filename and content
552 : : */
4615 heikki.linnakangas@i 553 [ + - - + ]:CBC 3 : if (PQnfields(res) != 2 || PQntuples(res) != 1)
554 : : {
2350 peter@eisentraut.org 555 :UBC 0 : pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
556 : : PQntuples(res), PQnfields(res), 1, 2);
557 : : }
558 : :
559 : : /* Write the history file to disk */
3466 magnus@hagander.net 560 :CBC 3 : writeTimeLineHistoryFile(stream,
561 : : PQgetvalue(res, 0, 0),
562 : : PQgetvalue(res, 0, 1));
563 : :
4615 heikki.linnakangas@i 564 : 3 : PQclear(res);
565 : : }
566 : :
567 : : /*
568 : : * Before we start streaming from the requested location, check if the
569 : : * callback tells us to stop here.
570 : : */
3466 magnus@hagander.net 571 [ - + ]: 137 : if (stream->stream_stop(stream->startpos, stream->timeline, false))
4615 heikki.linnakangas@i 572 :UBC 0 : return true;
573 : :
574 : : /* Initiate the replication stream at specified location */
61 alvherre@kurilemu.de 575 :GNC 137 : snprintf(query, sizeof(query), "START_REPLICATION %s%X/%08X TIMELINE %u",
576 : : slotcmd,
1656 peter@eisentraut.org 577 :CBC 137 : LSN_FORMAT_ARGS(stream->startpos),
578 : : stream->timeline);
4615 heikki.linnakangas@i 579 : 137 : res = PQexec(conn, query);
580 [ + + ]: 137 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
581 : : {
2350 peter@eisentraut.org 582 : 1 : pg_log_error("could not send replication command \"%s\": %s",
583 : : "START_REPLICATION", PQresultErrorMessage(res));
4615 heikki.linnakangas@i 584 : 1 : PQclear(res);
585 : 1 : return false;
586 : : }
4804 magnus@hagander.net 587 : 136 : PQclear(res);
588 : :
589 : : /* Stream the WAL */
3466 590 : 136 : res = HandleCopyStream(conn, stream, &stoppos);
4513 rhaas@postgresql.org 591 [ - + ]: 136 : if (res == NULL)
4615 heikki.linnakangas@i 592 :UBC 0 : goto error;
593 : :
594 : : /*
595 : : * Streaming finished.
596 : : *
597 : : * There are two possible reasons for that: a controlled shutdown, or
598 : : * we reached the end of the current timeline. In case of
599 : : * end-of-timeline, the server sends a result set after Copy has
600 : : * finished, containing information about the next timeline. Read
601 : : * that, and restart streaming from the next timeline. In case of
602 : : * controlled shutdown, stop here.
603 : : */
4615 heikki.linnakangas@i 604 [ + + ]:CBC 136 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
605 : 1 : {
606 : : /*
607 : : * End-of-timeline. Read the next timeline's ID and starting
608 : : * position. Usually, the starting position will match the end of
609 : : * the previous timeline, but there are corner cases like if the
610 : : * server had sent us half of a WAL record, when it was promoted.
611 : : * The new timeline will begin at the end of the last complete
612 : : * record in that case, overlapping the partial WAL record on the
613 : : * old timeline.
614 : : */
615 : : uint32 newtimeline;
616 : : bool parsed;
617 : :
3466 magnus@hagander.net 618 : 1 : parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
4615 heikki.linnakangas@i 619 : 1 : PQclear(res);
4504 620 [ - + ]: 1 : if (!parsed)
4504 heikki.linnakangas@i 621 :UBC 0 : goto error;
622 : :
623 : : /* Sanity check the values the server gave us */
3466 magnus@hagander.net 624 [ - + ]:CBC 1 : if (newtimeline <= stream->timeline)
625 : : {
2350 peter@eisentraut.org 626 :UBC 0 : pg_log_error("server reported unexpected next timeline %u, following timeline %u",
627 : : newtimeline, stream->timeline);
4504 heikki.linnakangas@i 628 : 0 : goto error;
629 : : }
3466 magnus@hagander.net 630 [ - + ]:CBC 1 : if (stream->startpos > stoppos)
631 : : {
61 alvherre@kurilemu.de 632 :UNC 0 : pg_log_error("server stopped streaming timeline %u at %X/%08X, but reported next timeline %u to begin at %X/%08X",
633 : : stream->timeline, LSN_FORMAT_ARGS(stoppos),
634 : : newtimeline, LSN_FORMAT_ARGS(stream->startpos));
4615 heikki.linnakangas@i 635 :UBC 0 : goto error;
636 : : }
637 : :
638 : : /* Read the final result, which should be CommandComplete. */
4615 heikki.linnakangas@i 639 :CBC 1 : res = PQgetResult(conn);
640 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
641 : : {
2350 peter@eisentraut.org 642 :UBC 0 : pg_log_error("unexpected termination of replication stream: %s",
643 : : PQresultErrorMessage(res));
4053 fujii@postgresql.org 644 : 0 : PQclear(res);
4615 heikki.linnakangas@i 645 : 0 : goto error;
646 : : }
4615 heikki.linnakangas@i 647 :CBC 1 : PQclear(res);
648 : :
649 : : /*
650 : : * Loop back to start streaming from the new timeline. Always
651 : : * start streaming at the beginning of a segment.
652 : : */
3466 magnus@hagander.net 653 : 1 : stream->timeline = newtimeline;
2909 andres@anarazel.de 654 : 1 : stream->startpos = stream->startpos -
655 : 1 : XLogSegmentOffset(stream->startpos, WalSegSz);
4615 heikki.linnakangas@i 656 : 1 : continue;
657 : : }
658 [ + + ]: 135 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
659 : : {
4053 fujii@postgresql.org 660 : 134 : PQclear(res);
661 : :
662 : : /*
663 : : * End of replication (ie. controlled shut down of the server).
664 : : *
665 : : * Check if the callback thinks it's OK to stop here. If not,
666 : : * complain.
667 : : */
3466 magnus@hagander.net 668 [ + - ]: 134 : if (stream->stream_stop(stoppos, stream->timeline, false))
4615 heikki.linnakangas@i 669 : 134 : return true;
670 : : else
671 : : {
2350 peter@eisentraut.org 672 :UBC 0 : pg_log_error("replication stream was terminated before stop point");
4615 heikki.linnakangas@i 673 : 0 : goto error;
674 : : }
675 : : }
676 : : else
677 : : {
678 : : /* Server returned an error. */
2350 peter@eisentraut.org 679 :CBC 1 : pg_log_error("unexpected termination of replication stream: %s",
680 : : PQresultErrorMessage(res));
4053 fujii@postgresql.org 681 : 1 : PQclear(res);
4615 heikki.linnakangas@i 682 : 1 : goto error;
683 : : }
684 : : }
685 : :
686 : 1 : error:
1083 rhaas@postgresql.org 687 [ - + - - ]: 1 : if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
2350 peter@eisentraut.org 688 :UBC 0 : pg_log_error("could not close file \"%s\": %s",
689 : : walfile->pathname, GetLastWalMethodError(stream->walmethod));
3240 magnus@hagander.net 690 :CBC 1 : walfile = NULL;
4615 heikki.linnakangas@i 691 : 1 : return false;
692 : : }
693 : :
694 : : /*
695 : : * Helper function to parse the result set returned by server after streaming
696 : : * has finished. On failure, prints an error to stderr and returns false.
697 : : */
698 : : static bool
4504 699 : 1 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
700 : : {
701 : : uint32 startpos_xlogid,
702 : : startpos_xrecoff;
703 : :
704 : : /*----------
705 : : * The result set consists of one row and two columns, e.g:
706 : : *
707 : : * next_tli | next_tli_startpos
708 : : * ----------+-------------------
709 : : * 4 | 0/9949AE0
710 : : *
711 : : * next_tli is the timeline ID of the next timeline after the one that
712 : : * just finished streaming. next_tli_startpos is the WAL location where
713 : : * the server switched to it.
714 : : *----------
715 : : */
716 [ + - - + ]: 1 : if (PQnfields(res) < 2 || PQntuples(res) != 1)
717 : : {
2350 peter@eisentraut.org 718 :UBC 0 : pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
719 : : PQntuples(res), PQnfields(res), 1, 2);
4504 heikki.linnakangas@i 720 : 0 : return false;
721 : : }
722 : :
4504 heikki.linnakangas@i 723 :CBC 1 : *timeline = atoi(PQgetvalue(res, 0, 0));
61 alvherre@kurilemu.de 724 [ - + ]:GNC 1 : if (sscanf(PQgetvalue(res, 0, 1), "%X/%08X", &startpos_xlogid,
725 : : &startpos_xrecoff) != 2)
726 : : {
2350 peter@eisentraut.org 727 :UBC 0 : pg_log_error("could not parse next timeline's starting point \"%s\"",
728 : : PQgetvalue(res, 0, 1));
4504 heikki.linnakangas@i 729 : 0 : return false;
730 : : }
4504 heikki.linnakangas@i 731 :CBC 1 : *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
732 : :
733 : 1 : return true;
734 : : }
735 : :
736 : : /*
737 : : * The main loop of ReceiveXlogStream. Handles the COPY stream after
738 : : * initiating streaming with the START_REPLICATION command.
739 : : *
740 : : * If the COPY ends (not necessarily successfully) due a message from the
741 : : * server, returns a PGresult and sets *stoppos to the last byte written.
742 : : * On any other sort of error, returns NULL.
743 : : */
744 : : static PGresult *
3466 magnus@hagander.net 745 : 136 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
746 : : XLogRecPtr *stoppos)
747 : : {
4615 heikki.linnakangas@i 748 : 136 : char *copybuf = NULL;
3117 tgl@sss.pgh.pa.us 749 : 136 : TimestampTz last_status = -1;
3466 magnus@hagander.net 750 : 136 : XLogRecPtr blockpos = stream->startpos;
751 : :
4049 fujii@postgresql.org 752 : 136 : still_sending = true;
753 : :
754 : : while (1)
5064 magnus@hagander.net 755 : 1159 : {
756 : : int r;
757 : : TimestampTz now;
758 : : long sleeptime;
759 : :
760 : : /*
761 : : * Check if we should continue streaming, or abort at this point.
762 : : */
1822 peter@eisentraut.org 763 [ - + ]: 1295 : if (!CheckCopyStreamStop(conn, stream, blockpos))
4047 fujii@postgresql.org 764 :UBC 0 : goto error;
765 : :
4047 fujii@postgresql.org 766 :CBC 1295 : now = feGetCurrentTimestamp();
767 : :
768 : : /*
769 : : * If synchronous option is true, issue sync command as soon as there
770 : : * are WAL data which has not been flushed yet.
771 : : */
3240 magnus@hagander.net 772 [ + + - + : 1295 : if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
- - ]
773 : : {
1083 rhaas@postgresql.org 774 [ # # ]:UBC 0 : if (stream->walmethod->ops->sync(walfile) != 0)
1247 tgl@sss.pgh.pa.us 775 : 0 : pg_fatal("could not fsync file \"%s\": %s",
776 : : walfile->pathname, GetLastWalMethodError(stream->walmethod));
4047 fujii@postgresql.org 777 : 0 : lastFlushPosition = blockpos;
778 : :
779 : : /*
780 : : * Send feedback so that the server sees the latest WAL locations
781 : : * immediately.
782 : : */
3945 783 [ # # ]: 0 : if (!sendFeedback(conn, blockpos, now, false))
784 : 0 : goto error;
785 : 0 : last_status = now;
786 : : }
787 : :
788 : : /*
789 : : * Potentially send a status message to the primary
790 : : */
3466 magnus@hagander.net 791 [ + + + - :CBC 2511 : if (still_sending && stream->standby_message_timeout > 0 &&
+ + ]
4190 rhaas@postgresql.org 792 : 1216 : feTimestampDifferenceExceeds(last_status, now,
793 : : stream->standby_message_timeout))
794 : : {
795 : : /* Time to send feedback! */
4685 heikki.linnakangas@i 796 [ - + ]: 136 : if (!sendFeedback(conn, blockpos, now, false))
4804 magnus@hagander.net 797 :UBC 0 : goto error;
5064 magnus@hagander.net 798 :CBC 136 : last_status = now;
799 : : }
800 : :
801 : : /*
802 : : * Calculate how long send/receive loops should sleep
803 : : */
3466 804 : 1295 : sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
805 : : last_status);
806 : :
807 : : /* Done with any prior message */
206 tgl@sss.pgh.pa.us 808 : 1295 : PQfreemem(copybuf);
809 : 1295 : copybuf = NULL;
810 : :
3054 811 : 1295 : r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ©buf);
4047 fujii@postgresql.org 812 [ + + ]: 2619 : while (r != 0)
813 : : {
814 [ - + ]: 1460 : if (r == -1)
4049 fujii@postgresql.org 815 :UBC 0 : goto error;
4047 fujii@postgresql.org 816 [ + + ]:CBC 1460 : if (r == -2)
817 : : {
3466 magnus@hagander.net 818 : 136 : PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
819 : :
4047 fujii@postgresql.org 820 [ - + ]: 136 : if (res == NULL)
4047 fujii@postgresql.org 821 :UBC 0 : goto error;
206 tgl@sss.pgh.pa.us 822 :CBC 136 : PQfreemem(copybuf);
823 : 136 : return res;
824 : : }
825 : :
826 : : /* Check the message type. */
31 nathan@postgresql.or 827 [ - + ]:GNC 1324 : if (copybuf[0] == PqReplMsg_Keepalive)
828 : : {
3264 peter_e@gmx.net 829 [ # # ]:UBC 0 : if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
830 : : &last_status))
4047 fujii@postgresql.org 831 : 0 : goto error;
832 : : }
31 nathan@postgresql.or 833 [ + - ]:GNC 1324 : else if (copybuf[0] == PqReplMsg_WALData)
834 : : {
33 alvherre@kurilemu.de 835 [ - + ]: 1324 : if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos))
4047 fujii@postgresql.org 836 :UBC 0 : goto error;
837 : :
838 : : /*
839 : : * Check if we should continue streaming, or abort at this
840 : : * point.
841 : : */
1822 peter@eisentraut.org 842 [ - + ]:CBC 1324 : if (!CheckCopyStreamStop(conn, stream, blockpos))
4047 fujii@postgresql.org 843 :UBC 0 : goto error;
844 : : }
845 : : else
846 : : {
2350 peter@eisentraut.org 847 : 0 : pg_log_error("unrecognized streaming header: \"%c\"",
848 : : copybuf[0]);
4804 magnus@hagander.net 849 : 0 : goto error;
850 : : }
851 : :
852 : : /* Done with that message */
206 tgl@sss.pgh.pa.us 853 :CBC 1324 : PQfreemem(copybuf);
854 : 1324 : copybuf = NULL;
855 : :
856 : : /*
857 : : * Process the received data, and any subsequent data we can read
858 : : * without blocking.
859 : : */
3054 860 : 1324 : r = CopyStreamReceive(conn, 0, stream->stop_socket, ©buf);
861 : : }
862 : : }
863 : :
4804 magnus@hagander.net 864 :UBC 0 : error:
1107 peter@eisentraut.org 865 : 0 : PQfreemem(copybuf);
4513 rhaas@postgresql.org 866 : 0 : return NULL;
867 : : }
868 : :
869 : : /*
870 : : * Wait until we can read a CopyData message,
871 : : * or timeout, or occurrence of a signal or input on the stop_socket.
872 : : * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
873 : : *
874 : : * Returns 1 if data has become available for reading, 0 if timed out
875 : : * or interrupted by signal or stop_socket input, and -1 on an error.
876 : : */
877 : : static int
3054 tgl@sss.pgh.pa.us 878 :CBC 2433 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
879 : : {
880 : : int ret;
881 : : fd_set input_mask;
882 : : int connsocket;
883 : : int maxfd;
884 : : struct timeval timeout;
885 : : struct timeval *timeoutptr;
886 : :
887 : 2433 : connsocket = PQsocket(conn);
888 [ - + ]: 2433 : if (connsocket < 0)
889 : : {
2350 peter@eisentraut.org 890 :UBC 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
4082 fujii@postgresql.org 891 : 0 : return -1;
892 : : }
893 : :
4082 fujii@postgresql.org 894 [ + + ]:CBC 41361 : FD_ZERO(&input_mask);
3054 tgl@sss.pgh.pa.us 895 : 2433 : FD_SET(connsocket, &input_mask);
896 : 2433 : maxfd = connsocket;
897 [ + + ]: 2433 : if (stop_socket != PGINVALID_SOCKET)
898 : : {
899 : 2300 : FD_SET(stop_socket, &input_mask);
900 : 2300 : maxfd = Max(maxfd, stop_socket);
901 : : }
902 : :
4082 fujii@postgresql.org 903 [ + + ]: 2433 : if (timeout_ms < 0)
904 : 79 : timeoutptr = NULL;
905 : : else
906 : : {
907 : 2354 : timeout.tv_sec = timeout_ms / 1000L;
908 : 2354 : timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
909 : 2354 : timeoutptr = &timeout;
910 : : }
911 : :
3054 tgl@sss.pgh.pa.us 912 : 2433 : ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
913 : :
914 [ - + ]: 2433 : if (ret < 0)
915 : : {
3054 tgl@sss.pgh.pa.us 916 [ # # ]:UBC 0 : if (errno == EINTR)
917 : 0 : return 0; /* Got a signal, so not an error */
1597 peter@eisentraut.org 918 : 0 : pg_log_error("%s() failed: %m", "select");
4082 fujii@postgresql.org 919 : 0 : return -1;
920 : : }
3054 tgl@sss.pgh.pa.us 921 [ + + + + ]:CBC 2433 : if (ret > 0 && FD_ISSET(connsocket, &input_mask))
922 : 1847 : return 1; /* Got input on connection socket */
923 : :
924 : 586 : return 0; /* Got timeout or input on stop_socket */
925 : : }
926 : :
927 : : /*
928 : : * Receive CopyData message available from XLOG stream, blocking for
929 : : * maximum of 'timeout' ms.
930 : : *
931 : : * If data was received, returns the length of the data. *buffer is set to
932 : : * point to a buffer holding the received message. The caller must eventually
933 : : * free the buffer with PQfreemem().
934 : : *
935 : : * Returns 0 if no data was available within timeout, or if wait was
936 : : * interrupted by signal or stop_socket input.
937 : : * -1 on error. -2 if the server ended the COPY.
938 : : */
939 : : static int
940 : 2619 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
941 : : char **buffer)
942 : : {
4082 fujii@postgresql.org 943 : 2619 : char *copybuf = NULL;
944 : : int rawlen;
945 : :
946 : : /* Caller should have cleared any prior buffer */
206 tgl@sss.pgh.pa.us 947 [ - + ]: 2619 : Assert(*buffer == NULL);
948 : :
949 : : /* Try to receive a CopyData message */
4082 fujii@postgresql.org 950 : 2619 : rawlen = PQgetCopyData(conn, ©buf, 1);
951 [ + + ]: 2619 : if (rawlen == 0)
952 : : {
953 : : int ret;
954 : :
955 : : /*
956 : : * No data available. Wait for some to appear, but not longer than
957 : : * the specified timeout, so that we can ping the server. Also stop
958 : : * waiting if input appears on stop_socket.
959 : : */
3054 tgl@sss.pgh.pa.us 960 : 2433 : ret = CopyStreamPoll(conn, timeout, stop_socket);
961 [ + + ]: 2433 : if (ret <= 0)
962 : 586 : return ret;
963 : :
964 : : /* Now there is actually data on the socket */
4082 fujii@postgresql.org 965 [ - + ]: 1847 : if (PQconsumeInput(conn) == 0)
966 : : {
2350 peter@eisentraut.org 967 :UBC 0 : pg_log_error("could not receive data from WAL stream: %s",
968 : : PQerrorMessage(conn));
4082 fujii@postgresql.org 969 : 0 : return -1;
970 : : }
971 : :
972 : : /* Now that we've consumed some input, try again */
4082 fujii@postgresql.org 973 :CBC 1847 : rawlen = PQgetCopyData(conn, ©buf, 1);
974 [ + + ]: 1847 : if (rawlen == 0)
975 : 573 : return 0;
976 : : }
977 [ + + ]: 1460 : if (rawlen == -1) /* end-of-streaming or error */
978 : 136 : return -2;
979 [ - + ]: 1324 : if (rawlen == -2)
980 : : {
2350 peter@eisentraut.org 981 :UBC 0 : pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
4082 fujii@postgresql.org 982 : 0 : return -1;
983 : : }
984 : :
985 : : /* Return received messages to caller */
4082 fujii@postgresql.org 986 :CBC 1324 : *buffer = copybuf;
987 : 1324 : return rawlen;
988 : : }
989 : :
990 : : /*
991 : : * Process the keepalive message.
992 : : */
993 : : static bool
3264 peter_e@gmx.net 994 :UBC 0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
995 : : XLogRecPtr blockpos, TimestampTz *last_status)
996 : : {
997 : : int pos;
998 : : bool replyRequested;
999 : : TimestampTz now;
1000 : :
1001 : : /*
1002 : : * Parse the keepalive message, enclosed in the CopyData message. We just
1003 : : * check if the server requested a reply, and ignore the rest.
1004 : : */
31 nathan@postgresql.or 1005 :UNC 0 : pos = 1; /* skip msgtype PqReplMsg_Keepalive */
3759 bruce@momjian.us 1006 :UBC 0 : pos += 8; /* skip walEnd */
1007 : 0 : pos += 8; /* skip sendTime */
1008 : :
4049 fujii@postgresql.org 1009 [ # # ]: 0 : if (len < pos + 1)
1010 : : {
2350 peter@eisentraut.org 1011 : 0 : pg_log_error("streaming header too small: %d", len);
4049 fujii@postgresql.org 1012 : 0 : return false;
1013 : : }
1014 : 0 : replyRequested = copybuf[pos];
1015 : :
1016 : : /* If the server requested an immediate reply, send one. */
1017 [ # # # # ]: 0 : if (replyRequested && still_sending)
1018 : : {
3944 1019 [ # # # # ]: 0 : if (reportFlushPosition && lastFlushPosition < blockpos &&
3240 magnus@hagander.net 1020 [ # # ]: 0 : walfile != NULL)
1021 : : {
1022 : : /*
1023 : : * If a valid flush location needs to be reported, flush the
1024 : : * current WAL file so that the latest flush location is sent back
1025 : : * to the server. This is necessary to see whether the last WAL
1026 : : * data has been successfully replicated or not, at the normal
1027 : : * shutdown of the server.
1028 : : */
1083 rhaas@postgresql.org 1029 [ # # ]: 0 : if (stream->walmethod->ops->sync(walfile) != 0)
1247 tgl@sss.pgh.pa.us 1030 : 0 : pg_fatal("could not fsync file \"%s\": %s",
1031 : : walfile->pathname, GetLastWalMethodError(stream->walmethod));
3944 fujii@postgresql.org 1032 : 0 : lastFlushPosition = blockpos;
1033 : : }
1034 : :
4049 1035 : 0 : now = feGetCurrentTimestamp();
1036 [ # # ]: 0 : if (!sendFeedback(conn, blockpos, now, false))
1037 : 0 : return false;
1038 : 0 : *last_status = now;
1039 : : }
1040 : :
1041 : 0 : return true;
1042 : : }
1043 : :
1044 : : /*
1045 : : * Process WALData message.
1046 : : */
1047 : : static bool
33 alvherre@kurilemu.de 1048 :GNC 1324 : ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1049 : : XLogRecPtr *blockpos)
1050 : : {
1051 : : int xlogoff;
1052 : : int bytes_left;
1053 : : int bytes_written;
1054 : : int hdr_len;
1055 : :
1056 : : /*
1057 : : * Once we've decided we don't want to receive any more, just ignore any
1058 : : * subsequent WALData messages.
1059 : : */
4049 fujii@postgresql.org 1060 [ + + ]:CBC 1324 : if (!(still_sending))
1061 : 189 : return true;
1062 : :
1063 : : /*
1064 : : * Read the header of the WALData message, enclosed in the CopyData
1065 : : * message. We only need the WAL location field (dataStart), the rest of
1066 : : * the header is ignored.
1067 : : */
31 nathan@postgresql.or 1068 :GNC 1135 : hdr_len = 1; /* msgtype PqReplMsg_WALData */
3759 bruce@momjian.us 1069 :CBC 1135 : hdr_len += 8; /* dataStart */
1070 : 1135 : hdr_len += 8; /* walEnd */
1071 : 1135 : hdr_len += 8; /* sendTime */
4049 fujii@postgresql.org 1072 [ - + ]: 1135 : if (len < hdr_len)
1073 : : {
2350 peter@eisentraut.org 1074 :UBC 0 : pg_log_error("streaming header too small: %d", len);
4049 fujii@postgresql.org 1075 : 0 : return false;
1076 : : }
4049 fujii@postgresql.org 1077 :CBC 1135 : *blockpos = fe_recvint64(©buf[1]);
1078 : :
1079 : : /* Extract WAL location for this block */
2909 andres@anarazel.de 1080 : 1135 : xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1081 : :
1082 : : /*
1083 : : * Verify that the initial location in the stream matches where we think
1084 : : * we are.
1085 : : */
3240 magnus@hagander.net 1086 [ + + ]: 1135 : if (walfile == NULL)
1087 : : {
1088 : : /* No file open yet */
4049 fujii@postgresql.org 1089 [ - + ]: 142 : if (xlogoff != 0)
1090 : : {
2350 peter@eisentraut.org 1091 :UBC 0 : pg_log_error("received write-ahead log record for offset %u with no file open",
1092 : : xlogoff);
4049 fujii@postgresql.org 1093 : 0 : return false;
1094 : : }
1095 : : }
1096 : : else
1097 : : {
1098 : : /* More data in existing segment */
1083 rhaas@postgresql.org 1099 [ - + ]:CBC 993 : if (walfile->currpos != xlogoff)
1100 : : {
2350 peter@eisentraut.org 1101 :UBC 0 : pg_log_error("got WAL data offset %08x, expected %08x",
1102 : : xlogoff, (int) walfile->currpos);
4049 fujii@postgresql.org 1103 : 0 : return false;
1104 : : }
1105 : : }
1106 : :
4049 fujii@postgresql.org 1107 :CBC 1135 : bytes_left = len - hdr_len;
1108 : 1135 : bytes_written = 0;
1109 : :
1110 [ + + ]: 2270 : while (bytes_left)
1111 : : {
1112 : : int bytes_to_write;
1113 : :
1114 : : /*
1115 : : * If crossing a WAL boundary, only write up until we reach wal
1116 : : * segment size.
1117 : : */
2909 andres@anarazel.de 1118 [ - + ]: 1135 : if (xlogoff + bytes_left > WalSegSz)
2909 andres@anarazel.de 1119 :UBC 0 : bytes_to_write = WalSegSz - xlogoff;
1120 : : else
4049 fujii@postgresql.org 1121 :CBC 1135 : bytes_to_write = bytes_left;
1122 : :
3240 magnus@hagander.net 1123 [ + + ]: 1135 : if (walfile == NULL)
1124 : : {
3466 1125 [ - + ]: 142 : if (!open_walfile(stream, *blockpos))
1126 : : {
1127 : : /* Error logged by open_walfile */
4049 fujii@postgresql.org 1128 :UBC 0 : return false;
1129 : : }
1130 : : }
1131 : :
1083 rhaas@postgresql.org 1132 :CBC 2270 : if (stream->walmethod->ops->write(walfile,
1133 : 1135 : copybuf + hdr_len + bytes_written,
1134 [ - + ]: 1135 : bytes_to_write) != bytes_to_write)
1135 : : {
1389 peter@eisentraut.org 1136 :UBC 0 : pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
1137 : : bytes_to_write, walfile->pathname,
1138 : : GetLastWalMethodError(stream->walmethod));
4049 fujii@postgresql.org 1139 : 0 : return false;
1140 : : }
1141 : :
1142 : : /* Write was successful, advance our position */
4049 fujii@postgresql.org 1143 :CBC 1135 : bytes_written += bytes_to_write;
1144 : 1135 : bytes_left -= bytes_to_write;
1145 : 1135 : *blockpos += bytes_to_write;
1146 : 1135 : xlogoff += bytes_to_write;
1147 : :
1148 : : /* Did we reach the end of a WAL segment? */
2909 andres@anarazel.de 1149 [ + + ]: 1135 : if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1150 : : {
3466 magnus@hagander.net 1151 [ - + ]: 10 : if (!close_walfile(stream, *blockpos))
1152 : : /* Error message written in close_walfile() */
4049 fujii@postgresql.org 1153 :UBC 0 : return false;
1154 : :
4049 fujii@postgresql.org 1155 :CBC 10 : xlogoff = 0;
1156 : :
3466 magnus@hagander.net 1157 [ + - - + ]: 10 : if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1158 : : {
4049 fujii@postgresql.org 1159 [ # # # # ]:UBC 0 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1160 : : {
2350 peter@eisentraut.org 1161 : 0 : pg_log_error("could not send copy-end packet: %s",
1162 : : PQerrorMessage(conn));
4049 fujii@postgresql.org 1163 : 0 : return false;
1164 : : }
1165 : 0 : still_sending = false;
33 alvherre@kurilemu.de 1166 :UNC 0 : return true; /* ignore the rest of this WALData packet */
1167 : : }
1168 : : }
1169 : : }
1170 : : /* No more data left to write, receive next copy packet */
1171 : :
4049 fujii@postgresql.org 1172 :CBC 1135 : return true;
1173 : : }
1174 : :
1175 : : /*
1176 : : * Handle end of the copy stream.
1177 : : */
1178 : : static PGresult *
3466 magnus@hagander.net 1179 : 136 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
1180 : : XLogRecPtr blockpos, XLogRecPtr *stoppos)
1181 : : {
4049 fujii@postgresql.org 1182 : 136 : PGresult *res = PQgetResult(conn);
1183 : :
1184 : : /*
1185 : : * The server closed its end of the copy stream. If we haven't closed
1186 : : * ours already, we need to do so now, unless the server threw an error,
1187 : : * in which case we don't.
1188 : : */
1189 [ + + ]: 136 : if (still_sending)
1190 : : {
3466 magnus@hagander.net 1191 [ - + ]: 2 : if (!close_walfile(stream, blockpos))
1192 : : {
1193 : : /* Error message written in close_walfile() */
4049 fujii@postgresql.org 1194 :UBC 0 : PQclear(res);
1195 : 0 : return NULL;
1196 : : }
4049 fujii@postgresql.org 1197 [ + + ]:CBC 2 : if (PQresultStatus(res) == PGRES_COPY_IN)
1198 : : {
1199 [ + - - + ]: 1 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1200 : : {
2350 peter@eisentraut.org 1201 :UBC 0 : pg_log_error("could not send copy-end packet: %s",
1202 : : PQerrorMessage(conn));
4049 fujii@postgresql.org 1203 : 0 : PQclear(res);
1204 : 0 : return NULL;
1205 : : }
4049 fujii@postgresql.org 1206 :CBC 1 : res = PQgetResult(conn);
1207 : : }
1208 : 2 : still_sending = false;
1209 : : }
1210 : 136 : *stoppos = blockpos;
1211 : 136 : return res;
1212 : : }
1213 : :
1214 : : /*
1215 : : * Check if we should continue streaming, or abort at this point.
1216 : : */
1217 : : static bool
1822 peter@eisentraut.org 1218 : 2619 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
1219 : : {
3466 magnus@hagander.net 1220 [ + + + + ]: 2619 : if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1221 : : {
1222 [ - + ]: 134 : if (!close_walfile(stream, blockpos))
1223 : : {
1224 : : /* Potential error message is written by close_walfile */
4047 fujii@postgresql.org 1225 :UBC 0 : return false;
1226 : : }
4047 fujii@postgresql.org 1227 [ + - - + ]:CBC 134 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1228 : : {
2350 peter@eisentraut.org 1229 :UBC 0 : pg_log_error("could not send copy-end packet: %s",
1230 : : PQerrorMessage(conn));
4047 fujii@postgresql.org 1231 : 0 : return false;
1232 : : }
4047 fujii@postgresql.org 1233 :CBC 134 : still_sending = false;
1234 : : }
1235 : :
1236 : 2619 : return true;
1237 : : }
1238 : :
1239 : : /*
1240 : : * Calculate how long send/receive loops should sleep
1241 : : */
1242 : : static long
3117 tgl@sss.pgh.pa.us 1243 : 1295 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
1244 : : TimestampTz last_status)
1245 : : {
1246 : 1295 : TimestampTz status_targettime = 0;
1247 : : long sleeptime;
1248 : :
4047 fujii@postgresql.org 1249 [ + - + + ]: 1295 : if (standby_message_timeout && still_sending)
1250 : 1216 : status_targettime = last_status +
1251 : 1216 : (standby_message_timeout - 1) * ((int64) 1000);
1252 : :
3945 1253 [ + + ]: 1295 : if (status_targettime > 0)
1254 : : {
1255 : : long secs;
1256 : : int usecs;
1257 : :
4047 1258 : 1216 : feTimestampDifference(now,
1259 : : status_targettime,
1260 : : &secs,
1261 : : &usecs);
1262 : : /* Always sleep at least 1 sec */
1263 [ - + ]: 1216 : if (secs <= 0)
1264 : : {
4047 fujii@postgresql.org 1265 :UBC 0 : secs = 1;
1266 : 0 : usecs = 0;
1267 : : }
1268 : :
4047 fujii@postgresql.org 1269 :CBC 1216 : sleeptime = secs * 1000 + usecs / 1000;
1270 : : }
1271 : : else
1272 : 79 : sleeptime = -1;
1273 : :
1274 : 1295 : return sleeptime;
1275 : : }
|