LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - pg_recvlogical.c (source / functions) Coverage Total Hit UNC UBC GBC GNC CBC DUB DCB
Current: 0e5ff9b9b45a657aea12440478dc002e9b01f138 vs 0123ce131fca454009439dfa3b2266d1d40737d7 Lines: 78.3 % 484 379 6 99 26 23 330 10 22
Current Date: 2026-03-14 14:10:32 -0400 Functions: 90.0 % 10 9 1 6 3
Baseline: lcov-20260315-024220-baseline Branches: 71.9 % 292 210 13 69 16 23 171 18 22
Baseline Date: 2026-03-14 15:27:56 +0100 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 100.0 % 1 1 1
(30,360] days: 80.4 % 46 37 6 3 22 15
(360..) days: 78.0 % 437 341 96 26 315
Function coverage date bins:
(360..) days: 90.0 % 10 9 1 6 3
Branch coverage date bins:
(30,360] days: 67.4 % 46 31 13 2 23 8
(360..) days: 72.8 % 246 179 67 16 163

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
                                  4                 :                :  *                    fashion and write it to a local file.
                                  5                 :                :  *
                                  6                 :                :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
                                  7                 :                :  *
                                  8                 :                :  * IDENTIFICATION
                                  9                 :                :  *        src/bin/pg_basebackup/pg_recvlogical.c
                                 10                 :                :  *-------------------------------------------------------------------------
                                 11                 :                :  */
                                 12                 :                : 
                                 13                 :                : #include "postgres_fe.h"
                                 14                 :                : 
                                 15                 :                : #include <dirent.h>
                                 16                 :                : #include <limits.h>
                                 17                 :                : #include <sys/select.h>
                                 18                 :                : #include <sys/stat.h>
                                 19                 :                : #include <unistd.h>
                                 20                 :                : 
                                 21                 :                : #include "common/file_perm.h"
                                 22                 :                : #include "common/logging.h"
                                 23                 :                : #include "fe_utils/option_utils.h"
                                 24                 :                : #include "getopt_long.h"
                                 25                 :                : #include "libpq-fe.h"
                                 26                 :                : #include "libpq/pqsignal.h"
                                 27                 :                : #include "libpq/protocol.h"
                                 28                 :                : #include "pqexpbuffer.h"
                                 29                 :                : #include "streamutil.h"
                                 30                 :                : 
                                 31                 :                : /* Time to sleep between reconnection attempts */
                                 32                 :                : #define RECONNECT_SLEEP_TIME 5
                                 33                 :                : 
                                 34                 :                : typedef enum
                                 35                 :                : {
                                 36                 :                :     STREAM_STOP_NONE,
                                 37                 :                :     STREAM_STOP_END_OF_WAL,
                                 38                 :                :     STREAM_STOP_KEEPALIVE,
                                 39                 :                :     STREAM_STOP_SIGNAL
                                 40                 :                : } StreamStopReason;
                                 41                 :                : 
                                 42                 :                : /* Global Options */
                                 43                 :                : static char *outfile = NULL;
                                 44                 :                : static int  verbose = 0;
                                 45                 :                : static bool two_phase = false;  /* enable-two-phase option */
                                 46                 :                : static bool failover = false;   /* enable-failover option */
                                 47                 :                : static int  noloop = 0;
                                 48                 :                : static int  standby_message_timeout = 10 * 1000;    /* 10 sec = default */
                                 49                 :                : static int  fsync_interval = 10 * 1000; /* 10 sec = default */
                                 50                 :                : static XLogRecPtr startpos = InvalidXLogRecPtr;
                                 51                 :                : static XLogRecPtr endpos = InvalidXLogRecPtr;
                                 52                 :                : static bool do_create_slot = false;
                                 53                 :                : static bool slot_exists_ok = false;
                                 54                 :                : static bool do_start_slot = false;
                                 55                 :                : static bool do_drop_slot = false;
                                 56                 :                : static char *replication_slot = NULL;
                                 57                 :                : 
                                 58                 :                : /* filled pairwise with option, value. value may be NULL */
                                 59                 :                : static char **options;
                                 60                 :                : static size_t noptions = 0;
                                 61                 :                : static const char *plugin = "test_decoding";
                                 62                 :                : 
                                 63                 :                : /* Global State */
                                 64                 :                : static int  outfd = -1;
                                 65                 :                : static volatile sig_atomic_t time_to_abort = false;
                                 66                 :                : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
                                 67                 :                : static volatile sig_atomic_t output_reopen = false;
                                 68                 :                : static bool output_isfile;
                                 69                 :                : static TimestampTz output_last_fsync = -1;
                                 70                 :                : static bool output_needs_fsync = false;
                                 71                 :                : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
                                 72                 :                : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
                                 73                 :                : 
                                 74                 :                : static void usage(void);
                                 75                 :                : static void StreamLogicalLog(void);
                                 76                 :                : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
                                 77                 :                : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
                                 78                 :                :                                StreamStopReason reason,
                                 79                 :                :                                XLogRecPtr lsn);
                                 80                 :                : 
                                 81                 :                : static void
 4380 rhaas@postgresql.org       82                 :CBC           1 : usage(void)
                                 83                 :                : {
 4013 bruce@momjian.us           84                 :              1 :     printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
                                 85                 :                :            progname);
 4380 rhaas@postgresql.org       86                 :              1 :     printf(_("Usage:\n"));
                                 87                 :              1 :     printf(_("  %s [OPTION]...\n"), progname);
 4172 peter_e@gmx.net            88                 :              1 :     printf(_("\nAction to be performed:\n"));
                                 89                 :              1 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
                                 90                 :              1 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
                                 91                 :              1 :     printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
 4380 rhaas@postgresql.org       92                 :              1 :     printf(_("\nOptions:\n"));
  259 peter@eisentraut.org       93                 :              1 :     printf(_("      --enable-failover  enable replication slot synchronization to standby servers when\n"
                                 94                 :                :              "                         creating a replication slot\n"));
                                 95                 :              1 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
 4172 peter_e@gmx.net            96                 :              1 :     printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
 4312 andres@anarazel.de         97                 :              1 :     printf(_("  -F  --fsync-interval=SECS\n"
                                 98                 :                :              "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
 3833 peter_e@gmx.net            99                 :              1 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
 4172                           100                 :              1 :     printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
 4380 rhaas@postgresql.org      101                 :              1 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
 4172 peter_e@gmx.net           102                 :              1 :     printf(_("  -o, --option=NAME[=VALUE]\n"
                                103                 :                :              "                         pass option NAME with optional value VALUE to the\n"
                                104                 :                :              "                         output plugin\n"));
                                105                 :              1 :     printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
                                106                 :              1 :     printf(_("  -s, --status-interval=SECS\n"
                                107                 :                :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
                                108                 :              1 :     printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
  259 peter@eisentraut.org      109                 :              1 :     printf(_("  -t, --enable-two-phase enable decoding of prepared transactions when creating a slot\n"));
                                110                 :              1 :     printf(_("      --two-phase        (same as --enable-two-phase, deprecated)\n"));
 4380 rhaas@postgresql.org      111                 :              1 :     printf(_("  -v, --verbose          output verbose messages\n"));
                                112                 :              1 :     printf(_("  -V, --version          output version information, then exit\n"));
                                113                 :              1 :     printf(_("  -?, --help             show this help, then exit\n"));
                                114                 :              1 :     printf(_("\nConnection options:\n"));
                                115                 :              1 :     printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
                                116                 :              1 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
                                117                 :              1 :     printf(_("  -p, --port=PORT        database server port number\n"));
                                118                 :              1 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
                                119                 :              1 :     printf(_("  -w, --no-password      never prompt for password\n"));
                                120                 :              1 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
 2207 peter@eisentraut.org      121                 :              1 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
                                122                 :              1 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
 4380 rhaas@postgresql.org      123                 :              1 : }
                                124                 :                : 
                                125                 :                : /*
                                126                 :                :  * Send a Standby Status Update message to server.
                                127                 :                :  */
                                128                 :                : static bool
 3307 tgl@sss.pgh.pa.us         129                 :             27 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
                                130                 :                : {
                                131                 :                :     static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
                                132                 :                :     static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
                                133                 :                : 
                                134                 :                :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
 4380 rhaas@postgresql.org      135                 :             27 :     int         len = 0;
                                136                 :                : 
                                137                 :                :     /*
                                138                 :                :      * we normally don't want to send superfluous feedback, but if it's
                                139                 :                :      * because of a timeout we need to, otherwise wal_sender_timeout will kill
                                140                 :                :      * us.
                                141                 :                :      */
                                142         [ -  + ]:             27 :     if (!force &&
 4380 rhaas@postgresql.org      143         [ #  # ]:UBC           0 :         last_written_lsn == output_written_lsn &&
 2132 noah@leadboat.com         144         [ #  # ]:              0 :         last_fsync_lsn == output_fsync_lsn)
 4380 rhaas@postgresql.org      145                 :              0 :         return true;
                                146                 :                : 
 4380 rhaas@postgresql.org      147         [ +  + ]:CBC          27 :     if (verbose)
  251 alvherre@kurilemu.de      148                 :GNC           2 :         pg_log_info("confirming write up to %X/%08X, flush to %X/%08X (slot %s)",
                                149                 :                :                     LSN_FORMAT_ARGS(output_written_lsn),
                                150                 :                :                     LSN_FORMAT_ARGS(output_fsync_lsn),
                                151                 :                :                     replication_slot);
                                152                 :                : 
  221 nathan@postgresql.or      153                 :             27 :     replybuf[len] = PqReplMsg_StandbyStatusUpdate;
 4380 rhaas@postgresql.org      154                 :CBC          27 :     len += 1;
 4331 bruce@momjian.us          155                 :             27 :     fe_sendint64(output_written_lsn, &replybuf[len]);   /* write */
 4380 rhaas@postgresql.org      156                 :             27 :     len += 8;
 3189 tgl@sss.pgh.pa.us         157                 :             27 :     fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
 4380 rhaas@postgresql.org      158                 :             27 :     len += 8;
 4331 bruce@momjian.us          159                 :             27 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
 4380 rhaas@postgresql.org      160                 :             27 :     len += 8;
 4331 bruce@momjian.us          161                 :             27 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
 4380 rhaas@postgresql.org      162                 :             27 :     len += 8;
 3189 tgl@sss.pgh.pa.us         163                 :             27 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
 4380 rhaas@postgresql.org      164                 :             27 :     len += 1;
                                165                 :                : 
                                166                 :             27 :     startpos = output_written_lsn;
                                167                 :             27 :     last_written_lsn = output_written_lsn;
                                168                 :             27 :     last_fsync_lsn = output_fsync_lsn;
                                169                 :                : 
                                170   [ +  -  -  + ]:             27 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
                                171                 :                :     {
 2540 peter@eisentraut.org      172                 :UBC           0 :         pg_log_error("could not send feedback packet: %s",
                                173                 :                :                      PQerrorMessage(conn));
 4380 rhaas@postgresql.org      174                 :              0 :         return false;
                                175                 :                :     }
                                176                 :                : 
 4380 rhaas@postgresql.org      177                 :CBC          27 :     return true;
                                178                 :                : }
                                179                 :                : 
                                180                 :                : static void
 2633 peter@eisentraut.org      181                 :             56 : disconnect_atexit(void)
                                182                 :                : {
 4380 rhaas@postgresql.org      183         [ +  + ]:             56 :     if (conn != NULL)
                                184                 :             32 :         PQfinish(conn);
                                185                 :             56 : }
                                186                 :                : 
                                187                 :                : static void
 3307 tgl@sss.pgh.pa.us         188                 :             37 : OutputFsync(TimestampTz now)
                                189                 :                : {
 4380 rhaas@postgresql.org      190                 :             37 :     output_last_fsync = now;
                                191                 :                : 
                                192                 :             37 :     output_fsync_lsn = output_written_lsn;
                                193                 :                : 
                                194                 :                :     /*
                                195                 :                :      * Save the last flushed position as the replication start point. On
                                196                 :                :      * reconnect, replication resumes from there to avoid re-sending flushed
                                197                 :                :      * data.
                                198                 :                :      */
   58 fujii@postgresql.org      199                 :GNC          37 :     startpos = output_fsync_lsn;
                                200                 :                : 
 4380 rhaas@postgresql.org      201         [ -  + ]:CBC          37 :     if (fsync_interval <= 0)
   58 fujii@postgresql.org      202                 :UNC           0 :         return;
                                203                 :                : 
 4322 heikki.linnakangas@i      204         [ +  + ]:CBC          37 :     if (!output_needs_fsync)
   58 fujii@postgresql.org      205                 :GNC          25 :         return;
                                206                 :                : 
 4322 heikki.linnakangas@i      207                 :CBC          12 :     output_needs_fsync = false;
                                208                 :                : 
                                209                 :                :     /* can only fsync if it's a regular file */
 3904 andres@anarazel.de        210         [ +  + ]:             12 :     if (!output_isfile)
   58 fujii@postgresql.org      211                 :GNC           8 :         return;
                                212                 :                : 
 3904 andres@anarazel.de        213         [ -  + ]:CBC           4 :     if (fsync(outfd) != 0)
 1437 tgl@sss.pgh.pa.us         214                 :UBC           0 :         pg_fatal("could not fsync file \"%s\": %m", outfile);
                                215                 :                : }
                                216                 :                : 
                                217                 :                : /*
                                218                 :                :  * Start the log streaming
                                219                 :                :  */
                                220                 :                : static void
 4185 andres@anarazel.de        221                 :CBC          25 : StreamLogicalLog(void)
                                222                 :                : {
                                223                 :                :     PGresult   *res;
 4380 rhaas@postgresql.org      224                 :             25 :     char       *copybuf = NULL;
 3307 tgl@sss.pgh.pa.us         225                 :             25 :     TimestampTz last_status = -1;
                                226                 :                :     int         i;
                                227                 :                :     PQExpBuffer query;
                                228                 :                :     XLogRecPtr  cur_record_lsn;
                                229                 :                : 
  969 michael@paquier.xyz       230                 :             25 :     cur_record_lsn = InvalidXLogRecPtr;
                                231                 :                : 
                                232                 :                :     /*
                                233                 :                :      * Connect in replication mode to the server
                                234                 :                :      */
 4380 rhaas@postgresql.org      235         [ +  + ]:             25 :     if (!conn)
 4380 rhaas@postgresql.org      236                 :GBC           1 :         conn = GetConnection();
 4380 rhaas@postgresql.org      237         [ -  + ]:CBC          25 :     if (!conn)
                                238                 :                :         /* Error message already written in GetConnection() */
 4380 rhaas@postgresql.org      239                 :UBC           0 :         return;
                                240                 :                : 
                                241                 :                :     /*
                                242                 :                :      * Start the replication
                                243                 :                :      */
 4380 rhaas@postgresql.org      244         [ +  + ]:CBC          25 :     if (verbose)
  251 alvherre@kurilemu.de      245                 :GNC           2 :         pg_log_info("starting log streaming at %X/%08X (slot %s)",
                                246                 :                :                     LSN_FORMAT_ARGS(startpos),
                                247                 :                :                     replication_slot);
                                248                 :                : 
                                249                 :                :     /* Initiate the replication stream at specified location */
 1575 tgl@sss.pgh.pa.us         250                 :CBC          25 :     query = createPQExpBuffer();
  251 alvherre@kurilemu.de      251                 :GNC          25 :     appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%08X",
 1846 peter@eisentraut.org      252                 :CBC          25 :                       replication_slot, LSN_FORMAT_ARGS(startpos));
                                253                 :                : 
                                254                 :                :     /* print options if there are any */
 4380 rhaas@postgresql.org      255         [ +  + ]:             25 :     if (noptions)
                                256                 :             20 :         appendPQExpBufferStr(query, " (");
                                257                 :                : 
                                258         [ +  + ]:             65 :     for (i = 0; i < noptions; i++)
                                259                 :                :     {
                                260                 :                :         /* separator */
                                261         [ +  + ]:             40 :         if (i > 0)
                                262                 :             20 :             appendPQExpBufferStr(query, ", ");
                                263                 :                : 
                                264                 :                :         /* write option name */
                                265                 :             40 :         appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
                                266                 :                : 
                                267                 :                :         /* write option value if specified */
                                268         [ +  - ]:             40 :         if (options[(i * 2) + 1] != NULL)
                                269                 :             40 :             appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
                                270                 :                :     }
                                271                 :                : 
                                272         [ +  + ]:             25 :     if (noptions)
                                273                 :             20 :         appendPQExpBufferChar(query, ')');
                                274                 :                : 
                                275                 :             25 :     res = PQexec(conn, query->data);
                                276         [ +  + ]:             25 :     if (PQresultStatus(res) != PGRES_COPY_BOTH)
                                277                 :                :     {
 2540 peter@eisentraut.org      278                 :              6 :         pg_log_error("could not send replication command \"%s\": %s",
                                279                 :                :                      query->data, PQresultErrorMessage(res));
 4380 rhaas@postgresql.org      280                 :              6 :         PQclear(res);
                                281                 :              6 :         goto error;
                                282                 :                :     }
                                283                 :             19 :     PQclear(res);
                                284                 :             19 :     resetPQExpBuffer(query);
                                285                 :                : 
                                286         [ +  + ]:             19 :     if (verbose)
 2540 peter@eisentraut.org      287                 :GBC           2 :         pg_log_info("streaming initiated");
                                288                 :                : 
 4380 rhaas@postgresql.org      289         [ +  + ]:CBC         333 :     while (!time_to_abort)
                                290                 :                :     {
                                291                 :                :         int         r;
                                292                 :                :         int         bytes_left;
                                293                 :                :         int         bytes_written;
                                294                 :                :         TimestampTz now;
                                295                 :                :         int         hdr_len;
                                296                 :                : 
  969 michael@paquier.xyz       297                 :            331 :         cur_record_lsn = InvalidXLogRecPtr;
                                298                 :                : 
 4380 rhaas@postgresql.org      299         [ +  + ]:            331 :         if (copybuf != NULL)
                                300                 :                :         {
                                301                 :            203 :             PQfreemem(copybuf);
                                302                 :            203 :             copybuf = NULL;
                                303                 :                :         }
                                304                 :                : 
                                305                 :                :         /*
                                306                 :                :          * Potentially send a status message to the primary.
                                307                 :                :          */
                                308                 :            331 :         now = feGetCurrentTimestamp();
                                309                 :                : 
                                310   [ +  +  +  + ]:            644 :         if (outfd != -1 &&
                                311                 :            313 :             feTimestampDifferenceExceeds(output_last_fsync, now,
                                312                 :                :                                          fsync_interval))
   58 fujii@postgresql.org      313                 :GNC          19 :             OutputFsync(now);
                                314                 :                : 
 4380 rhaas@postgresql.org      315   [ +  -  +  + ]:CBC         662 :         if (standby_message_timeout > 0 &&
                                316                 :            331 :             feTimestampDifferenceExceeds(last_status, now,
                                317                 :                :                                          standby_message_timeout))
                                318                 :                :         {
                                319                 :                :             /* Time to send feedback! */
                                320         [ -  + ]:             19 :             if (!sendFeedback(conn, now, true, false))
 4380 rhaas@postgresql.org      321                 :GBC           2 :                 goto error;
                                322                 :                : 
 4380 rhaas@postgresql.org      323                 :CBC          19 :             last_status = now;
                                324                 :                :         }
                                325                 :                : 
                                326                 :                :         /* got SIGHUP, close output file */
 4322 heikki.linnakangas@i      327   [ +  +  -  +  :            331 :         if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
                                              -  - ]
                                328                 :                :         {
 4322 heikki.linnakangas@i      329                 :UBC           0 :             now = feGetCurrentTimestamp();
   58 fujii@postgresql.org      330                 :UNC           0 :             OutputFsync(now);
 4322 heikki.linnakangas@i      331                 :UBC           0 :             close(outfd);
                                332                 :              0 :             outfd = -1;
                                333                 :                :         }
 4322 heikki.linnakangas@i      334                 :CBC         331 :         output_reopen = false;
                                335                 :                : 
                                336                 :                :         /* open the output file, if not open yet */
 4321                           337         [ +  + ]:            331 :         if (outfd == -1)
                                338                 :                :         {
                                339                 :                :             struct stat statbuf;
                                340                 :                : 
                                341         [ +  + ]:             18 :             if (strcmp(outfile, "-") == 0)
                                342                 :             17 :                 outfd = fileno(stdout);
                                343                 :                :             else
 4321 heikki.linnakangas@i      344                 :GBC           1 :                 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
                                345                 :                :                              S_IRUSR | S_IWUSR);
 4321 heikki.linnakangas@i      346         [ -  + ]:CBC          18 :             if (outfd == -1)
                                347                 :                :             {
 2540 peter@eisentraut.org      348                 :UBC           0 :                 pg_log_error("could not open log file \"%s\": %m", outfile);
 4321 heikki.linnakangas@i      349                 :              0 :                 goto error;
                                350                 :                :             }
                                351                 :                : 
 3904 andres@anarazel.de        352         [ -  + ]:CBC          18 :             if (fstat(outfd, &statbuf) != 0)
                                353                 :                :             {
 2540 peter@eisentraut.org      354                 :UBC           0 :                 pg_log_error("could not stat file \"%s\": %m", outfile);
 1690 michael@paquier.xyz       355                 :              0 :                 goto error;
                                356                 :                :             }
                                357                 :                : 
 3904 andres@anarazel.de        358   [ +  +  +  - ]:CBC          18 :             output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
                                359                 :                :         }
                                360                 :                : 
 4380 rhaas@postgresql.org      361                 :            331 :         r = PQgetCopyData(conn, &copybuf, 1);
                                362         [ +  + ]:            331 :         if (r == 0)
                                363                 :            109 :         {
                                364                 :                :             /*
                                365                 :                :              * In async mode, and no data available. We block on reading but
                                366                 :                :              * not more than the specified timeout, so that we can send a
                                367                 :                :              * response back to the client.
                                368                 :                :              */
                                369                 :                :             fd_set      input_mask;
 3307 tgl@sss.pgh.pa.us         370                 :            113 :             TimestampTz message_target = 0;
                                371                 :            113 :             TimestampTz fsync_target = 0;
                                372                 :                :             struct timeval timeout;
 4380 rhaas@postgresql.org      373                 :            113 :             struct timeval *timeoutptr = NULL;
                                374                 :                : 
 3659 peter_e@gmx.net           375         [ -  + ]:            113 :             if (PQsocket(conn) < 0)
                                376                 :                :             {
 2540 peter@eisentraut.org      377                 :UBC           0 :                 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
 3659 peter_e@gmx.net           378                 :CBC           2 :                 goto error;
                                379                 :                :             }
                                380                 :                : 
 4380 rhaas@postgresql.org      381         [ +  + ]:           1921 :             FD_ZERO(&input_mask);
                                382                 :            113 :             FD_SET(PQsocket(conn), &input_mask);
                                383                 :                : 
                                384                 :                :             /* Compute when we need to wakeup to send a keepalive message. */
                                385         [ +  - ]:            113 :             if (standby_message_timeout)
                                386                 :            113 :                 message_target = last_status + (standby_message_timeout - 1) *
                                387                 :                :                     ((int64) 1000);
                                388                 :                : 
                                389                 :                :             /* Compute when we need to wakeup to fsync the output file. */
 4322 heikki.linnakangas@i      390   [ +  -  +  + ]:            113 :             if (fsync_interval > 0 && output_needs_fsync)
 4380 rhaas@postgresql.org      391                 :             47 :                 fsync_target = output_last_fsync + (fsync_interval - 1) *
                                392                 :                :                     ((int64) 1000);
                                393                 :                : 
                                394                 :                :             /* Now compute when to wakeup. */
                                395   [ -  +  -  - ]:            113 :             if (message_target > 0 || fsync_target > 0)
                                396                 :                :             {
                                397                 :                :                 TimestampTz targettime;
                                398                 :                :                 long        secs;
                                399                 :                :                 int         usecs;
                                400                 :                : 
                                401                 :            113 :                 targettime = message_target;
                                402                 :                : 
                                403   [ +  +  +  + ]:            113 :                 if (fsync_target > 0 && fsync_target < targettime)
 4380 rhaas@postgresql.org      404                 :GBC           2 :                     targettime = fsync_target;
                                405                 :                : 
 4380 rhaas@postgresql.org      406                 :CBC         113 :                 feTimestampDifference(now,
                                407                 :                :                                       targettime,
                                408                 :                :                                       &secs,
                                409                 :                :                                       &usecs);
                                410         [ +  + ]:            113 :                 if (secs <= 0)
 4380 rhaas@postgresql.org      411                 :GBC           2 :                     timeout.tv_sec = 1; /* Always sleep at least 1 sec */
                                412                 :                :                 else
 4380 rhaas@postgresql.org      413                 :CBC         111 :                     timeout.tv_sec = secs;
                                414                 :            113 :                 timeout.tv_usec = usecs;
                                415                 :            113 :                 timeoutptr = &timeout;
                                416                 :                :             }
                                417                 :                : 
                                418                 :            113 :             r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
                                419   [ +  -  +  +  :            113 :             if (r == 0 || (r < 0 && errno == EINTR))
                                              +  - ]
                                420                 :                :             {
                                421                 :                :                 /*
                                422                 :                :                  * Got a timeout or signal. Continue the loop and either
                                423                 :                :                  * deliver a status packet to the server or just go back into
                                424                 :                :                  * blocking.
                                425                 :                :                  */
                                426                 :            111 :                 continue;
                                427                 :                :             }
                                428         [ -  + ]:            111 :             else if (r < 0)
                                429                 :                :             {
 1787 peter@eisentraut.org      430                 :UBC           0 :                 pg_log_error("%s() failed: %m", "select");
 4380 rhaas@postgresql.org      431                 :              0 :                 goto error;
                                432                 :                :             }
                                433                 :                : 
                                434                 :                :             /* Else there is actually data on the socket */
 4380 rhaas@postgresql.org      435         [ +  + ]:CBC         111 :             if (PQconsumeInput(conn) == 0)
                                436                 :                :             {
 2540 peter@eisentraut.org      437                 :              2 :                 pg_log_error("could not receive data from WAL stream: %s",
                                438                 :                :                              PQerrorMessage(conn));
 4380 rhaas@postgresql.org      439                 :              2 :                 goto error;
                                440                 :                :             }
                                441                 :            109 :             continue;
                                442                 :                :         }
                                443                 :                : 
                                444                 :                :         /* End of copy stream */
                                445         [ +  + ]:            218 :         if (r == -1)
                                446                 :             15 :             break;
                                447                 :                : 
                                448                 :                :         /* Failure while reading the copy stream */
                                449         [ -  + ]:            210 :         if (r == -2)
                                450                 :                :         {
 2540 peter@eisentraut.org      451                 :UBC           0 :             pg_log_error("could not read COPY data: %s",
                                452                 :                :                          PQerrorMessage(conn));
 4380 rhaas@postgresql.org      453                 :              0 :             goto error;
                                454                 :                :         }
                                455                 :                : 
                                456                 :                :         /* Check the message type. */
  221 nathan@postgresql.or      457         [ +  + ]:GNC         210 :         if (copybuf[0] == PqReplMsg_Keepalive)
 4380 rhaas@postgresql.org      458                 :CBC         111 :         {
                                459                 :                :             int         pos;
                                460                 :                :             bool        replyRequested;
                                461                 :                :             XLogRecPtr  walEnd;
 3357 simon@2ndQuadrant.co      462                 :            113 :             bool        endposReached = false;
                                463                 :                : 
                                464                 :                :             /*
                                465                 :                :              * Parse the keepalive message, enclosed in the CopyData message.
                                466                 :                :              * We just check if the server requested a reply, and ignore the
                                467                 :                :              * rest.
                                468                 :                :              */
  221 nathan@postgresql.or      469                 :GNC         113 :             pos = 1;            /* skip msgtype PqReplMsg_Keepalive */
 4380 rhaas@postgresql.org      470                 :CBC         113 :             walEnd = fe_recvint64(&copybuf[pos]);
                                471                 :            113 :             output_written_lsn = Max(walEnd, output_written_lsn);
                                472                 :                : 
                                473                 :            113 :             pos += 8;           /* read walEnd */
                                474                 :                : 
                                475                 :            113 :             pos += 8;           /* skip sendTime */
                                476                 :                : 
                                477         [ -  + ]:            113 :             if (r < pos + 1)
                                478                 :                :             {
 2540 peter@eisentraut.org      479                 :UBC           0 :                 pg_log_error("streaming header too small: %d", r);
 4380 rhaas@postgresql.org      480                 :              0 :                 goto error;
                                481                 :                :             }
 4380 rhaas@postgresql.org      482                 :CBC         113 :             replyRequested = copybuf[pos];
                                483                 :                : 
  129 alvherre@kurilemu.de      484   [ +  +  +  + ]:GNC         113 :             if (XLogRecPtrIsValid(endpos) && walEnd >= endpos)
                                485                 :                :             {
                                486                 :                :                 /*
                                487                 :                :                  * If there's nothing to read on the socket until a keepalive
                                488                 :                :                  * we know that the server has nothing to send us; and if
                                489                 :                :                  * walEnd has passed endpos, we know nothing else can have
                                490                 :                :                  * committed before endpos.  So we can bail out now.
                                491                 :                :                  */
 3357 simon@2ndQuadrant.co      492                 :CBC           2 :                 endposReached = true;
                                493                 :                :             }
                                494                 :                : 
                                495                 :                :             /* Send a reply, if necessary */
                                496   [ +  +  +  + ]:            113 :             if (replyRequested || endposReached)
                                497                 :                :             {
                                498         [ -  + ]:              3 :                 if (!flushAndSendFeedback(conn, &now))
 4380 rhaas@postgresql.org      499                 :UBC           0 :                     goto error;
 4380 rhaas@postgresql.org      500                 :CBC           3 :                 last_status = now;
                                501                 :                :             }
                                502                 :                : 
 3357 simon@2ndQuadrant.co      503         [ +  + ]:            113 :             if (endposReached)
                                504                 :                :             {
  969 michael@paquier.xyz       505                 :              2 :                 stop_reason = STREAM_STOP_KEEPALIVE;
 3357 simon@2ndQuadrant.co      506                 :              2 :                 time_to_abort = true;
                                507                 :              2 :                 break;
                                508                 :                :             }
                                509                 :                : 
 4380 rhaas@postgresql.org      510                 :            111 :             continue;
                                511                 :                :         }
  221 nathan@postgresql.or      512         [ -  + ]:GNC          97 :         else if (copybuf[0] != PqReplMsg_WALData)
                                513                 :                :         {
 2540 peter@eisentraut.org      514                 :UBC           0 :             pg_log_error("unrecognized streaming header: \"%c\"",
                                515                 :                :                          copybuf[0]);
 4380 rhaas@postgresql.org      516                 :              0 :             goto error;
                                517                 :                :         }
                                518                 :                : 
                                519                 :                :         /*
                                520                 :                :          * Read the header of the WALData message, enclosed in the CopyData
                                521                 :                :          * message. We only need the WAL location field (dataStart), the rest
                                522                 :                :          * of the header is ignored.
                                523                 :                :          */
  221 nathan@postgresql.or      524                 :GNC          97 :         hdr_len = 1;            /* msgtype PqReplMsg_WALData */
 4380 rhaas@postgresql.org      525                 :CBC          97 :         hdr_len += 8;           /* dataStart */
                                526                 :             97 :         hdr_len += 8;           /* walEnd */
                                527                 :             97 :         hdr_len += 8;           /* sendTime */
                                528         [ -  + ]:             97 :         if (r < hdr_len + 1)
                                529                 :                :         {
 2540 peter@eisentraut.org      530                 :UBC           0 :             pg_log_error("streaming header too small: %d", r);
 4380 rhaas@postgresql.org      531                 :              0 :             goto error;
                                532                 :                :         }
                                533                 :                : 
                                534                 :                :         /* Extract WAL location for this block */
 3357 simon@2ndQuadrant.co      535                 :CBC          97 :         cur_record_lsn = fe_recvint64(&copybuf[1]);
                                536                 :                : 
  129 alvherre@kurilemu.de      537   [ +  +  -  + ]:GNC          97 :         if (XLogRecPtrIsValid(endpos) && cur_record_lsn > endpos)
                                538                 :                :         {
                                539                 :                :             /*
                                540                 :                :              * We've read past our endpoint, so prepare to go away being
                                541                 :                :              * cautious about what happens to our output data.
                                542                 :                :              */
 3357 simon@2ndQuadrant.co      543         [ #  # ]:UBC           0 :             if (!flushAndSendFeedback(conn, &now))
                                544                 :              0 :                 goto error;
  969 michael@paquier.xyz       545                 :              0 :             stop_reason = STREAM_STOP_END_OF_WAL;
 3357 simon@2ndQuadrant.co      546                 :              0 :             time_to_abort = true;
                                547                 :              0 :             break;
                                548                 :                :         }
                                549                 :                : 
 3357 simon@2ndQuadrant.co      550                 :CBC          97 :         output_written_lsn = Max(cur_record_lsn, output_written_lsn);
                                551                 :                : 
 4380 rhaas@postgresql.org      552                 :             97 :         bytes_left = r - hdr_len;
                                553                 :             97 :         bytes_written = 0;
                                554                 :                : 
                                555                 :                :         /* signal that a fsync is needed */
 4322 heikki.linnakangas@i      556                 :             97 :         output_needs_fsync = true;
                                557                 :                : 
 4380 rhaas@postgresql.org      558         [ +  + ]:            194 :         while (bytes_left)
                                559                 :                :         {
                                560                 :                :             int         ret;
                                561                 :                : 
                                562                 :            194 :             ret = write(outfd,
                                563                 :             97 :                         copybuf + hdr_len + bytes_written,
                                564                 :                :                         bytes_left);
                                565                 :                : 
                                566         [ -  + ]:             97 :             if (ret < 0)
                                567                 :                :             {
 1579 peter@eisentraut.org      568                 :UBC           0 :                 pg_log_error("could not write %d bytes to log file \"%s\": %m",
                                569                 :                :                              bytes_left, outfile);
 4380 rhaas@postgresql.org      570                 :              0 :                 goto error;
                                571                 :                :             }
                                572                 :                : 
                                573                 :                :             /* Write was successful, advance our position */
 4380 rhaas@postgresql.org      574                 :CBC          97 :             bytes_written += ret;
                                575                 :             97 :             bytes_left -= ret;
                                576                 :                :         }
                                577                 :                : 
                                578         [ -  + ]:             97 :         if (write(outfd, "\n", 1) != 1)
                                579                 :                :         {
 1579 peter@eisentraut.org      580                 :UBC           0 :             pg_log_error("could not write %d bytes to log file \"%s\": %m",
                                581                 :                :                          1, outfile);
 4380 rhaas@postgresql.org      582                 :              0 :             goto error;
                                583                 :                :         }
                                584                 :                : 
  129 alvherre@kurilemu.de      585   [ +  +  +  + ]:GNC          97 :         if (XLogRecPtrIsValid(endpos) && cur_record_lsn == endpos)
                                586                 :                :         {
                                587                 :                :             /* endpos was exactly the record we just processed, we're done */
 3357 simon@2ndQuadrant.co      588         [ -  + ]:CBC           5 :             if (!flushAndSendFeedback(conn, &now))
 3357 simon@2ndQuadrant.co      589                 :UBC           0 :                 goto error;
  969 michael@paquier.xyz       590                 :CBC           5 :             stop_reason = STREAM_STOP_END_OF_WAL;
 3357 simon@2ndQuadrant.co      591                 :              5 :             time_to_abort = true;
                                592                 :              5 :             break;
                                593                 :                :         }
                                594                 :                :     }
                                595                 :                : 
                                596                 :                :     /* Clean up connection state if stream has been aborted */
  969 michael@paquier.xyz       597         [ +  + ]:             17 :     if (time_to_abort)
                                598                 :              9 :         prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
                                599                 :                : 
 4380 rhaas@postgresql.org      600                 :             17 :     res = PQgetResult(conn);
 3357 simon@2ndQuadrant.co      601         [ +  + ]:             17 :     if (PQresultStatus(res) == PGRES_COPY_OUT)
                                602                 :                :     {
 2132 noah@leadboat.com         603                 :              9 :         PQclear(res);
                                604                 :                : 
                                605                 :                :         /*
                                606                 :                :          * We're doing a client-initiated clean exit and have sent CopyDone to
                                607                 :                :          * the server. Drain any messages, so we don't miss a last-minute
                                608                 :                :          * ErrorResponse. The walsender stops generating WALData records once
                                609                 :                :          * it sees CopyDone, so expect this to finish quickly. After CopyDone,
                                610                 :                :          * it's too late for sendFeedback(), even if this were to take a long
                                611                 :                :          * time. Hence, use synchronous-mode PQgetCopyData().
                                612                 :                :          */
                                613                 :                :         while (1)
                                614                 :            149 :         {
                                615                 :                :             int         r;
                                616                 :                : 
                                617         [ +  + ]:            158 :             if (copybuf != NULL)
                                618                 :                :             {
                                619                 :            156 :                 PQfreemem(copybuf);
                                620                 :            156 :                 copybuf = NULL;
                                621                 :                :             }
                                622                 :            158 :             r = PQgetCopyData(conn, &copybuf, 0);
                                623         [ +  + ]:            158 :             if (r == -1)
                                624                 :              9 :                 break;
                                625         [ -  + ]:            149 :             if (r == -2)
                                626                 :                :             {
 2132 noah@leadboat.com         627                 :UBC           0 :                 pg_log_error("could not read COPY data: %s",
                                628                 :                :                              PQerrorMessage(conn));
                                629                 :              0 :                 time_to_abort = false;  /* unclean exit */
                                630                 :              0 :                 goto error;
                                631                 :                :             }
                                632                 :                :         }
                                633                 :                : 
 2132 noah@leadboat.com         634                 :CBC           9 :         res = PQgetResult(conn);
                                635                 :                :     }
                                636         [ +  + ]:             17 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
                                637                 :                :     {
 2540 peter@eisentraut.org      638                 :              7 :         pg_log_error("unexpected termination of replication stream: %s",
                                639                 :                :                      PQresultErrorMessage(res));
  355 dgustafsson@postgres      640                 :              7 :         PQclear(res);
 4380 rhaas@postgresql.org      641                 :              7 :         goto error;
                                642                 :                :     }
                                643                 :             10 :     PQclear(res);
                                644                 :                : 
                                645   [ +  -  +  + ]:             10 :     if (outfd != -1 && strcmp(outfile, "-") != 0)
                                646                 :                :     {
 3307 tgl@sss.pgh.pa.us         647                 :GBC           1 :         TimestampTz t = feGetCurrentTimestamp();
                                648                 :                : 
 4380 rhaas@postgresql.org      649                 :              1 :         OutputFsync(t);
                                650         [ -  + ]:              1 :         if (close(outfd) != 0)
 2540 peter@eisentraut.org      651                 :UBC           0 :             pg_log_error("could not close file \"%s\": %m", outfile);
                                652                 :                :     }
 4380 rhaas@postgresql.org      653                 :CBC          10 :     outfd = -1;
                                654                 :             25 : error:
 4332 heikki.linnakangas@i      655         [ -  + ]:             25 :     if (copybuf != NULL)
                                656                 :                :     {
 4332 heikki.linnakangas@i      657                 :UBC           0 :         PQfreemem(copybuf);
                                658                 :              0 :         copybuf = NULL;
                                659                 :                :     }
 4380 rhaas@postgresql.org      660                 :CBC          25 :     destroyPQExpBuffer(query);
                                661                 :             25 :     PQfinish(conn);
                                662                 :             25 :     conn = NULL;
                                663                 :                : }
                                664                 :                : 
                                665                 :                : /*
                                666                 :                :  * Unfortunately we can't do sensible signal handling on windows...
                                667                 :                :  */
                                668                 :                : #ifndef WIN32
                                669                 :                : 
                                670                 :                : /*
                                671                 :                :  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
                                672                 :                :  * possible moment.
                                673                 :                :  */
                                674                 :                : static void
 1278 tgl@sss.pgh.pa.us         675                 :              2 : sigexit_handler(SIGNAL_ARGS)
                                676                 :                : {
  969 michael@paquier.xyz       677                 :              2 :     stop_reason = STREAM_STOP_SIGNAL;
 4380 rhaas@postgresql.org      678                 :              2 :     time_to_abort = true;
                                679                 :              2 : }
                                680                 :                : 
                                681                 :                : /*
                                682                 :                :  * Trigger the output file to be reopened.
                                683                 :                :  */
                                684                 :                : static void
 1278 tgl@sss.pgh.pa.us         685                 :UBC           0 : sighup_handler(SIGNAL_ARGS)
                                686                 :                : {
 4380 rhaas@postgresql.org      687                 :              0 :     output_reopen = true;
                                688                 :              0 : }
                                689                 :                : #endif
                                690                 :                : 
                                691                 :                : 
                                692                 :                : int
 4380 rhaas@postgresql.org      693                 :CBC          64 : main(int argc, char **argv)
                                694                 :                : {
                                695                 :                :     static struct option long_options[] = {
                                696                 :                : /* general options */
                                697                 :                :         {"file", required_argument, NULL, 'f'},
                                698                 :                :         {"fsync-interval", required_argument, NULL, 'F'},
                                699                 :                :         {"no-loop", no_argument, NULL, 'n'},
                                700                 :                :         {"enable-failover", no_argument, NULL, 5},
                                701                 :                :         {"enable-two-phase", no_argument, NULL, 't'},
                                702                 :                :         {"two-phase", no_argument, NULL, 't'},    /* deprecated */
                                703                 :                :         {"verbose", no_argument, NULL, 'v'},
                                704                 :                :         {"version", no_argument, NULL, 'V'},
                                705                 :                :         {"help", no_argument, NULL, '?'},
                                706                 :                : /* connection options */
                                707                 :                :         {"dbname", required_argument, NULL, 'd'},
                                708                 :                :         {"host", required_argument, NULL, 'h'},
                                709                 :                :         {"port", required_argument, NULL, 'p'},
                                710                 :                :         {"username", required_argument, NULL, 'U'},
                                711                 :                :         {"no-password", no_argument, NULL, 'w'},
                                712                 :                :         {"password", no_argument, NULL, 'W'},
                                713                 :                : /* replication options */
                                714                 :                :         {"startpos", required_argument, NULL, 'I'},
                                715                 :                :         {"endpos", required_argument, NULL, 'E'},
                                716                 :                :         {"option", required_argument, NULL, 'o'},
                                717                 :                :         {"plugin", required_argument, NULL, 'P'},
                                718                 :                :         {"status-interval", required_argument, NULL, 's'},
                                719                 :                :         {"slot", required_argument, NULL, 'S'},
                                720                 :                : /* action */
                                721                 :                :         {"create-slot", no_argument, NULL, 1},
                                722                 :                :         {"start", no_argument, NULL, 2},
                                723                 :                :         {"drop-slot", no_argument, NULL, 3},
                                724                 :                :         {"if-not-exists", no_argument, NULL, 4},
                                725                 :                :         {NULL, 0, NULL, 0}
                                726                 :                :     };
                                727                 :                :     int         c;
                                728                 :                :     int         option_index;
                                729                 :                :     uint32      hi,
                                730                 :                :                 lo;
                                731                 :                :     char       *db_name;
                                732                 :                : 
 2540 peter@eisentraut.org      733                 :             64 :     pg_logging_init(argv[0]);
 4380 rhaas@postgresql.org      734                 :             64 :     progname = get_progname(argv[0]);
 3730 alvherre@alvh.no-ip.      735                 :             64 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
                                736                 :                : 
 4380 rhaas@postgresql.org      737         [ +  + ]:             64 :     if (argc > 1)
                                738                 :                :     {
                                739   [ +  +  -  + ]:             63 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
                                740                 :                :         {
                                741                 :              1 :             usage();
                                742                 :              1 :             exit(0);
                                743                 :                :         }
                                744         [ +  - ]:             62 :         else if (strcmp(argv[1], "-V") == 0 ||
                                745         [ +  + ]:             62 :                  strcmp(argv[1], "--version") == 0)
                                746                 :                :         {
                                747                 :              1 :             puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
                                748                 :              1 :             exit(0);
                                749                 :                :         }
                                750                 :                :     }
                                751                 :                : 
 1189 peter@eisentraut.org      752                 :            365 :     while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
 4380 rhaas@postgresql.org      753         [ +  + ]:            365 :                             long_options, &option_index)) != -1)
                                754                 :                :     {
                                755   [ +  +  +  +  :            304 :         switch (c)
                                     +  +  +  -  -  
                                     -  -  -  -  +  
                                     +  +  +  +  +  
                                        +  +  -  + ]
                                756                 :                :         {
                                757                 :                : /* general options */
                                758                 :             25 :             case 'f':
                                759                 :             25 :                 outfile = pg_strdup(optarg);
                                760                 :             25 :                 break;
 4312 andres@anarazel.de        761                 :GBC           1 :             case 'F':
 1695 michael@paquier.xyz       762         [ -  + ]:              1 :                 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
                                763                 :                :                                       INT_MAX / 1000,
                                764                 :                :                                       &fsync_interval))
 4312 andres@anarazel.de        765                 :UBC           0 :                     exit(1);
 1695 michael@paquier.xyz       766                 :GBC           1 :                 fsync_interval *= 1000;
 4312 andres@anarazel.de        767                 :              1 :                 break;
 4380 rhaas@postgresql.org      768                 :CBC          23 :             case 'n':
                                769                 :             23 :                 noloop = 1;
                                770                 :             23 :                 break;
 1719 akapila@postgresql.o      771                 :              2 :             case 't':
                                772                 :              2 :                 two_phase = true;
                                773                 :              2 :                 break;
 1189 peter@eisentraut.org      774                 :GBC           1 :             case 'v':
                                775                 :              1 :                 verbose++;
                                776                 :              1 :                 break;
  345 msawada@postgresql.o      777                 :CBC           1 :             case 5:
                                778                 :              1 :                 failover = true;
                                779                 :              1 :                 break;
                                780                 :                : /* connection options */
 4380 rhaas@postgresql.org      781                 :             58 :             case 'd':
                                782                 :             58 :                 dbname = pg_strdup(optarg);
                                783                 :             58 :                 break;
 4380 rhaas@postgresql.org      784                 :UBC           0 :             case 'h':
                                785                 :              0 :                 dbhost = pg_strdup(optarg);
                                786                 :              0 :                 break;
                                787                 :              0 :             case 'p':
                                788                 :              0 :                 dbport = pg_strdup(optarg);
                                789                 :              0 :                 break;
                                790                 :              0 :             case 'U':
                                791                 :              0 :                 dbuser = pg_strdup(optarg);
                                792                 :              0 :                 break;
                                793                 :              0 :             case 'w':
                                794                 :              0 :                 dbgetpassword = -1;
                                795                 :              0 :                 break;
                                796                 :              0 :             case 'W':
                                797                 :              0 :                 dbgetpassword = 1;
                                798                 :              0 :                 break;
                                799                 :                : /* replication options */
 4312 andres@anarazel.de        800                 :              0 :             case 'I':
  251 alvherre@kurilemu.de      801         [ #  # ]:UNC           0 :                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
 1437 tgl@sss.pgh.pa.us         802                 :UBC           0 :                     pg_fatal("could not parse start position \"%s\"", optarg);
 4312 andres@anarazel.de        803                 :              0 :                 startpos = ((uint64) hi) << 32 | lo;
                                804                 :              0 :                 break;
 3357 simon@2ndQuadrant.co      805                 :CBC           8 :             case 'E':
  251 alvherre@kurilemu.de      806         [ -  + ]:GNC           8 :                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
 1437 tgl@sss.pgh.pa.us         807                 :UBC           0 :                     pg_fatal("could not parse end position \"%s\"", optarg);
 3357 simon@2ndQuadrant.co      808                 :CBC           8 :                 endpos = ((uint64) hi) << 32 | lo;
                                809                 :              8 :                 break;
 4380 rhaas@postgresql.org      810                 :             40 :             case 'o':
                                811                 :                :                 {
 4331 bruce@momjian.us          812                 :             40 :                     char       *data = pg_strdup(optarg);
                                813                 :             40 :                     char       *val = strchr(data, '=');
                                814                 :                : 
 4380 rhaas@postgresql.org      815         [ +  - ]:             40 :                     if (val != NULL)
                                816                 :                :                     {
                                817                 :                :                         /* remove =; separate data from val */
                                818                 :             40 :                         *val = '\0';
                                819                 :             40 :                         val++;
                                820                 :                :                     }
                                821                 :                : 
                                822                 :             40 :                     noptions += 1;
   16 michael@paquier.xyz       823                 :GNC          40 :                     options = pg_realloc_array(options, char *, noptions * 2);
                                824                 :                : 
 4380 rhaas@postgresql.org      825                 :CBC          40 :                     options[(noptions - 1) * 2] = data;
                                826                 :             40 :                     options[(noptions - 1) * 2 + 1] = val;
                                827                 :                :                 }
                                828                 :                : 
                                829                 :             40 :                 break;
                                830                 :             25 :             case 'P':
                                831                 :             25 :                 plugin = pg_strdup(optarg);
                                832                 :             25 :                 break;
 4380 rhaas@postgresql.org      833                 :GBC           1 :             case 's':
 1695 michael@paquier.xyz       834         [ -  + ]:              1 :                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
                                835                 :                :                                       INT_MAX / 1000,
                                836                 :                :                                       &standby_message_timeout))
 4380 rhaas@postgresql.org      837                 :UBC           0 :                     exit(1);
 1695 michael@paquier.xyz       838                 :GBC           1 :                 standby_message_timeout *= 1000;
 4380 rhaas@postgresql.org      839                 :              1 :                 break;
 4380 rhaas@postgresql.org      840                 :CBC          60 :             case 'S':
                                841                 :             60 :                 replication_slot = pg_strdup(optarg);
                                842                 :             60 :                 break;
                                843                 :                : /* action */
                                844                 :             29 :             case 1:
                                845                 :             29 :                 do_create_slot = true;
                                846                 :             29 :                 break;
                                847                 :             26 :             case 2:
                                848                 :             26 :                 do_start_slot = true;
                                849                 :             26 :                 break;
                                850                 :              3 :             case 3:
                                851                 :              3 :                 do_drop_slot = true;
                                852                 :              3 :                 break;
 3899 andres@anarazel.de        853                 :UBC           0 :             case 4:
                                854                 :              0 :                 slot_exists_ok = true;
                                855                 :              0 :                 break;
                                856                 :                : 
 4380 rhaas@postgresql.org      857                 :CBC           1 :             default:
                                858                 :                :                 /* getopt_long already emitted a complaint */
 1437 tgl@sss.pgh.pa.us         859                 :              1 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4380 rhaas@postgresql.org      860                 :              1 :                 exit(1);
                                861                 :                :         }
                                862                 :                :     }
                                863                 :                : 
                                864                 :                :     /*
                                865                 :                :      * Any non-option arguments?
                                866                 :                :      */
                                867         [ -  + ]:             61 :     if (optind < argc)
                                868                 :                :     {
 2540 peter@eisentraut.org      869                 :UBC           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
                                870                 :                :                      argv[optind]);
 1437 tgl@sss.pgh.pa.us         871                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4380 rhaas@postgresql.org      872                 :              0 :         exit(1);
                                873                 :                :     }
                                874                 :                : 
                                875                 :                :     /*
                                876                 :                :      * Required arguments
                                877                 :                :      */
 4380 rhaas@postgresql.org      878         [ +  + ]:CBC          61 :     if (replication_slot == NULL)
                                879                 :                :     {
 2540 peter@eisentraut.org      880                 :              1 :         pg_log_error("no slot specified");
 1437 tgl@sss.pgh.pa.us         881                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4380 rhaas@postgresql.org      882                 :              1 :         exit(1);
                                883                 :                :     }
                                884                 :                : 
                                885   [ +  +  +  + ]:             60 :     if (do_start_slot && outfile == NULL)
                                886                 :                :     {
 2540 peter@eisentraut.org      887                 :              1 :         pg_log_error("no target file specified");
 1437 tgl@sss.pgh.pa.us         888                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4380 rhaas@postgresql.org      889                 :              1 :         exit(1);
                                890                 :                :     }
                                891                 :                : 
                                892   [ +  +  +  + ]:             59 :     if (!do_drop_slot && dbname == NULL)
                                893                 :                :     {
 2540 peter@eisentraut.org      894                 :              1 :         pg_log_error("no database specified");
 1437 tgl@sss.pgh.pa.us         895                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4380 rhaas@postgresql.org      896                 :              1 :         exit(1);
                                897                 :                :     }
                                898                 :                : 
                                899   [ +  +  +  +  :             58 :     if (!do_drop_slot && !do_create_slot && !do_start_slot)
                                              +  + ]
                                900                 :                :     {
 2540 peter@eisentraut.org      901                 :              1 :         pg_log_error("at least one action needs to be specified");
 1437 tgl@sss.pgh.pa.us         902                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4380 rhaas@postgresql.org      903                 :              1 :         exit(1);
                                904                 :                :     }
                                905                 :                : 
                                906   [ +  +  +  -  :             57 :     if (do_drop_slot && (do_create_slot || do_start_slot))
                                              -  + ]
                                907                 :                :     {
 2540 peter@eisentraut.org      908                 :UBC           0 :         pg_log_error("cannot use --create-slot or --start together with --drop-slot");
 1437 tgl@sss.pgh.pa.us         909                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4380 rhaas@postgresql.org      910                 :              0 :         exit(1);
                                911                 :                :     }
                                912                 :                : 
  129 alvherre@kurilemu.de      913   [ -  +  -  -  :GNC          57 :     if (XLogRecPtrIsValid(startpos) && (do_create_slot || do_drop_slot))
                                              -  - ]
                                914                 :                :     {
 2540 peter@eisentraut.org      915                 :UBC           0 :         pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
 1437 tgl@sss.pgh.pa.us         916                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4380 rhaas@postgresql.org      917                 :              0 :         exit(1);
                                918                 :                :     }
                                919                 :                : 
  129 alvherre@kurilemu.de      920   [ +  +  -  + ]:GNC          57 :     if (XLogRecPtrIsValid(endpos) && !do_start_slot)
                                921                 :                :     {
 2540 peter@eisentraut.org      922                 :UBC           0 :         pg_log_error("--endpos may only be specified with --start");
 1437 tgl@sss.pgh.pa.us         923                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3357 simon@2ndQuadrant.co      924                 :              0 :         exit(1);
                                925                 :                :     }
                                926                 :                : 
  345 msawada@postgresql.o      927         [ +  + ]:CBC          57 :     if (!do_create_slot)
                                928                 :                :     {
                                929         [ +  + ]:             28 :         if (two_phase)
                                930                 :                :         {
  259 peter@eisentraut.org      931                 :              1 :             pg_log_error("%s may only be specified with --create-slot", "--enable-two-phase");
  345 msawada@postgresql.o      932                 :              1 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
                                933                 :              1 :             exit(1);
                                934                 :                :         }
                                935                 :                : 
                                936         [ -  + ]:             27 :         if (failover)
                                937                 :                :         {
  259 peter@eisentraut.org      938                 :UBC           0 :             pg_log_error("%s may only be specified with --create-slot", "--enable-failover");
  345 msawada@postgresql.o      939                 :              0 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
                                940                 :              0 :             exit(1);
                                941                 :                :         }
                                942                 :                :     }
                                943                 :                : 
                                944                 :                :     /*
                                945                 :                :      * Obtain a connection to server.  Notably, if we need a password, we want
                                946                 :                :      * to collect it from the user immediately.
                                947                 :                :      */
 4183 andres@anarazel.de        948                 :CBC          56 :     conn = GetConnection();
                                949         [ -  + ]:             56 :     if (!conn)
                                950                 :                :         /* Error message already written in GetConnection() */
 4183 andres@anarazel.de        951                 :UBC           0 :         exit(1);
 2633 peter@eisentraut.org      952                 :CBC          56 :     atexit(disconnect_atexit);
                                953                 :                : 
                                954                 :                :     /*
                                955                 :                :      * Trap signals.  (Don't do this until after the initial password prompt,
                                956                 :                :      * if one is needed, in GetConnection.)
                                957                 :                :      */
                                958                 :                : #ifndef WIN32
 1278 dgustafsson@postgres      959                 :             56 :     pqsignal(SIGINT, sigexit_handler);
                                960                 :             56 :     pqsignal(SIGTERM, sigexit_handler);
 1575 tgl@sss.pgh.pa.us         961                 :             56 :     pqsignal(SIGHUP, sighup_handler);
                                962                 :                : #endif
                                963                 :                : 
                                964                 :                :     /*
                                965                 :                :      * Run IDENTIFY_SYSTEM to check the connection type for each action.
                                966                 :                :      * --create-slot and --start actions require a database-specific
                                967                 :                :      * replication connection because they handle logical replication slots.
                                968                 :                :      * --drop-slot can remove replication slots from any replication
                                969                 :                :      * connection without this restriction.
                                970                 :                :      */
 4183 andres@anarazel.de        971         [ -  + ]:             56 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
 2633 peter@eisentraut.org      972                 :UBC           0 :         exit(1);
                                973                 :                : 
  355 fujii@postgresql.org      974   [ +  +  -  + ]:CBC          56 :     if (!do_drop_slot && db_name == NULL)
 1437 tgl@sss.pgh.pa.us         975                 :UBC           0 :         pg_fatal("could not establish database-specific replication connection");
                                976                 :                : 
                                977                 :                :     /*
                                978                 :                :      * Set umask so that directories/files are created with the same
                                979                 :                :      * permissions as directories/files in the source data directory.
                                980                 :                :      *
                                981                 :                :      * pg_mode_mask is set to owner-only by default and then updated in
                                982                 :                :      * GetConnection() where we get the mode from the server-side with
                                983                 :                :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
                                984                 :                :      */
 2899 sfrost@snowman.net        985                 :CBC          56 :     umask(pg_mode_mask);
                                986                 :                : 
                                987                 :                :     /* Drop a replication slot. */
 4380 rhaas@postgresql.org      988         [ +  + ]:             56 :     if (do_drop_slot)
                                989                 :                :     {
                                990         [ -  + ]:              3 :         if (verbose)
 2540 peter@eisentraut.org      991                 :UBC           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
                                992                 :                : 
 4183 andres@anarazel.de        993         [ -  + ]:CBC           3 :         if (!DropReplicationSlot(conn, replication_slot))
 2633 peter@eisentraut.org      994                 :UBC           0 :             exit(1);
                                995                 :                :     }
                                996                 :                : 
                                997                 :                :     /* Create a replication slot. */
 4380 rhaas@postgresql.org      998         [ +  + ]:CBC          56 :     if (do_create_slot)
                                999                 :                :     {
                               1000         [ -  + ]:             29 :         if (verbose)
 2540 peter@eisentraut.org     1001                 :UBC           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
                               1002                 :                : 
 3092 peter_e@gmx.net          1003         [ -  + ]:CBC          29 :         if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
                               1004                 :                :                                    false, false, slot_exists_ok, two_phase,
                               1005                 :                :                                    failover))
 2633 peter@eisentraut.org     1006                 :UBC           0 :             exit(1);
 3899 andres@anarazel.de       1007                 :CBC          29 :         startpos = InvalidXLogRecPtr;
                               1008                 :                :     }
                               1009                 :                : 
 4380 rhaas@postgresql.org     1010         [ +  + ]:             56 :     if (!do_start_slot)
 2633 peter@eisentraut.org     1011                 :             32 :         exit(0);
                               1012                 :                : 
                               1013                 :                :     /* Stream loop */
                               1014                 :                :     while (true)
                               1015                 :                :     {
 4183 andres@anarazel.de       1016                 :             25 :         StreamLogicalLog();
 4380 rhaas@postgresql.org     1017         [ +  + ]:             25 :         if (time_to_abort)
                               1018                 :                :         {
                               1019                 :                :             /*
                               1020                 :                :              * We've been Ctrl-C'ed or reached an exit limit condition. That's
                               1021                 :                :              * not an error, so exit without an errorcode.
                               1022                 :                :              */
 2633 peter@eisentraut.org     1023                 :              9 :             exit(0);
                               1024                 :                :         }
                               1025                 :                : 
                               1026                 :                :         /*
                               1027                 :                :          * Ensure all written data is flushed to disk before exiting or
                               1028                 :                :          * starting a new replication.
                               1029                 :                :          */
   58 fujii@postgresql.org     1030         [ +  + ]:GNC          16 :         if (outfd != -1)
                               1031                 :              9 :             OutputFsync(feGetCurrentTimestamp());
                               1032                 :                : 
                               1033         [ +  + ]:             16 :         if (noloop)
                               1034                 :                :         {
 1437 tgl@sss.pgh.pa.us        1035                 :CBC          15 :             pg_fatal("disconnected");
                               1036                 :                :         }
                               1037                 :                :         else
                               1038                 :                :         {
                               1039                 :                :             /* translator: check source for value for %d */
 2540 peter@eisentraut.org     1040                 :GBC           1 :             pg_log_info("disconnected; waiting %d seconds to try again",
                               1041                 :                :                         RECONNECT_SLEEP_TIME);
 4380 rhaas@postgresql.org     1042                 :              1 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
                               1043                 :                :         }
                               1044                 :                :     }
                               1045                 :                : }
                               1046                 :                : 
                               1047                 :                : /*
                               1048                 :                :  * Fsync our output data, and send a feedback message to the server.  Returns
                               1049                 :                :  * true if successful, false otherwise.
                               1050                 :                :  *
                               1051                 :                :  * If successful, *now is updated to the current timestamp just before sending
                               1052                 :                :  * feedback.
                               1053                 :                :  */
                               1054                 :                : static bool
 3357 simon@2ndQuadrant.co     1055                 :CBC           8 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
                               1056                 :                : {
                               1057                 :                :     /* flush data to disk, so that we send a recent flush pointer */
   58 fujii@postgresql.org     1058                 :GNC           8 :     OutputFsync(*now);
 3357 simon@2ndQuadrant.co     1059                 :CBC           8 :     *now = feGetCurrentTimestamp();
                               1060         [ -  + ]:              8 :     if (!sendFeedback(conn, *now, true, false))
 3357 simon@2ndQuadrant.co     1061                 :UBC           0 :         return false;
                               1062                 :                : 
 3357 simon@2ndQuadrant.co     1063                 :CBC           8 :     return true;
                               1064                 :                : }
                               1065                 :                : 
                               1066                 :                : /*
                               1067                 :                :  * Try to inform the server about our upcoming demise, but don't wait around or
                               1068                 :                :  * retry on failure.
                               1069                 :                :  */
                               1070                 :                : static void
  969 michael@paquier.xyz      1071                 :              9 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
                               1072                 :                :                    XLogRecPtr lsn)
                               1073                 :                : {
 3357 simon@2ndQuadrant.co     1074                 :              9 :     (void) PQputCopyEnd(conn, NULL);
                               1075                 :              9 :     (void) PQflush(conn);
                               1076                 :                : 
                               1077         [ +  + ]:              9 :     if (verbose)
                               1078                 :                :     {
  969 michael@paquier.xyz      1079   [ +  -  -  -  :GBC           1 :         switch (reason)
                                                 - ]
                               1080                 :                :         {
                               1081                 :              1 :             case STREAM_STOP_SIGNAL:
                               1082                 :              1 :                 pg_log_info("received interrupt signal, exiting");
                               1083                 :              1 :                 break;
  969 michael@paquier.xyz      1084                 :UBC           0 :             case STREAM_STOP_KEEPALIVE:
  251 alvherre@kurilemu.de     1085                 :UNC           0 :                 pg_log_info("end position %X/%08X reached by keepalive",
                               1086                 :                :                             LSN_FORMAT_ARGS(endpos));
  969 michael@paquier.xyz      1087                 :UBC           0 :                 break;
                               1088                 :              0 :             case STREAM_STOP_END_OF_WAL:
  129 alvherre@kurilemu.de     1089         [ #  # ]:UNC           0 :                 Assert(XLogRecPtrIsValid(lsn));
  251                          1090                 :              0 :                 pg_log_info("end position %X/%08X reached by WAL record at %X/%08X",
                               1091                 :                :                             LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
  969 michael@paquier.xyz      1092                 :UBC           0 :                 break;
                               1093                 :              0 :             case STREAM_STOP_NONE:
                               1094                 :              0 :                 Assert(false);
                               1095                 :                :                 break;
                               1096                 :                :         }
                               1097                 :                :     }
 3357 simon@2ndQuadrant.co     1098                 :CBC           9 : }
        

Generated by: LCOV version 2.4-beta