LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - receivelog.c (source / functions) Coverage Total Hit UNC UBC GNC CBC DUB DCB
Current: c70b6db34ffeab48beef1fb4ce61bcad3772b8dd vs 06473f5a344df8c9594ead90a609b86f6724cff8 Lines: 63.5 % 427 271 3 153 8 263 3 8
Current Date: 2025-09-06 07:49:51 +0900 Functions: 94.1 % 17 16 1 5 11 1
Baseline: lcov-20250906-005545-baseline Branches: 56.2 % 260 146 4 110 4 142
Baseline Date: 2025-09-05 08:21:35 +0100 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(30,360] days: 83.3 % 18 15 3 8 7
(360..) days: 62.6 % 409 256 153 256
Function coverage date bins:
(30,360] days: 100.0 % 1 1 1
(360..) days: 93.8 % 16 15 1 4 11
Branch coverage date bins:
(30,360] days: 50.0 % 10 5 4 1 4 1
(360..) days: 56.4 % 250 141 109 141

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * receivelog.c - receive WAL files using the streaming
                                  4                 :                :  *                replication protocol.
                                  5                 :                :  *
                                  6                 :                :  * Author: Magnus Hagander <magnus@hagander.net>
                                  7                 :                :  *
                                  8                 :                :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
                                  9                 :                :  *
                                 10                 :                :  * IDENTIFICATION
                                 11                 :                :  *        src/bin/pg_basebackup/receivelog.c
                                 12                 :                :  *-------------------------------------------------------------------------
                                 13                 :                :  */
                                 14                 :                : 
                                 15                 :                : #include "postgres_fe.h"
                                 16                 :                : 
                                 17                 :                : #include <sys/select.h>
                                 18                 :                : #include <sys/stat.h>
                                 19                 :                : #include <unistd.h>
                                 20                 :                : 
                                 21                 :                : #include "access/xlog_internal.h"
                                 22                 :                : #include "common/logging.h"
                                 23                 :                : #include "libpq-fe.h"
                                 24                 :                : #include "libpq/protocol.h"
                                 25                 :                : #include "receivelog.h"
                                 26                 :                : #include "streamutil.h"
                                 27                 :                : 
                                 28                 :                : /* currently open WAL file */
                                 29                 :                : static Walfile *walfile = NULL;
                                 30                 :                : static bool reportFlushPosition = false;
                                 31                 :                : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
                                 32                 :                : 
                                 33                 :                : static bool still_sending = true;   /* feedback still needs to be sent? */
                                 34                 :                : 
                                 35                 :                : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
                                 36                 :                :                                   XLogRecPtr *stoppos);
                                 37                 :                : static int  CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
                                 38                 :                : static int  CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
                                 39                 :                :                               char **buffer);
                                 40                 :                : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
                                 41                 :                :                                 int len, XLogRecPtr blockpos, TimestampTz *last_status);
                                 42                 :                : static bool ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                                 43                 :                :                               XLogRecPtr *blockpos);
                                 44                 :                : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
                                 45                 :                :                                        XLogRecPtr blockpos, XLogRecPtr *stoppos);
                                 46                 :                : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
                                 47                 :                : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
                                 48                 :                :                                          TimestampTz last_status);
                                 49                 :                : 
                                 50                 :                : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                                 51                 :                :                                      uint32 *timeline);
                                 52                 :                : 
                                 53                 :                : static bool
 3240 magnus@hagander.net        54                 :CBC           6 : mark_file_as_archived(StreamCtl *stream, const char *fname)
                                 55                 :                : {
                                 56                 :                :     Walfile    *f;
                                 57                 :                :     static char tmppath[MAXPGPATH];
                                 58                 :                : 
                                 59                 :              6 :     snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
                                 60                 :                :              fname);
                                 61                 :                : 
 1083 rhaas@postgresql.org       62                 :              6 :     f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
                                 63                 :                :                                                NULL, 0);
 3240 magnus@hagander.net        64         [ -  + ]:              6 :     if (f == NULL)
                                 65                 :                :     {
 2350 peter@eisentraut.org       66                 :UBC           0 :         pg_log_error("could not create archive status file \"%s\": %s",
                                 67                 :                :                      tmppath, GetLastWalMethodError(stream->walmethod));
 3899 andres@anarazel.de         68                 :              0 :         return false;
                                 69                 :                :     }
                                 70                 :                : 
 1083 rhaas@postgresql.org       71         [ -  + ]:CBC           6 :     if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
                                 72                 :                :     {
 1389 tgl@sss.pgh.pa.us          73                 :UBC           0 :         pg_log_error("could not close archive status file \"%s\": %s",
                                 74                 :                :                      tmppath, GetLastWalMethodError(stream->walmethod));
                                 75                 :              0 :         return false;
                                 76                 :                :     }
                                 77                 :                : 
 3899 andres@anarazel.de         78                 :CBC           6 :     return true;
                                 79                 :                : }
                                 80                 :                : 
                                 81                 :                : /*
                                 82                 :                :  * Open a new WAL file in the specified directory.
                                 83                 :                :  *
                                 84                 :                :  * Returns true if OK; on failure, returns false after printing an error msg.
                                 85                 :                :  * On success, 'walfile' is set to the opened WAL file.
                                 86                 :                :  *
                                 87                 :                :  * The file will be padded to 16Mb with zeroes.
                                 88                 :                :  */
                                 89                 :                : static bool
 3466 magnus@hagander.net        90                 :            142 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
                                 91                 :                : {
                                 92                 :                :     Walfile    *f;
                                 93                 :                :     char       *fn;
                                 94                 :                :     ssize_t     size;
                                 95                 :                :     XLogSegNo   segno;
                                 96                 :                :     char        walfile_name[MAXPGPATH];
                                 97                 :                : 
 2909 andres@anarazel.de         98                 :            142 :     XLByteToSeg(startpoint, segno, WalSegSz);
 1083 rhaas@postgresql.org       99                 :            142 :     XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
                                100                 :                : 
                                101                 :                :     /* Note that this considers the compression used if necessary */
                                102                 :            142 :     fn = stream->walmethod->ops->get_file_name(stream->walmethod,
                                103                 :                :                                                walfile_name,
                                104                 :            142 :                                                stream->partial_suffix);
                                105                 :                : 
                                106                 :                :     /*
                                107                 :                :      * When streaming to files, if an existing file exists we verify that it's
                                108                 :                :      * either empty (just created), or a complete WalSegSz segment (in which
                                109                 :                :      * case it has been created and padded). Anything else indicates a corrupt
                                110                 :                :      * file. Compressed files have no need for padding, so just ignore this
                                111                 :                :      * case.
                                112                 :                :      *
                                113                 :                :      * When streaming to tar, no file with this name will exist before, so we
                                114                 :                :      * never have to verify a size.
                                115                 :                :      */
                                116   [ +  +  -  + ]:            277 :     if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
                                117                 :            135 :         stream->walmethod->ops->existsfile(stream->walmethod, fn))
                                118                 :                :     {
 1083 rhaas@postgresql.org      119                 :UBC           0 :         size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
 3240 magnus@hagander.net       120         [ #  # ]:              0 :         if (size < 0)
                                121                 :                :         {
 2350 peter@eisentraut.org      122                 :              0 :             pg_log_error("could not get size of write-ahead log file \"%s\": %s",
                                123                 :                :                          fn, GetLastWalMethodError(stream->walmethod));
 1503 michael@paquier.xyz       124                 :              0 :             pg_free(fn);
 3240 magnus@hagander.net       125                 :              0 :             return false;
                                126                 :                :         }
 2909 andres@anarazel.de        127         [ #  # ]:              0 :         if (size == WalSegSz)
                                128                 :                :         {
                                129                 :                :             /* Already padded file. Open it for use */
 1083 rhaas@postgresql.org      130                 :              0 :             f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
 3240 magnus@hagander.net       131         [ #  # ]:              0 :             if (f == NULL)
                                132                 :                :             {
 2350 peter@eisentraut.org      133                 :              0 :                 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
                                134                 :                :                              fn, GetLastWalMethodError(stream->walmethod));
 1503 michael@paquier.xyz       135                 :              0 :                 pg_free(fn);
 3261 tgl@sss.pgh.pa.us         136                 :              0 :                 return false;
                                137                 :                :             }
                                138                 :                : 
                                139                 :                :             /* fsync file in case of a previous crash */
 1083 rhaas@postgresql.org      140         [ #  # ]:              0 :             if (stream->walmethod->ops->sync(f) != 0)
                                141                 :                :             {
 1247 tgl@sss.pgh.pa.us         142                 :              0 :                 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
                                143                 :                :                              fn, GetLastWalMethodError(stream->walmethod));
 1083 rhaas@postgresql.org      144                 :              0 :                 stream->walmethod->ops->close(f, CLOSE_UNLINK);
 2231 peter@eisentraut.org      145                 :              0 :                 exit(1);
                                146                 :                :             }
                                147                 :                : 
 3240 magnus@hagander.net       148                 :              0 :             walfile = f;
 1503 michael@paquier.xyz       149                 :              0 :             pg_free(fn);
 3240 magnus@hagander.net       150                 :              0 :             return true;
                                151                 :                :         }
                                152         [ #  # ]:              0 :         if (size != 0)
                                153                 :                :         {
                                154                 :                :             /* if write didn't set errno, assume problem is no disk space */
 3261 tgl@sss.pgh.pa.us         155         [ #  # ]:              0 :             if (errno == 0)
                                156                 :              0 :                 errno = ENOSPC;
 1490 peter@eisentraut.org      157                 :              0 :             pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
                                158                 :                :                                   "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
                                159                 :                :                                   size),
                                160                 :                :                          fn, size, WalSegSz);
 1503 michael@paquier.xyz       161                 :              0 :             pg_free(fn);
 4615 heikki.linnakangas@i      162                 :              0 :             return false;
                                163                 :                :         }
                                164                 :                :         /* File existed and was empty, so fall through and open */
                                165                 :                :     }
                                166                 :                : 
                                167                 :                :     /* No file existed, so create one */
                                168                 :                : 
 1083 rhaas@postgresql.org      169                 :CBC         142 :     f = stream->walmethod->ops->open_for_write(stream->walmethod,
                                170                 :                :                                                walfile_name,
                                171                 :            142 :                                                stream->partial_suffix,
                                172                 :                :                                                WalSegSz);
 3240 magnus@hagander.net       173         [ -  + ]:            142 :     if (f == NULL)
                                174                 :                :     {
 2350 peter@eisentraut.org      175                 :UBC           0 :         pg_log_error("could not open write-ahead log file \"%s\": %s",
                                176                 :                :                      fn, GetLastWalMethodError(stream->walmethod));
 1503 michael@paquier.xyz       177                 :              0 :         pg_free(fn);
 4615 heikki.linnakangas@i      178                 :              0 :         return false;
                                179                 :                :     }
                                180                 :                : 
 1503 michael@paquier.xyz       181                 :CBC         142 :     pg_free(fn);
 4615 heikki.linnakangas@i      182                 :            142 :     walfile = f;
                                183                 :            142 :     return true;
                                184                 :                : }
                                185                 :                : 
                                186                 :                : /*
                                187                 :                :  * Close the current WAL file (if open), and rename it to the correct
                                188                 :                :  * filename if it's complete. On failure, prints an error message to stderr
                                189                 :                :  * and returns false, otherwise returns true.
                                190                 :                :  */
                                191                 :                : static bool
 3466 magnus@hagander.net       192                 :            146 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
                                193                 :                : {
                                194                 :                :     char       *fn;
                                195                 :                :     pgoff_t     currpos;
                                196                 :                :     int         r;
                                197                 :                :     char        walfile_name[MAXPGPATH];
                                198                 :                : 
 3240                           199         [ +  + ]:            146 :     if (walfile == NULL)
 4615 heikki.linnakangas@i      200                 :              4 :         return true;
                                201                 :                : 
 1083 rhaas@postgresql.org      202                 :            142 :     strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
                                203                 :            142 :     currpos = walfile->currpos;
                                204                 :                : 
                                205                 :                :     /* Note that this considers the compression used if necessary */
                                206                 :            142 :     fn = stream->walmethod->ops->get_file_name(stream->walmethod,
                                207                 :                :                                                walfile_name,
                                208                 :            142 :                                                stream->partial_suffix);
                                209                 :                : 
 3240 magnus@hagander.net       210         [ +  + ]:            142 :     if (stream->partial_suffix)
                                211                 :                :     {
 2909 andres@anarazel.de        212         [ +  + ]:             12 :         if (currpos == WalSegSz)
 1083 rhaas@postgresql.org      213                 :              6 :             r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
                                214                 :                :         else
                                215                 :                :         {
 1450 michael@paquier.xyz       216                 :              6 :             pg_log_info("not renaming \"%s\", segment is not complete", fn);
 1083 rhaas@postgresql.org      217                 :              6 :             r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
                                218                 :                :         }
                                219                 :                :     }
                                220                 :                :     else
                                221                 :            130 :         r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
                                222                 :                : 
 3240 magnus@hagander.net       223                 :            142 :     walfile = NULL;
                                224                 :                : 
                                225         [ -  + ]:            142 :     if (r != 0)
                                226                 :                :     {
 2350 peter@eisentraut.org      227                 :UBC           0 :         pg_log_error("could not close file \"%s\": %s",
                                228                 :                :                      fn, GetLastWalMethodError(stream->walmethod));
                                229                 :                : 
 1450 michael@paquier.xyz       230                 :              0 :         pg_free(fn);
 5056 magnus@hagander.net       231                 :              0 :         return false;
                                232                 :                :     }
                                233                 :                : 
 1450 michael@paquier.xyz       234                 :CBC         142 :     pg_free(fn);
                                235                 :                : 
                                236                 :                :     /*
                                237                 :                :      * Mark file as archived if requested by the caller - pg_basebackup needs
                                238                 :                :      * to do so as files can otherwise get archived again after promotion of a
                                239                 :                :      * new node. This is in line with walreceiver.c always doing a
                                240                 :                :      * XLogArchiveForceDone() after a complete segment.
                                241                 :                :      */
 2909 andres@anarazel.de        242   [ +  +  +  + ]:            142 :     if (currpos == WalSegSz && stream->mark_done)
                                243                 :                :     {
                                244                 :                :         /* writes error message if failed */
 1083 rhaas@postgresql.org      245         [ -  + ]:              4 :         if (!mark_file_as_archived(stream, walfile_name))
 3899 andres@anarazel.de        246                 :UBC           0 :             return false;
                                247                 :                :     }
                                248                 :                : 
 4236 rhaas@postgresql.org      249                 :CBC         142 :     lastFlushPosition = pos;
 5056 magnus@hagander.net       250                 :            142 :     return true;
                                251                 :                : }
                                252                 :                : 
                                253                 :                : 
                                254                 :                : /*
                                255                 :                :  * Check if a timeline history file exists.
                                256                 :                :  */
                                257                 :                : static bool
 3466                           258                 :            137 : existsTimeLineHistoryFile(StreamCtl *stream)
                                259                 :                : {
                                260                 :                :     char        histfname[MAXFNAMELEN];
                                261                 :                : 
                                262                 :                :     /*
                                263                 :                :      * Timeline 1 never has a history file. We treat that as if it existed,
                                264                 :                :      * since we never need to stream it.
                                265                 :                :      */
                                266         [ +  + ]:            137 :     if (stream->timeline == 1)
 4615 heikki.linnakangas@i      267                 :            134 :         return true;
                                268                 :                : 
 3466 magnus@hagander.net       269                 :              3 :     TLHistoryFileName(histfname, stream->timeline);
                                270                 :                : 
 1083 rhaas@postgresql.org      271                 :              3 :     return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
                                272                 :                : }
                                273                 :                : 
                                274                 :                : static bool
 3466 magnus@hagander.net       275                 :              3 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
                                276                 :                : {
 4615 heikki.linnakangas@i      277                 :              3 :     int         size = strlen(content);
                                278                 :                :     char        histfname[MAXFNAMELEN];
                                279                 :                :     Walfile    *f;
                                280                 :                : 
                                281                 :                :     /*
                                282                 :                :      * Check that the server's idea of how timeline history files should be
                                283                 :                :      * named matches ours.
                                284                 :                :      */
 3466 magnus@hagander.net       285                 :              3 :     TLHistoryFileName(histfname, stream->timeline);
 4615 heikki.linnakangas@i      286         [ -  + ]:              3 :     if (strcmp(histfname, filename) != 0)
                                287                 :                :     {
 2350 peter@eisentraut.org      288                 :UBC           0 :         pg_log_error("server reported unexpected history file name for timeline %u: %s",
                                289                 :                :                      stream->timeline, filename);
 4615 heikki.linnakangas@i      290                 :              0 :         return false;
                                291                 :                :     }
                                292                 :                : 
 1083 rhaas@postgresql.org      293                 :CBC           3 :     f = stream->walmethod->ops->open_for_write(stream->walmethod,
                                294                 :                :                                                histfname, ".tmp", 0);
 3240 magnus@hagander.net       295         [ -  + ]:              3 :     if (f == NULL)
                                296                 :                :     {
 2350 peter@eisentraut.org      297                 :UBC           0 :         pg_log_error("could not create timeline history file \"%s\": %s",
                                298                 :                :                      histfname, GetLastWalMethodError(stream->walmethod));
 4615 heikki.linnakangas@i      299                 :              0 :         return false;
                                300                 :                :     }
                                301                 :                : 
 1083 rhaas@postgresql.org      302         [ -  + ]:CBC           3 :     if ((int) stream->walmethod->ops->write(f, content, size) != size)
                                303                 :                :     {
 2350 peter@eisentraut.org      304                 :UBC           0 :         pg_log_error("could not write timeline history file \"%s\": %s",
                                305                 :                :                      histfname, GetLastWalMethodError(stream->walmethod));
                                306                 :                : 
                                307                 :                :         /*
                                308                 :                :          * If we fail to make the file, delete it to release disk space
                                309                 :                :          */
 1083 rhaas@postgresql.org      310                 :              0 :         stream->walmethod->ops->close(f, CLOSE_UNLINK);
                                311                 :                : 
 4615 heikki.linnakangas@i      312                 :              0 :         return false;
                                313                 :                :     }
                                314                 :                : 
 1083 rhaas@postgresql.org      315         [ -  + ]:CBC           3 :     if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
                                316                 :                :     {
 2350 peter@eisentraut.org      317                 :UBC           0 :         pg_log_error("could not close file \"%s\": %s",
                                318                 :                :                      histfname, GetLastWalMethodError(stream->walmethod));
 4615 heikki.linnakangas@i      319                 :              0 :         return false;
                                320                 :                :     }
                                321                 :                : 
                                322                 :                :     /* Maintain archive_status, check close_walfile() for details. */
 3466 magnus@hagander.net       323         [ +  + ]:CBC           3 :     if (stream->mark_done)
                                324                 :                :     {
                                325                 :                :         /* writes error message if failed */
 3240                           326         [ -  + ]:              2 :         if (!mark_file_as_archived(stream, histfname))
 3899 andres@anarazel.de        327                 :UBC           0 :             return false;
                                328                 :                :     }
                                329                 :                : 
 4615 heikki.linnakangas@i      330                 :CBC           3 :     return true;
                                331                 :                : }
                                332                 :                : 
                                333                 :                : /*
                                334                 :                :  * Send a Standby Status Update message to server.
                                335                 :                :  */
                                336                 :                : static bool
 3117 tgl@sss.pgh.pa.us         337                 :            136 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
                                338                 :                : {
                                339                 :                :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
 4483 bruce@momjian.us          340                 :            136 :     int         len = 0;
                                341                 :                : 
   31 nathan@postgresql.or      342                 :GNC         136 :     replybuf[len] = PqReplMsg_StandbyStatusUpdate;
 4686 heikki.linnakangas@i      343                 :CBC         136 :     len += 1;
 2999 tgl@sss.pgh.pa.us         344                 :            136 :     fe_sendint64(blockpos, &replybuf[len]); /* write */
 4686 heikki.linnakangas@i      345                 :            136 :     len += 8;
 4236 rhaas@postgresql.org      346         [ +  + ]:            136 :     if (reportFlushPosition)
 2999 tgl@sss.pgh.pa.us         347                 :            132 :         fe_sendint64(lastFlushPosition, &replybuf[len]);    /* flush */
                                348                 :                :     else
                                349                 :              4 :         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* flush */
 4686 heikki.linnakangas@i      350                 :            136 :     len += 8;
 4141 bruce@momjian.us          351                 :            136 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
 4686 heikki.linnakangas@i      352                 :            136 :     len += 8;
 4141 bruce@momjian.us          353                 :            136 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
 4686 heikki.linnakangas@i      354                 :            136 :     len += 8;
 2999 tgl@sss.pgh.pa.us         355                 :            136 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
 4686 heikki.linnakangas@i      356                 :            136 :     len += 1;
                                357                 :                : 
                                358   [ +  -  -  + ]:            136 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
                                359                 :                :     {
 2350 peter@eisentraut.org      360                 :UBC           0 :         pg_log_error("could not send feedback packet: %s",
                                361                 :                :                      PQerrorMessage(conn));
 4686 heikki.linnakangas@i      362                 :              0 :         return false;
                                363                 :                :     }
                                364                 :                : 
 4686 heikki.linnakangas@i      365                 :CBC         136 :     return true;
                                366                 :                : }
                                367                 :                : 
                                368                 :                : /*
                                369                 :                :  * Check that the server version we're connected to is supported by
                                370                 :                :  * ReceiveXlogStream().
                                371                 :                :  *
                                372                 :                :  * If it's not, an error message is printed to stderr, and false is returned.
                                373                 :                :  */
                                374                 :                : bool
 4551                           375                 :            293 : CheckServerVersionForStreaming(PGconn *conn)
                                376                 :                : {
                                377                 :                :     int         minServerMajor,
                                378                 :                :                 maxServerMajor;
                                379                 :                :     int         serverMajor;
                                380                 :                : 
                                381                 :                :     /*
                                382                 :                :      * The message format used in streaming replication changed in 9.3, so we
                                383                 :                :      * cannot stream from older servers. And we don't support servers newer
                                384                 :                :      * than the client; it might work, but we don't know, so err on the safe
                                385                 :                :      * side.
                                386                 :                :      */
                                387                 :            293 :     minServerMajor = 903;
                                388                 :            293 :     maxServerMajor = PG_VERSION_NUM / 100;
                                389                 :            293 :     serverMajor = PQserverVersion(conn) / 100;
 4141 simon@2ndQuadrant.co      390         [ -  + ]:            293 :     if (serverMajor < minServerMajor)
                                391                 :                :     {
 4551 heikki.linnakangas@i      392                 :UBC           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
                                393                 :                : 
 2350 peter@eisentraut.org      394         [ #  # ]:              0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
                                395                 :                :                      serverver ? serverver : "'unknown'",
                                396                 :                :                      "9.3");
 4141 simon@2ndQuadrant.co      397                 :              0 :         return false;
                                398                 :                :     }
 4141 simon@2ndQuadrant.co      399         [ -  + ]:CBC         293 :     else if (serverMajor > maxServerMajor)
                                400                 :                :     {
 4141 simon@2ndQuadrant.co      401                 :UBC           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
                                402                 :                : 
 2350 peter@eisentraut.org      403         [ #  # ]:              0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
                                404                 :                :                      serverver ? serverver : "'unknown'",
                                405                 :                :                      PG_VERSION);
 4551 heikki.linnakangas@i      406                 :              0 :         return false;
                                407                 :                :     }
 4551 heikki.linnakangas@i      408                 :CBC         293 :     return true;
                                409                 :                : }
                                410                 :                : 
                                411                 :                : /*
                                412                 :                :  * Receive a log stream starting at the specified position.
                                413                 :                :  *
                                414                 :                :  * Individual parameters are passed through the StreamCtl structure.
                                415                 :                :  *
                                416                 :                :  * If sysidentifier is specified, validate that both the system
                                417                 :                :  * identifier and the timeline matches the specified ones
                                418                 :                :  * (by sending an extra IDENTIFY_SYSTEM command)
                                419                 :                :  *
                                420                 :                :  * All received segments will be written to the directory
                                421                 :                :  * specified by basedir. This will also fetch any missing timeline history
                                422                 :                :  * files.
                                423                 :                :  *
                                424                 :                :  * The stream_stop callback will be called every time data
                                425                 :                :  * is received, and whenever a segment is completed. If it returns
                                426                 :                :  * true, the streaming will stop and the function
                                427                 :                :  * return. As long as it returns false, streaming will continue
                                428                 :                :  * indefinitely.
                                429                 :                :  *
                                430                 :                :  * If stream_stop() checks for external input, stop_socket should be set to
                                431                 :                :  * the FD it checks.  This will allow such input to be detected promptly
                                432                 :                :  * rather than after standby_message_timeout (which might be indefinite).
                                433                 :                :  * Note that signals will interrupt waits for input as well, but that is
                                434                 :                :  * race-y since a signal received while busy won't interrupt the wait.
                                435                 :                :  *
                                436                 :                :  * standby_message_timeout controls how often we send a message
                                437                 :                :  * back to the primary letting it know our progress, in milliseconds.
                                438                 :                :  * Zero means no messages are sent.
                                439                 :                :  * This message will only contain the write location, and never
                                440                 :                :  * flush or replay.
                                441                 :                :  *
                                442                 :                :  * If 'partial_suffix' is not NULL, files are initially created with the
                                443                 :                :  * given suffix, and the suffix is removed once the file is finished. That
                                444                 :                :  * allows you to tell the difference between partial and completed files,
                                445                 :                :  * so that you can continue later where you left.
                                446                 :                :  *
                                447                 :                :  * If 'synchronous' is true, the received WAL is flushed as soon as written,
                                448                 :                :  * otherwise only when the WAL file is closed.
                                449                 :                :  *
                                450                 :                :  * Note: The WAL location *must* be at a log segment start!
                                451                 :                :  */
                                452                 :                : bool
 3466 magnus@hagander.net       453                 :            136 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
                                454                 :                : {
                                455                 :                :     char        query[128];
                                456                 :                :     char        slotcmd[128];
                                457                 :                :     PGresult   *res;
                                458                 :                :     XLogRecPtr  stoppos;
                                459                 :                : 
                                460                 :                :     /*
                                461                 :                :      * The caller should've checked the server version already, but doesn't do
                                462                 :                :      * any harm to check it here too.
                                463                 :                :      */
 4551 heikki.linnakangas@i      464         [ -  + ]:            136 :     if (!CheckServerVersionForStreaming(conn))
 4617 heikki.linnakangas@i      465                 :UBC           0 :         return false;
                                466                 :                : 
                                467                 :                :     /*
                                468                 :                :      * Decide whether we want to report the flush position. If we report the
                                469                 :                :      * flush position, the primary will know what WAL we'll possibly
                                470                 :                :      * re-request, and it can then remove older WAL safely. We must always do
                                471                 :                :      * that when we are using slots.
                                472                 :                :      *
                                473                 :                :      * Reporting the flush position makes one eligible as a synchronous
                                474                 :                :      * replica. People shouldn't include generic names in
                                475                 :                :      * synchronous_standby_names, but we've protected them against it so far,
                                476                 :                :      * so let's continue to do so unless specifically requested.
                                477                 :                :      */
 3155 magnus@hagander.net       478         [ +  + ]:CBC         136 :     if (stream->replication_slot != NULL)
                                479                 :                :     {
 4236 rhaas@postgresql.org      480                 :            131 :         reportFlushPosition = true;
 3155 magnus@hagander.net       481                 :            131 :         sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
                                482                 :                :     }
                                483                 :                :     else
                                484                 :                :     {
 3295 simon@2ndQuadrant.co      485         [ +  + ]:              5 :         if (stream->synchronous)
                                486                 :              1 :             reportFlushPosition = true;
                                487                 :                :         else
                                488                 :              4 :             reportFlushPosition = false;
 4236 rhaas@postgresql.org      489                 :              5 :         slotcmd[0] = 0;
                                490                 :                :     }
                                491                 :                : 
 3466 magnus@hagander.net       492         [ +  - ]:            136 :     if (stream->sysidentifier != NULL)
                                493                 :                :     {
 1467 michael@paquier.xyz       494                 :            136 :         char       *sysidentifier = NULL;
                                495                 :                :         TimeLineID  servertli;
                                496                 :                : 
                                497                 :                :         /*
                                498                 :                :          * Get the server system identifier and timeline, and validate them.
                                499                 :                :          */
                                500         [ -  + ]:            136 :         if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
                                501                 :                :         {
 1467 michael@paquier.xyz       502                 :UBC           0 :             pg_free(sysidentifier);
 5064 magnus@hagander.net       503                 :              0 :             return false;
                                504                 :                :         }
                                505                 :                : 
 1467 michael@paquier.xyz       506         [ -  + ]:CBC         136 :         if (strcmp(stream->sysidentifier, sysidentifier) != 0)
                                507                 :                :         {
 2350 peter@eisentraut.org      508                 :UBC           0 :             pg_log_error("system identifier does not match between base backup and streaming connection");
 1467 michael@paquier.xyz       509                 :              0 :             pg_free(sysidentifier);
 5064 magnus@hagander.net       510                 :              0 :             return false;
                                511                 :                :         }
 1467 michael@paquier.xyz       512                 :CBC         136 :         pg_free(sysidentifier);
                                513                 :                : 
                                514         [ -  + ]:            136 :         if (stream->timeline > servertli)
                                515                 :                :         {
 2350 peter@eisentraut.org      516                 :UBC           0 :             pg_log_error("starting timeline %u is not present in the server",
                                517                 :                :                          stream->timeline);
 5064 magnus@hagander.net       518                 :              0 :             return false;
                                519                 :                :         }
                                520                 :                :     }
                                521                 :                : 
                                522                 :                :     /*
                                523                 :                :      * initialize flush position to starting point, it's the caller's
                                524                 :                :      * responsibility that that's sane.
                                525                 :                :      */
 3466 magnus@hagander.net       526                 :CBC         136 :     lastFlushPosition = stream->startpos;
                                527                 :                : 
                                528                 :                :     while (1)
                                529                 :                :     {
                                530                 :                :         /*
                                531                 :                :          * Fetch the timeline history file for this timeline, if we don't have
                                532                 :                :          * it already. When streaming log to tar, this will always return
                                533                 :                :          * false, as we are never streaming into an existing file and
                                534                 :                :          * therefore there can be no pre-existing timeline history file.
                                535                 :                :          */
                                536         [ +  + ]:            137 :         if (!existsTimeLineHistoryFile(stream))
                                537                 :                :         {
                                538                 :              3 :             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
 4615 heikki.linnakangas@i      539                 :              3 :             res = PQexec(conn, query);
                                540         [ -  + ]:              3 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
                                541                 :                :             {
                                542                 :                :                 /* FIXME: we might send it ok, but get an error */
 2350 peter@eisentraut.org      543                 :UBC           0 :                 pg_log_error("could not send replication command \"%s\": %s",
                                544                 :                :                              "TIMELINE_HISTORY", PQresultErrorMessage(res));
 4615 heikki.linnakangas@i      545                 :              0 :                 PQclear(res);
                                546                 :              0 :                 return false;
                                547                 :                :             }
                                548                 :                : 
                                549                 :                :             /*
                                550                 :                :              * The response to TIMELINE_HISTORY is a single row result set
                                551                 :                :              * with two fields: filename and content
                                552                 :                :              */
 4615 heikki.linnakangas@i      553   [ +  -  -  + ]:CBC           3 :             if (PQnfields(res) != 2 || PQntuples(res) != 1)
                                554                 :                :             {
 2350 peter@eisentraut.org      555                 :UBC           0 :                 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
                                556                 :                :                                PQntuples(res), PQnfields(res), 1, 2);
                                557                 :                :             }
                                558                 :                : 
                                559                 :                :             /* Write the history file to disk */
 3466 magnus@hagander.net       560                 :CBC           3 :             writeTimeLineHistoryFile(stream,
                                561                 :                :                                      PQgetvalue(res, 0, 0),
                                562                 :                :                                      PQgetvalue(res, 0, 1));
                                563                 :                : 
 4615 heikki.linnakangas@i      564                 :              3 :             PQclear(res);
                                565                 :                :         }
                                566                 :                : 
                                567                 :                :         /*
                                568                 :                :          * Before we start streaming from the requested location, check if the
                                569                 :                :          * callback tells us to stop here.
                                570                 :                :          */
 3466 magnus@hagander.net       571         [ -  + ]:            137 :         if (stream->stream_stop(stream->startpos, stream->timeline, false))
 4615 heikki.linnakangas@i      572                 :UBC           0 :             return true;
                                573                 :                : 
                                574                 :                :         /* Initiate the replication stream at specified location */
   61 alvherre@kurilemu.de      575                 :GNC         137 :         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%08X TIMELINE %u",
                                576                 :                :                  slotcmd,
 1656 peter@eisentraut.org      577                 :CBC         137 :                  LSN_FORMAT_ARGS(stream->startpos),
                                578                 :                :                  stream->timeline);
 4615 heikki.linnakangas@i      579                 :            137 :         res = PQexec(conn, query);
                                580         [ +  + ]:            137 :         if (PQresultStatus(res) != PGRES_COPY_BOTH)
                                581                 :                :         {
 2350 peter@eisentraut.org      582                 :              1 :             pg_log_error("could not send replication command \"%s\": %s",
                                583                 :                :                          "START_REPLICATION", PQresultErrorMessage(res));
 4615 heikki.linnakangas@i      584                 :              1 :             PQclear(res);
                                585                 :              1 :             return false;
                                586                 :                :         }
 4804 magnus@hagander.net       587                 :            136 :         PQclear(res);
                                588                 :                : 
                                589                 :                :         /* Stream the WAL */
 3466                           590                 :            136 :         res = HandleCopyStream(conn, stream, &stoppos);
 4513 rhaas@postgresql.org      591         [ -  + ]:            136 :         if (res == NULL)
 4615 heikki.linnakangas@i      592                 :UBC           0 :             goto error;
                                593                 :                : 
                                594                 :                :         /*
                                595                 :                :          * Streaming finished.
                                596                 :                :          *
                                597                 :                :          * There are two possible reasons for that: a controlled shutdown, or
                                598                 :                :          * we reached the end of the current timeline. In case of
                                599                 :                :          * end-of-timeline, the server sends a result set after Copy has
                                600                 :                :          * finished, containing information about the next timeline. Read
                                601                 :                :          * that, and restart streaming from the next timeline. In case of
                                602                 :                :          * controlled shutdown, stop here.
                                603                 :                :          */
 4615 heikki.linnakangas@i      604         [ +  + ]:CBC         136 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
                                605                 :              1 :         {
                                606                 :                :             /*
                                607                 :                :              * End-of-timeline. Read the next timeline's ID and starting
                                608                 :                :              * position. Usually, the starting position will match the end of
                                609                 :                :              * the previous timeline, but there are corner cases like if the
                                610                 :                :              * server had sent us half of a WAL record, when it was promoted.
                                611                 :                :              * The new timeline will begin at the end of the last complete
                                612                 :                :              * record in that case, overlapping the partial WAL record on the
                                613                 :                :              * old timeline.
                                614                 :                :              */
                                615                 :                :             uint32      newtimeline;
                                616                 :                :             bool        parsed;
                                617                 :                : 
 3466 magnus@hagander.net       618                 :              1 :             parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
 4615 heikki.linnakangas@i      619                 :              1 :             PQclear(res);
 4504                           620         [ -  + ]:              1 :             if (!parsed)
 4504 heikki.linnakangas@i      621                 :UBC           0 :                 goto error;
                                622                 :                : 
                                623                 :                :             /* Sanity check the values the server gave us */
 3466 magnus@hagander.net       624         [ -  + ]:CBC           1 :             if (newtimeline <= stream->timeline)
                                625                 :                :             {
 2350 peter@eisentraut.org      626                 :UBC           0 :                 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
                                627                 :                :                              newtimeline, stream->timeline);
 4504 heikki.linnakangas@i      628                 :              0 :                 goto error;
                                629                 :                :             }
 3466 magnus@hagander.net       630         [ -  + ]:CBC           1 :             if (stream->startpos > stoppos)
                                631                 :                :             {
   61 alvherre@kurilemu.de      632                 :UNC           0 :                 pg_log_error("server stopped streaming timeline %u at %X/%08X, but reported next timeline %u to begin at %X/%08X",
                                633                 :                :                              stream->timeline, LSN_FORMAT_ARGS(stoppos),
                                634                 :                :                              newtimeline, LSN_FORMAT_ARGS(stream->startpos));
 4615 heikki.linnakangas@i      635                 :UBC           0 :                 goto error;
                                636                 :                :             }
                                637                 :                : 
                                638                 :                :             /* Read the final result, which should be CommandComplete. */
 4615 heikki.linnakangas@i      639                 :CBC           1 :             res = PQgetResult(conn);
                                640         [ -  + ]:              1 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
                                641                 :                :             {
 2350 peter@eisentraut.org      642                 :UBC           0 :                 pg_log_error("unexpected termination of replication stream: %s",
                                643                 :                :                              PQresultErrorMessage(res));
 4053 fujii@postgresql.org      644                 :              0 :                 PQclear(res);
 4615 heikki.linnakangas@i      645                 :              0 :                 goto error;
                                646                 :                :             }
 4615 heikki.linnakangas@i      647                 :CBC           1 :             PQclear(res);
                                648                 :                : 
                                649                 :                :             /*
                                650                 :                :              * Loop back to start streaming from the new timeline. Always
                                651                 :                :              * start streaming at the beginning of a segment.
                                652                 :                :              */
 3466 magnus@hagander.net       653                 :              1 :             stream->timeline = newtimeline;
 2909 andres@anarazel.de        654                 :              1 :             stream->startpos = stream->startpos -
                                655                 :              1 :                 XLogSegmentOffset(stream->startpos, WalSegSz);
 4615 heikki.linnakangas@i      656                 :              1 :             continue;
                                657                 :                :         }
                                658         [ +  + ]:            135 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
                                659                 :                :         {
 4053 fujii@postgresql.org      660                 :            134 :             PQclear(res);
                                661                 :                : 
                                662                 :                :             /*
                                663                 :                :              * End of replication (ie. controlled shut down of the server).
                                664                 :                :              *
                                665                 :                :              * Check if the callback thinks it's OK to stop here. If not,
                                666                 :                :              * complain.
                                667                 :                :              */
 3466 magnus@hagander.net       668         [ +  - ]:            134 :             if (stream->stream_stop(stoppos, stream->timeline, false))
 4615 heikki.linnakangas@i      669                 :            134 :                 return true;
                                670                 :                :             else
                                671                 :                :             {
 2350 peter@eisentraut.org      672                 :UBC           0 :                 pg_log_error("replication stream was terminated before stop point");
 4615 heikki.linnakangas@i      673                 :              0 :                 goto error;
                                674                 :                :             }
                                675                 :                :         }
                                676                 :                :         else
                                677                 :                :         {
                                678                 :                :             /* Server returned an error. */
 2350 peter@eisentraut.org      679                 :CBC           1 :             pg_log_error("unexpected termination of replication stream: %s",
                                680                 :                :                          PQresultErrorMessage(res));
 4053 fujii@postgresql.org      681                 :              1 :             PQclear(res);
 4615 heikki.linnakangas@i      682                 :              1 :             goto error;
                                683                 :                :         }
                                684                 :                :     }
                                685                 :                : 
                                686                 :              1 : error:
 1083 rhaas@postgresql.org      687   [ -  +  -  - ]:              1 :     if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
 2350 peter@eisentraut.org      688                 :UBC           0 :         pg_log_error("could not close file \"%s\": %s",
                                689                 :                :                      walfile->pathname, GetLastWalMethodError(stream->walmethod));
 3240 magnus@hagander.net       690                 :CBC           1 :     walfile = NULL;
 4615 heikki.linnakangas@i      691                 :              1 :     return false;
                                692                 :                : }
                                693                 :                : 
                                694                 :                : /*
                                695                 :                :  * Helper function to parse the result set returned by server after streaming
                                696                 :                :  * has finished. On failure, prints an error to stderr and returns false.
                                697                 :                :  */
                                698                 :                : static bool
 4504                           699                 :              1 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
                                700                 :                : {
                                701                 :                :     uint32      startpos_xlogid,
                                702                 :                :                 startpos_xrecoff;
                                703                 :                : 
                                704                 :                :     /*----------
                                705                 :                :      * The result set consists of one row and two columns, e.g:
                                706                 :                :      *
                                707                 :                :      *  next_tli | next_tli_startpos
                                708                 :                :      * ----------+-------------------
                                709                 :                :      *         4 | 0/9949AE0
                                710                 :                :      *
                                711                 :                :      * next_tli is the timeline ID of the next timeline after the one that
                                712                 :                :      * just finished streaming. next_tli_startpos is the WAL location where
                                713                 :                :      * the server switched to it.
                                714                 :                :      *----------
                                715                 :                :      */
                                716   [ +  -  -  + ]:              1 :     if (PQnfields(res) < 2 || PQntuples(res) != 1)
                                717                 :                :     {
 2350 peter@eisentraut.org      718                 :UBC           0 :         pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
                                719                 :                :                      PQntuples(res), PQnfields(res), 1, 2);
 4504 heikki.linnakangas@i      720                 :              0 :         return false;
                                721                 :                :     }
                                722                 :                : 
 4504 heikki.linnakangas@i      723                 :CBC           1 :     *timeline = atoi(PQgetvalue(res, 0, 0));
   61 alvherre@kurilemu.de      724         [ -  + ]:GNC           1 :     if (sscanf(PQgetvalue(res, 0, 1), "%X/%08X", &startpos_xlogid,
                                725                 :                :                &startpos_xrecoff) != 2)
                                726                 :                :     {
 2350 peter@eisentraut.org      727                 :UBC           0 :         pg_log_error("could not parse next timeline's starting point \"%s\"",
                                728                 :                :                      PQgetvalue(res, 0, 1));
 4504 heikki.linnakangas@i      729                 :              0 :         return false;
                                730                 :                :     }
 4504 heikki.linnakangas@i      731                 :CBC           1 :     *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
                                732                 :                : 
                                733                 :              1 :     return true;
                                734                 :                : }
                                735                 :                : 
                                736                 :                : /*
                                737                 :                :  * The main loop of ReceiveXlogStream. Handles the COPY stream after
                                738                 :                :  * initiating streaming with the START_REPLICATION command.
                                739                 :                :  *
                                740                 :                :  * If the COPY ends (not necessarily successfully) due a message from the
                                741                 :                :  * server, returns a PGresult and sets *stoppos to the last byte written.
                                742                 :                :  * On any other sort of error, returns NULL.
                                743                 :                :  */
                                744                 :                : static PGresult *
 3466 magnus@hagander.net       745                 :            136 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
                                746                 :                :                  XLogRecPtr *stoppos)
                                747                 :                : {
 4615 heikki.linnakangas@i      748                 :            136 :     char       *copybuf = NULL;
 3117 tgl@sss.pgh.pa.us         749                 :            136 :     TimestampTz last_status = -1;
 3466 magnus@hagander.net       750                 :            136 :     XLogRecPtr  blockpos = stream->startpos;
                                751                 :                : 
 4049 fujii@postgresql.org      752                 :            136 :     still_sending = true;
                                753                 :                : 
                                754                 :                :     while (1)
 5064 magnus@hagander.net       755                 :           1159 :     {
                                756                 :                :         int         r;
                                757                 :                :         TimestampTz now;
                                758                 :                :         long        sleeptime;
                                759                 :                : 
                                760                 :                :         /*
                                761                 :                :          * Check if we should continue streaming, or abort at this point.
                                762                 :                :          */
 1822 peter@eisentraut.org      763         [ -  + ]:           1295 :         if (!CheckCopyStreamStop(conn, stream, blockpos))
 4047 fujii@postgresql.org      764                 :UBC           0 :             goto error;
                                765                 :                : 
 4047 fujii@postgresql.org      766                 :CBC        1295 :         now = feGetCurrentTimestamp();
                                767                 :                : 
                                768                 :                :         /*
                                769                 :                :          * If synchronous option is true, issue sync command as soon as there
                                770                 :                :          * are WAL data which has not been flushed yet.
                                771                 :                :          */
 3240 magnus@hagander.net       772   [ +  +  -  +  :           1295 :         if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
                                              -  - ]
                                773                 :                :         {
 1083 rhaas@postgresql.org      774         [ #  # ]:UBC           0 :             if (stream->walmethod->ops->sync(walfile) != 0)
 1247 tgl@sss.pgh.pa.us         775                 :              0 :                 pg_fatal("could not fsync file \"%s\": %s",
                                776                 :                :                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
 4047 fujii@postgresql.org      777                 :              0 :             lastFlushPosition = blockpos;
                                778                 :                : 
                                779                 :                :             /*
                                780                 :                :              * Send feedback so that the server sees the latest WAL locations
                                781                 :                :              * immediately.
                                782                 :                :              */
 3945                           783         [ #  # ]:              0 :             if (!sendFeedback(conn, blockpos, now, false))
                                784                 :              0 :                 goto error;
                                785                 :              0 :             last_status = now;
                                786                 :                :         }
                                787                 :                : 
                                788                 :                :         /*
                                789                 :                :          * Potentially send a status message to the primary
                                790                 :                :          */
 3466 magnus@hagander.net       791   [ +  +  +  -  :CBC        2511 :         if (still_sending && stream->standby_message_timeout > 0 &&
                                              +  + ]
 4190 rhaas@postgresql.org      792                 :           1216 :             feTimestampDifferenceExceeds(last_status, now,
                                793                 :                :                                          stream->standby_message_timeout))
                                794                 :                :         {
                                795                 :                :             /* Time to send feedback! */
 4685 heikki.linnakangas@i      796         [ -  + ]:            136 :             if (!sendFeedback(conn, blockpos, now, false))
 4804 magnus@hagander.net       797                 :UBC           0 :                 goto error;
 5064 magnus@hagander.net       798                 :CBC         136 :             last_status = now;
                                799                 :                :         }
                                800                 :                : 
                                801                 :                :         /*
                                802                 :                :          * Calculate how long send/receive loops should sleep
                                803                 :                :          */
 3466                           804                 :           1295 :         sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
                                805                 :                :                                                  last_status);
                                806                 :                : 
                                807                 :                :         /* Done with any prior message */
  206 tgl@sss.pgh.pa.us         808                 :           1295 :         PQfreemem(copybuf);
                                809                 :           1295 :         copybuf = NULL;
                                810                 :                : 
 3054                           811                 :           1295 :         r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
 4047 fujii@postgresql.org      812         [ +  + ]:           2619 :         while (r != 0)
                                813                 :                :         {
                                814         [ -  + ]:           1460 :             if (r == -1)
 4049 fujii@postgresql.org      815                 :UBC           0 :                 goto error;
 4047 fujii@postgresql.org      816         [ +  + ]:CBC        1460 :             if (r == -2)
                                817                 :                :             {
 3466 magnus@hagander.net       818                 :            136 :                 PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
                                819                 :                : 
 4047 fujii@postgresql.org      820         [ -  + ]:            136 :                 if (res == NULL)
 4047 fujii@postgresql.org      821                 :UBC           0 :                     goto error;
  206 tgl@sss.pgh.pa.us         822                 :CBC         136 :                 PQfreemem(copybuf);
                                823                 :            136 :                 return res;
                                824                 :                :             }
                                825                 :                : 
                                826                 :                :             /* Check the message type. */
   31 nathan@postgresql.or      827         [ -  + ]:GNC        1324 :             if (copybuf[0] == PqReplMsg_Keepalive)
                                828                 :                :             {
 3264 peter_e@gmx.net           829         [ #  # ]:UBC           0 :                 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
                                830                 :                :                                          &last_status))
 4047 fujii@postgresql.org      831                 :              0 :                     goto error;
                                832                 :                :             }
   31 nathan@postgresql.or      833         [ +  - ]:GNC        1324 :             else if (copybuf[0] == PqReplMsg_WALData)
                                834                 :                :             {
   33 alvherre@kurilemu.de      835         [ -  + ]:           1324 :                 if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos))
 4047 fujii@postgresql.org      836                 :UBC           0 :                     goto error;
                                837                 :                : 
                                838                 :                :                 /*
                                839                 :                :                  * Check if we should continue streaming, or abort at this
                                840                 :                :                  * point.
                                841                 :                :                  */
 1822 peter@eisentraut.org      842         [ -  + ]:CBC        1324 :                 if (!CheckCopyStreamStop(conn, stream, blockpos))
 4047 fujii@postgresql.org      843                 :UBC           0 :                     goto error;
                                844                 :                :             }
                                845                 :                :             else
                                846                 :                :             {
 2350 peter@eisentraut.org      847                 :              0 :                 pg_log_error("unrecognized streaming header: \"%c\"",
                                848                 :                :                              copybuf[0]);
 4804 magnus@hagander.net       849                 :              0 :                 goto error;
                                850                 :                :             }
                                851                 :                : 
                                852                 :                :             /* Done with that message */
  206 tgl@sss.pgh.pa.us         853                 :CBC        1324 :             PQfreemem(copybuf);
                                854                 :           1324 :             copybuf = NULL;
                                855                 :                : 
                                856                 :                :             /*
                                857                 :                :              * Process the received data, and any subsequent data we can read
                                858                 :                :              * without blocking.
                                859                 :                :              */
 3054                           860                 :           1324 :             r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
                                861                 :                :         }
                                862                 :                :     }
                                863                 :                : 
 4804 magnus@hagander.net       864                 :UBC           0 : error:
 1107 peter@eisentraut.org      865                 :              0 :     PQfreemem(copybuf);
 4513 rhaas@postgresql.org      866                 :              0 :     return NULL;
                                867                 :                : }
                                868                 :                : 
                                869                 :                : /*
                                870                 :                :  * Wait until we can read a CopyData message,
                                871                 :                :  * or timeout, or occurrence of a signal or input on the stop_socket.
                                872                 :                :  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
                                873                 :                :  *
                                874                 :                :  * Returns 1 if data has become available for reading, 0 if timed out
                                875                 :                :  * or interrupted by signal or stop_socket input, and -1 on an error.
                                876                 :                :  */
                                877                 :                : static int
 3054 tgl@sss.pgh.pa.us         878                 :CBC        2433 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
                                879                 :                : {
                                880                 :                :     int         ret;
                                881                 :                :     fd_set      input_mask;
                                882                 :                :     int         connsocket;
                                883                 :                :     int         maxfd;
                                884                 :                :     struct timeval timeout;
                                885                 :                :     struct timeval *timeoutptr;
                                886                 :                : 
                                887                 :           2433 :     connsocket = PQsocket(conn);
                                888         [ -  + ]:           2433 :     if (connsocket < 0)
                                889                 :                :     {
 2350 peter@eisentraut.org      890                 :UBC           0 :         pg_log_error("invalid socket: %s", PQerrorMessage(conn));
 4082 fujii@postgresql.org      891                 :              0 :         return -1;
                                892                 :                :     }
                                893                 :                : 
 4082 fujii@postgresql.org      894         [ +  + ]:CBC       41361 :     FD_ZERO(&input_mask);
 3054 tgl@sss.pgh.pa.us         895                 :           2433 :     FD_SET(connsocket, &input_mask);
                                896                 :           2433 :     maxfd = connsocket;
                                897         [ +  + ]:           2433 :     if (stop_socket != PGINVALID_SOCKET)
                                898                 :                :     {
                                899                 :           2300 :         FD_SET(stop_socket, &input_mask);
                                900                 :           2300 :         maxfd = Max(maxfd, stop_socket);
                                901                 :                :     }
                                902                 :                : 
 4082 fujii@postgresql.org      903         [ +  + ]:           2433 :     if (timeout_ms < 0)
                                904                 :             79 :         timeoutptr = NULL;
                                905                 :                :     else
                                906                 :                :     {
                                907                 :           2354 :         timeout.tv_sec = timeout_ms / 1000L;
                                908                 :           2354 :         timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
                                909                 :           2354 :         timeoutptr = &timeout;
                                910                 :                :     }
                                911                 :                : 
 3054 tgl@sss.pgh.pa.us         912                 :           2433 :     ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
                                913                 :                : 
                                914         [ -  + ]:           2433 :     if (ret < 0)
                                915                 :                :     {
 3054 tgl@sss.pgh.pa.us         916         [ #  # ]:UBC           0 :         if (errno == EINTR)
                                917                 :              0 :             return 0;           /* Got a signal, so not an error */
 1597 peter@eisentraut.org      918                 :              0 :         pg_log_error("%s() failed: %m", "select");
 4082 fujii@postgresql.org      919                 :              0 :         return -1;
                                920                 :                :     }
 3054 tgl@sss.pgh.pa.us         921   [ +  +  +  + ]:CBC        2433 :     if (ret > 0 && FD_ISSET(connsocket, &input_mask))
                                922                 :           1847 :         return 1;               /* Got input on connection socket */
                                923                 :                : 
                                924                 :            586 :     return 0;                   /* Got timeout or input on stop_socket */
                                925                 :                : }
                                926                 :                : 
                                927                 :                : /*
                                928                 :                :  * Receive CopyData message available from XLOG stream, blocking for
                                929                 :                :  * maximum of 'timeout' ms.
                                930                 :                :  *
                                931                 :                :  * If data was received, returns the length of the data. *buffer is set to
                                932                 :                :  * point to a buffer holding the received message. The caller must eventually
                                933                 :                :  * free the buffer with PQfreemem().
                                934                 :                :  *
                                935                 :                :  * Returns 0 if no data was available within timeout, or if wait was
                                936                 :                :  * interrupted by signal or stop_socket input.
                                937                 :                :  * -1 on error. -2 if the server ended the COPY.
                                938                 :                :  */
                                939                 :                : static int
                                940                 :           2619 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
                                941                 :                :                   char **buffer)
                                942                 :                : {
 4082 fujii@postgresql.org      943                 :           2619 :     char       *copybuf = NULL;
                                944                 :                :     int         rawlen;
                                945                 :                : 
                                946                 :                :     /* Caller should have cleared any prior buffer */
  206 tgl@sss.pgh.pa.us         947         [ -  + ]:           2619 :     Assert(*buffer == NULL);
                                948                 :                : 
                                949                 :                :     /* Try to receive a CopyData message */
 4082 fujii@postgresql.org      950                 :           2619 :     rawlen = PQgetCopyData(conn, &copybuf, 1);
                                951         [ +  + ]:           2619 :     if (rawlen == 0)
                                952                 :                :     {
                                953                 :                :         int         ret;
                                954                 :                : 
                                955                 :                :         /*
                                956                 :                :          * No data available.  Wait for some to appear, but not longer than
                                957                 :                :          * the specified timeout, so that we can ping the server.  Also stop
                                958                 :                :          * waiting if input appears on stop_socket.
                                959                 :                :          */
 3054 tgl@sss.pgh.pa.us         960                 :           2433 :         ret = CopyStreamPoll(conn, timeout, stop_socket);
                                961         [ +  + ]:           2433 :         if (ret <= 0)
                                962                 :            586 :             return ret;
                                963                 :                : 
                                964                 :                :         /* Now there is actually data on the socket */
 4082 fujii@postgresql.org      965         [ -  + ]:           1847 :         if (PQconsumeInput(conn) == 0)
                                966                 :                :         {
 2350 peter@eisentraut.org      967                 :UBC           0 :             pg_log_error("could not receive data from WAL stream: %s",
                                968                 :                :                          PQerrorMessage(conn));
 4082 fujii@postgresql.org      969                 :              0 :             return -1;
                                970                 :                :         }
                                971                 :                : 
                                972                 :                :         /* Now that we've consumed some input, try again */
 4082 fujii@postgresql.org      973                 :CBC        1847 :         rawlen = PQgetCopyData(conn, &copybuf, 1);
                                974         [ +  + ]:           1847 :         if (rawlen == 0)
                                975                 :            573 :             return 0;
                                976                 :                :     }
                                977         [ +  + ]:           1460 :     if (rawlen == -1)           /* end-of-streaming or error */
                                978                 :            136 :         return -2;
                                979         [ -  + ]:           1324 :     if (rawlen == -2)
                                980                 :                :     {
 2350 peter@eisentraut.org      981                 :UBC           0 :         pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
 4082 fujii@postgresql.org      982                 :              0 :         return -1;
                                983                 :                :     }
                                984                 :                : 
                                985                 :                :     /* Return received messages to caller */
 4082 fujii@postgresql.org      986                 :CBC        1324 :     *buffer = copybuf;
                                987                 :           1324 :     return rawlen;
                                988                 :                : }
                                989                 :                : 
                                990                 :                : /*
                                991                 :                :  * Process the keepalive message.
                                992                 :                :  */
                                993                 :                : static bool
 3264 peter_e@gmx.net           994                 :UBC           0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                                995                 :                :                     XLogRecPtr blockpos, TimestampTz *last_status)
                                996                 :                : {
                                997                 :                :     int         pos;
                                998                 :                :     bool        replyRequested;
                                999                 :                :     TimestampTz now;
                               1000                 :                : 
                               1001                 :                :     /*
                               1002                 :                :      * Parse the keepalive message, enclosed in the CopyData message. We just
                               1003                 :                :      * check if the server requested a reply, and ignore the rest.
                               1004                 :                :      */
   31 nathan@postgresql.or     1005                 :UNC           0 :     pos = 1;                    /* skip msgtype PqReplMsg_Keepalive */
 3759 bruce@momjian.us         1006                 :UBC           0 :     pos += 8;                   /* skip walEnd */
                               1007                 :              0 :     pos += 8;                   /* skip sendTime */
                               1008                 :                : 
 4049 fujii@postgresql.org     1009         [ #  # ]:              0 :     if (len < pos + 1)
                               1010                 :                :     {
 2350 peter@eisentraut.org     1011                 :              0 :         pg_log_error("streaming header too small: %d", len);
 4049 fujii@postgresql.org     1012                 :              0 :         return false;
                               1013                 :                :     }
                               1014                 :              0 :     replyRequested = copybuf[pos];
                               1015                 :                : 
                               1016                 :                :     /* If the server requested an immediate reply, send one. */
                               1017   [ #  #  #  # ]:              0 :     if (replyRequested && still_sending)
                               1018                 :                :     {
 3944                          1019   [ #  #  #  # ]:              0 :         if (reportFlushPosition && lastFlushPosition < blockpos &&
 3240 magnus@hagander.net      1020         [ #  # ]:              0 :             walfile != NULL)
                               1021                 :                :         {
                               1022                 :                :             /*
                               1023                 :                :              * If a valid flush location needs to be reported, flush the
                               1024                 :                :              * current WAL file so that the latest flush location is sent back
                               1025                 :                :              * to the server. This is necessary to see whether the last WAL
                               1026                 :                :              * data has been successfully replicated or not, at the normal
                               1027                 :                :              * shutdown of the server.
                               1028                 :                :              */
 1083 rhaas@postgresql.org     1029         [ #  # ]:              0 :             if (stream->walmethod->ops->sync(walfile) != 0)
 1247 tgl@sss.pgh.pa.us        1030                 :              0 :                 pg_fatal("could not fsync file \"%s\": %s",
                               1031                 :                :                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
 3944 fujii@postgresql.org     1032                 :              0 :             lastFlushPosition = blockpos;
                               1033                 :                :         }
                               1034                 :                : 
 4049                          1035                 :              0 :         now = feGetCurrentTimestamp();
                               1036         [ #  # ]:              0 :         if (!sendFeedback(conn, blockpos, now, false))
                               1037                 :              0 :             return false;
                               1038                 :              0 :         *last_status = now;
                               1039                 :                :     }
                               1040                 :                : 
                               1041                 :              0 :     return true;
                               1042                 :                : }
                               1043                 :                : 
                               1044                 :                : /*
                               1045                 :                :  * Process WALData message.
                               1046                 :                :  */
                               1047                 :                : static bool
   33 alvherre@kurilemu.de     1048                 :GNC        1324 : ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                               1049                 :                :                   XLogRecPtr *blockpos)
                               1050                 :                : {
                               1051                 :                :     int         xlogoff;
                               1052                 :                :     int         bytes_left;
                               1053                 :                :     int         bytes_written;
                               1054                 :                :     int         hdr_len;
                               1055                 :                : 
                               1056                 :                :     /*
                               1057                 :                :      * Once we've decided we don't want to receive any more, just ignore any
                               1058                 :                :      * subsequent WALData messages.
                               1059                 :                :      */
 4049 fujii@postgresql.org     1060         [ +  + ]:CBC        1324 :     if (!(still_sending))
                               1061                 :            189 :         return true;
                               1062                 :                : 
                               1063                 :                :     /*
                               1064                 :                :      * Read the header of the WALData message, enclosed in the CopyData
                               1065                 :                :      * message. We only need the WAL location field (dataStart), the rest of
                               1066                 :                :      * the header is ignored.
                               1067                 :                :      */
   31 nathan@postgresql.or     1068                 :GNC        1135 :     hdr_len = 1;                /* msgtype PqReplMsg_WALData */
 3759 bruce@momjian.us         1069                 :CBC        1135 :     hdr_len += 8;               /* dataStart */
                               1070                 :           1135 :     hdr_len += 8;               /* walEnd */
                               1071                 :           1135 :     hdr_len += 8;               /* sendTime */
 4049 fujii@postgresql.org     1072         [ -  + ]:           1135 :     if (len < hdr_len)
                               1073                 :                :     {
 2350 peter@eisentraut.org     1074                 :UBC           0 :         pg_log_error("streaming header too small: %d", len);
 4049 fujii@postgresql.org     1075                 :              0 :         return false;
                               1076                 :                :     }
 4049 fujii@postgresql.org     1077                 :CBC        1135 :     *blockpos = fe_recvint64(&copybuf[1]);
                               1078                 :                : 
                               1079                 :                :     /* Extract WAL location for this block */
 2909 andres@anarazel.de       1080                 :           1135 :     xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
                               1081                 :                : 
                               1082                 :                :     /*
                               1083                 :                :      * Verify that the initial location in the stream matches where we think
                               1084                 :                :      * we are.
                               1085                 :                :      */
 3240 magnus@hagander.net      1086         [ +  + ]:           1135 :     if (walfile == NULL)
                               1087                 :                :     {
                               1088                 :                :         /* No file open yet */
 4049 fujii@postgresql.org     1089         [ -  + ]:            142 :         if (xlogoff != 0)
                               1090                 :                :         {
 2350 peter@eisentraut.org     1091                 :UBC           0 :             pg_log_error("received write-ahead log record for offset %u with no file open",
                               1092                 :                :                          xlogoff);
 4049 fujii@postgresql.org     1093                 :              0 :             return false;
                               1094                 :                :         }
                               1095                 :                :     }
                               1096                 :                :     else
                               1097                 :                :     {
                               1098                 :                :         /* More data in existing segment */
 1083 rhaas@postgresql.org     1099         [ -  + ]:CBC         993 :         if (walfile->currpos != xlogoff)
                               1100                 :                :         {
 2350 peter@eisentraut.org     1101                 :UBC           0 :             pg_log_error("got WAL data offset %08x, expected %08x",
                               1102                 :                :                          xlogoff, (int) walfile->currpos);
 4049 fujii@postgresql.org     1103                 :              0 :             return false;
                               1104                 :                :         }
                               1105                 :                :     }
                               1106                 :                : 
 4049 fujii@postgresql.org     1107                 :CBC        1135 :     bytes_left = len - hdr_len;
                               1108                 :           1135 :     bytes_written = 0;
                               1109                 :                : 
                               1110         [ +  + ]:           2270 :     while (bytes_left)
                               1111                 :                :     {
                               1112                 :                :         int         bytes_to_write;
                               1113                 :                : 
                               1114                 :                :         /*
                               1115                 :                :          * If crossing a WAL boundary, only write up until we reach wal
                               1116                 :                :          * segment size.
                               1117                 :                :          */
 2909 andres@anarazel.de       1118         [ -  + ]:           1135 :         if (xlogoff + bytes_left > WalSegSz)
 2909 andres@anarazel.de       1119                 :UBC           0 :             bytes_to_write = WalSegSz - xlogoff;
                               1120                 :                :         else
 4049 fujii@postgresql.org     1121                 :CBC        1135 :             bytes_to_write = bytes_left;
                               1122                 :                : 
 3240 magnus@hagander.net      1123         [ +  + ]:           1135 :         if (walfile == NULL)
                               1124                 :                :         {
 3466                          1125         [ -  + ]:            142 :             if (!open_walfile(stream, *blockpos))
                               1126                 :                :             {
                               1127                 :                :                 /* Error logged by open_walfile */
 4049 fujii@postgresql.org     1128                 :UBC           0 :                 return false;
                               1129                 :                :             }
                               1130                 :                :         }
                               1131                 :                : 
 1083 rhaas@postgresql.org     1132                 :CBC        2270 :         if (stream->walmethod->ops->write(walfile,
                               1133                 :           1135 :                                           copybuf + hdr_len + bytes_written,
                               1134         [ -  + ]:           1135 :                                           bytes_to_write) != bytes_to_write)
                               1135                 :                :         {
 1389 peter@eisentraut.org     1136                 :UBC           0 :             pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
                               1137                 :                :                          bytes_to_write, walfile->pathname,
                               1138                 :                :                          GetLastWalMethodError(stream->walmethod));
 4049 fujii@postgresql.org     1139                 :              0 :             return false;
                               1140                 :                :         }
                               1141                 :                : 
                               1142                 :                :         /* Write was successful, advance our position */
 4049 fujii@postgresql.org     1143                 :CBC        1135 :         bytes_written += bytes_to_write;
                               1144                 :           1135 :         bytes_left -= bytes_to_write;
                               1145                 :           1135 :         *blockpos += bytes_to_write;
                               1146                 :           1135 :         xlogoff += bytes_to_write;
                               1147                 :                : 
                               1148                 :                :         /* Did we reach the end of a WAL segment? */
 2909 andres@anarazel.de       1149         [ +  + ]:           1135 :         if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
                               1150                 :                :         {
 3466 magnus@hagander.net      1151         [ -  + ]:             10 :             if (!close_walfile(stream, *blockpos))
                               1152                 :                :                 /* Error message written in close_walfile() */
 4049 fujii@postgresql.org     1153                 :UBC           0 :                 return false;
                               1154                 :                : 
 4049 fujii@postgresql.org     1155                 :CBC          10 :             xlogoff = 0;
                               1156                 :                : 
 3466 magnus@hagander.net      1157   [ +  -  -  + ]:             10 :             if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
                               1158                 :                :             {
 4049 fujii@postgresql.org     1159   [ #  #  #  # ]:UBC           0 :                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
                               1160                 :                :                 {
 2350 peter@eisentraut.org     1161                 :              0 :                     pg_log_error("could not send copy-end packet: %s",
                               1162                 :                :                                  PQerrorMessage(conn));
 4049 fujii@postgresql.org     1163                 :              0 :                     return false;
                               1164                 :                :                 }
                               1165                 :              0 :                 still_sending = false;
   33 alvherre@kurilemu.de     1166                 :UNC           0 :                 return true;    /* ignore the rest of this WALData packet */
                               1167                 :                :             }
                               1168                 :                :         }
                               1169                 :                :     }
                               1170                 :                :     /* No more data left to write, receive next copy packet */
                               1171                 :                : 
 4049 fujii@postgresql.org     1172                 :CBC        1135 :     return true;
                               1173                 :                : }
                               1174                 :                : 
                               1175                 :                : /*
                               1176                 :                :  * Handle end of the copy stream.
                               1177                 :                :  */
                               1178                 :                : static PGresult *
 3466 magnus@hagander.net      1179                 :            136 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
                               1180                 :                :                       XLogRecPtr blockpos, XLogRecPtr *stoppos)
                               1181                 :                : {
 4049 fujii@postgresql.org     1182                 :            136 :     PGresult   *res = PQgetResult(conn);
                               1183                 :                : 
                               1184                 :                :     /*
                               1185                 :                :      * The server closed its end of the copy stream.  If we haven't closed
                               1186                 :                :      * ours already, we need to do so now, unless the server threw an error,
                               1187                 :                :      * in which case we don't.
                               1188                 :                :      */
                               1189         [ +  + ]:            136 :     if (still_sending)
                               1190                 :                :     {
 3466 magnus@hagander.net      1191         [ -  + ]:              2 :         if (!close_walfile(stream, blockpos))
                               1192                 :                :         {
                               1193                 :                :             /* Error message written in close_walfile() */
 4049 fujii@postgresql.org     1194                 :UBC           0 :             PQclear(res);
                               1195                 :              0 :             return NULL;
                               1196                 :                :         }
 4049 fujii@postgresql.org     1197         [ +  + ]:CBC           2 :         if (PQresultStatus(res) == PGRES_COPY_IN)
                               1198                 :                :         {
                               1199   [ +  -  -  + ]:              1 :             if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
                               1200                 :                :             {
 2350 peter@eisentraut.org     1201                 :UBC           0 :                 pg_log_error("could not send copy-end packet: %s",
                               1202                 :                :                              PQerrorMessage(conn));
 4049 fujii@postgresql.org     1203                 :              0 :                 PQclear(res);
                               1204                 :              0 :                 return NULL;
                               1205                 :                :             }
 4049 fujii@postgresql.org     1206                 :CBC           1 :             res = PQgetResult(conn);
                               1207                 :                :         }
                               1208                 :              2 :         still_sending = false;
                               1209                 :                :     }
                               1210                 :            136 :     *stoppos = blockpos;
                               1211                 :            136 :     return res;
                               1212                 :                : }
                               1213                 :                : 
                               1214                 :                : /*
                               1215                 :                :  * Check if we should continue streaming, or abort at this point.
                               1216                 :                :  */
                               1217                 :                : static bool
 1822 peter@eisentraut.org     1218                 :           2619 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
                               1219                 :                : {
 3466 magnus@hagander.net      1220   [ +  +  +  + ]:           2619 :     if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
                               1221                 :                :     {
                               1222         [ -  + ]:            134 :         if (!close_walfile(stream, blockpos))
                               1223                 :                :         {
                               1224                 :                :             /* Potential error message is written by close_walfile */
 4047 fujii@postgresql.org     1225                 :UBC           0 :             return false;
                               1226                 :                :         }
 4047 fujii@postgresql.org     1227   [ +  -  -  + ]:CBC         134 :         if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
                               1228                 :                :         {
 2350 peter@eisentraut.org     1229                 :UBC           0 :             pg_log_error("could not send copy-end packet: %s",
                               1230                 :                :                          PQerrorMessage(conn));
 4047 fujii@postgresql.org     1231                 :              0 :             return false;
                               1232                 :                :         }
 4047 fujii@postgresql.org     1233                 :CBC         134 :         still_sending = false;
                               1234                 :                :     }
                               1235                 :                : 
                               1236                 :           2619 :     return true;
                               1237                 :                : }
                               1238                 :                : 
                               1239                 :                : /*
                               1240                 :                :  * Calculate how long send/receive loops should sleep
                               1241                 :                :  */
                               1242                 :                : static long
 3117 tgl@sss.pgh.pa.us        1243                 :           1295 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
                               1244                 :                :                              TimestampTz last_status)
                               1245                 :                : {
                               1246                 :           1295 :     TimestampTz status_targettime = 0;
                               1247                 :                :     long        sleeptime;
                               1248                 :                : 
 4047 fujii@postgresql.org     1249   [ +  -  +  + ]:           1295 :     if (standby_message_timeout && still_sending)
                               1250                 :           1216 :         status_targettime = last_status +
                               1251                 :           1216 :             (standby_message_timeout - 1) * ((int64) 1000);
                               1252                 :                : 
 3945                          1253         [ +  + ]:           1295 :     if (status_targettime > 0)
                               1254                 :                :     {
                               1255                 :                :         long        secs;
                               1256                 :                :         int         usecs;
                               1257                 :                : 
 4047                          1258                 :           1216 :         feTimestampDifference(now,
                               1259                 :                :                               status_targettime,
                               1260                 :                :                               &secs,
                               1261                 :                :                               &usecs);
                               1262                 :                :         /* Always sleep at least 1 sec */
                               1263         [ -  + ]:           1216 :         if (secs <= 0)
                               1264                 :                :         {
 4047 fujii@postgresql.org     1265                 :UBC           0 :             secs = 1;
                               1266                 :              0 :             usecs = 0;
                               1267                 :                :         }
                               1268                 :                : 
 4047 fujii@postgresql.org     1269                 :CBC        1216 :         sleeptime = secs * 1000 + usecs / 1000;
                               1270                 :                :     }
                               1271                 :                :     else
                               1272                 :             79 :         sleeptime = -1;
                               1273                 :                : 
                               1274                 :           1295 :     return sleeptime;
                               1275                 :                : }
        

Generated by: LCOV version 2.4-beta