LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - pg_recvlogical.c (source / functions) Coverage Total Hit UNC UBC GNC CBC DUB DCB
Current: c70b6db34ffeab48beef1fb4ce61bcad3772b8dd vs 06473f5a344df8c9594ead90a609b86f6724cff8 Lines: 69.2 % 487 337 5 145 7 330 5 7
Current Date: 2025-09-06 07:49:51 +0900 Functions: 90.0 % 10 9 1 4 5
Baseline: lcov-20250906-005545-baseline Branches: 61.1 % 296 181 4 111 4 177
Baseline Date: 2025-09-05 08:21:35 +0100 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(30,360] days: 70.0 % 30 21 5 4 7 14
(360..) days: 69.1 % 457 316 141 316
Function coverage date bins:
(360..) days: 90.0 % 10 9 1 4 5
Branch coverage date bins:
(30,360] days: 66.7 % 18 12 4 2 4 8
(360..) days: 60.8 % 278 169 109 169

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
                                  4                 :                :  *                    fashion and write it to a local file.
                                  5                 :                :  *
                                  6                 :                :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
                                  7                 :                :  *
                                  8                 :                :  * IDENTIFICATION
                                  9                 :                :  *        src/bin/pg_basebackup/pg_recvlogical.c
                                 10                 :                :  *-------------------------------------------------------------------------
                                 11                 :                :  */
                                 12                 :                : 
                                 13                 :                : #include "postgres_fe.h"
                                 14                 :                : 
                                 15                 :                : #include <dirent.h>
                                 16                 :                : #include <limits.h>
                                 17                 :                : #include <sys/select.h>
                                 18                 :                : #include <sys/stat.h>
                                 19                 :                : #include <unistd.h>
                                 20                 :                : 
                                 21                 :                : #include "common/file_perm.h"
                                 22                 :                : #include "common/logging.h"
                                 23                 :                : #include "fe_utils/option_utils.h"
                                 24                 :                : #include "getopt_long.h"
                                 25                 :                : #include "libpq-fe.h"
                                 26                 :                : #include "libpq/pqsignal.h"
                                 27                 :                : #include "libpq/protocol.h"
                                 28                 :                : #include "pqexpbuffer.h"
                                 29                 :                : #include "streamutil.h"
                                 30                 :                : 
                                 31                 :                : /* Time to sleep between reconnection attempts */
                                 32                 :                : #define RECONNECT_SLEEP_TIME 5
                                 33                 :                : 
                                 34                 :                : typedef enum
                                 35                 :                : {
                                 36                 :                :     STREAM_STOP_NONE,
                                 37                 :                :     STREAM_STOP_END_OF_WAL,
                                 38                 :                :     STREAM_STOP_KEEPALIVE,
                                 39                 :                :     STREAM_STOP_SIGNAL
                                 40                 :                : } StreamStopReason;
                                 41                 :                : 
                                 42                 :                : /* Global Options */
                                 43                 :                : static char *outfile = NULL;
                                 44                 :                : static int  verbose = 0;
                                 45                 :                : static bool two_phase = false;  /* enable-two-phase option */
                                 46                 :                : static bool failover = false;   /* enable-failover option */
                                 47                 :                : static int  noloop = 0;
                                 48                 :                : static int  standby_message_timeout = 10 * 1000;    /* 10 sec = default */
                                 49                 :                : static int  fsync_interval = 10 * 1000; /* 10 sec = default */
                                 50                 :                : static XLogRecPtr startpos = InvalidXLogRecPtr;
                                 51                 :                : static XLogRecPtr endpos = InvalidXLogRecPtr;
                                 52                 :                : static bool do_create_slot = false;
                                 53                 :                : static bool slot_exists_ok = false;
                                 54                 :                : static bool do_start_slot = false;
                                 55                 :                : static bool do_drop_slot = false;
                                 56                 :                : static char *replication_slot = NULL;
                                 57                 :                : 
                                 58                 :                : /* filled pairwise with option, value. value may be NULL */
                                 59                 :                : static char **options;
                                 60                 :                : static size_t noptions = 0;
                                 61                 :                : static const char *plugin = "test_decoding";
                                 62                 :                : 
                                 63                 :                : /* Global State */
                                 64                 :                : static int  outfd = -1;
                                 65                 :                : static volatile sig_atomic_t time_to_abort = false;
                                 66                 :                : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
                                 67                 :                : static volatile sig_atomic_t output_reopen = false;
                                 68                 :                : static bool output_isfile;
                                 69                 :                : static TimestampTz output_last_fsync = -1;
                                 70                 :                : static bool output_needs_fsync = false;
                                 71                 :                : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
                                 72                 :                : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
                                 73                 :                : 
                                 74                 :                : static void usage(void);
                                 75                 :                : static void StreamLogicalLog(void);
                                 76                 :                : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
                                 77                 :                : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
                                 78                 :                :                                StreamStopReason reason,
                                 79                 :                :                                XLogRecPtr lsn);
                                 80                 :                : 
                                 81                 :                : static void
 4190 rhaas@postgresql.org       82                 :CBC           1 : usage(void)
                                 83                 :                : {
 3823 bruce@momjian.us           84                 :              1 :     printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
                                 85                 :                :            progname);
 4190 rhaas@postgresql.org       86                 :              1 :     printf(_("Usage:\n"));
                                 87                 :              1 :     printf(_("  %s [OPTION]...\n"), progname);
 3982 peter_e@gmx.net            88                 :              1 :     printf(_("\nAction to be performed:\n"));
                                 89                 :              1 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
                                 90                 :              1 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
                                 91                 :              1 :     printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
 4190 rhaas@postgresql.org       92                 :              1 :     printf(_("\nOptions:\n"));
   69 peter@eisentraut.org       93                 :              1 :     printf(_("      --enable-failover  enable replication slot synchronization to standby servers when\n"
                                 94                 :                :              "                         creating a replication slot\n"));
                                 95                 :              1 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
 3982 peter_e@gmx.net            96                 :              1 :     printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
 4122 andres@anarazel.de         97                 :              1 :     printf(_("  -F  --fsync-interval=SECS\n"
                                 98                 :                :              "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
 3643 peter_e@gmx.net            99                 :              1 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
 3982                           100                 :              1 :     printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
 4190 rhaas@postgresql.org      101                 :              1 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
 3982 peter_e@gmx.net           102                 :              1 :     printf(_("  -o, --option=NAME[=VALUE]\n"
                                103                 :                :              "                         pass option NAME with optional value VALUE to the\n"
                                104                 :                :              "                         output plugin\n"));
                                105                 :              1 :     printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
                                106                 :              1 :     printf(_("  -s, --status-interval=SECS\n"
                                107                 :                :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
                                108                 :              1 :     printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
   69 peter@eisentraut.org      109                 :              1 :     printf(_("  -t, --enable-two-phase enable decoding of prepared transactions when creating a slot\n"));
                                110                 :              1 :     printf(_("      --two-phase        (same as --enable-two-phase, deprecated)\n"));
 4190 rhaas@postgresql.org      111                 :              1 :     printf(_("  -v, --verbose          output verbose messages\n"));
                                112                 :              1 :     printf(_("  -V, --version          output version information, then exit\n"));
                                113                 :              1 :     printf(_("  -?, --help             show this help, then exit\n"));
                                114                 :              1 :     printf(_("\nConnection options:\n"));
                                115                 :              1 :     printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
                                116                 :              1 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
                                117                 :              1 :     printf(_("  -p, --port=PORT        database server port number\n"));
                                118                 :              1 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
                                119                 :              1 :     printf(_("  -w, --no-password      never prompt for password\n"));
                                120                 :              1 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
 2017 peter@eisentraut.org      121                 :              1 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
                                122                 :              1 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
 4190 rhaas@postgresql.org      123                 :              1 : }
                                124                 :                : 
                                125                 :                : /*
                                126                 :                :  * Send a Standby Status Update message to server.
                                127                 :                :  */
                                128                 :                : static bool
 3117 tgl@sss.pgh.pa.us         129                 :             11 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
                                130                 :                : {
                                131                 :                :     static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
                                132                 :                :     static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
                                133                 :                : 
                                134                 :                :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
 4190 rhaas@postgresql.org      135                 :             11 :     int         len = 0;
                                136                 :                : 
                                137                 :                :     /*
                                138                 :                :      * we normally don't want to send superfluous feedback, but if it's
                                139                 :                :      * because of a timeout we need to, otherwise wal_sender_timeout will kill
                                140                 :                :      * us.
                                141                 :                :      */
                                142         [ -  + ]:             11 :     if (!force &&
 4190 rhaas@postgresql.org      143         [ #  # ]:UBC           0 :         last_written_lsn == output_written_lsn &&
 1942 noah@leadboat.com         144         [ #  # ]:              0 :         last_fsync_lsn == output_fsync_lsn)
 4190 rhaas@postgresql.org      145                 :              0 :         return true;
                                146                 :                : 
 4190 rhaas@postgresql.org      147         [ -  + ]:CBC          11 :     if (verbose)
   61 alvherre@kurilemu.de      148                 :UNC           0 :         pg_log_info("confirming write up to %X/%08X, flush to %X/%08X (slot %s)",
                                149                 :                :                     LSN_FORMAT_ARGS(output_written_lsn),
                                150                 :                :                     LSN_FORMAT_ARGS(output_fsync_lsn),
                                151                 :                :                     replication_slot);
                                152                 :                : 
   31 nathan@postgresql.or      153                 :GNC          11 :     replybuf[len] = PqReplMsg_StandbyStatusUpdate;
 4190 rhaas@postgresql.org      154                 :CBC          11 :     len += 1;
 4141 bruce@momjian.us          155                 :             11 :     fe_sendint64(output_written_lsn, &replybuf[len]);   /* write */
 4190 rhaas@postgresql.org      156                 :             11 :     len += 8;
 2999 tgl@sss.pgh.pa.us         157                 :             11 :     fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
 4190 rhaas@postgresql.org      158                 :             11 :     len += 8;
 4141 bruce@momjian.us          159                 :             11 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
 4190 rhaas@postgresql.org      160                 :             11 :     len += 8;
 4141 bruce@momjian.us          161                 :             11 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
 4190 rhaas@postgresql.org      162                 :             11 :     len += 8;
 2999 tgl@sss.pgh.pa.us         163                 :             11 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
 4190 rhaas@postgresql.org      164                 :             11 :     len += 1;
                                165                 :                : 
                                166                 :             11 :     startpos = output_written_lsn;
                                167                 :             11 :     last_written_lsn = output_written_lsn;
                                168                 :             11 :     last_fsync_lsn = output_fsync_lsn;
                                169                 :                : 
                                170   [ +  -  -  + ]:             11 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
                                171                 :                :     {
 2350 peter@eisentraut.org      172                 :UBC           0 :         pg_log_error("could not send feedback packet: %s",
                                173                 :                :                      PQerrorMessage(conn));
 4190 rhaas@postgresql.org      174                 :              0 :         return false;
                                175                 :                :     }
                                176                 :                : 
 4190 rhaas@postgresql.org      177                 :CBC          11 :     return true;
                                178                 :                : }
                                179                 :                : 
                                180                 :                : static void
 2443 peter@eisentraut.org      181                 :             11 : disconnect_atexit(void)
                                182                 :                : {
 4190 rhaas@postgresql.org      183         [ +  + ]:             11 :     if (conn != NULL)
                                184                 :              5 :         PQfinish(conn);
                                185                 :             11 : }
                                186                 :                : 
                                187                 :                : static bool
 3117 tgl@sss.pgh.pa.us         188                 :             11 : OutputFsync(TimestampTz now)
                                189                 :                : {
 4190 rhaas@postgresql.org      190                 :             11 :     output_last_fsync = now;
                                191                 :                : 
                                192                 :             11 :     output_fsync_lsn = output_written_lsn;
                                193                 :                : 
                                194         [ -  + ]:             11 :     if (fsync_interval <= 0)
 4190 rhaas@postgresql.org      195                 :UBC           0 :         return true;
                                196                 :                : 
 4132 heikki.linnakangas@i      197         [ +  + ]:CBC          11 :     if (!output_needs_fsync)
 4190 rhaas@postgresql.org      198                 :              7 :         return true;
                                199                 :                : 
 4132 heikki.linnakangas@i      200                 :              4 :     output_needs_fsync = false;
                                201                 :                : 
                                202                 :                :     /* can only fsync if it's a regular file */
 3714 andres@anarazel.de        203         [ +  + ]:              4 :     if (!output_isfile)
                                204                 :              2 :         return true;
                                205                 :                : 
                                206         [ -  + ]:              2 :     if (fsync(outfd) != 0)
 1247 tgl@sss.pgh.pa.us         207                 :UBC           0 :         pg_fatal("could not fsync file \"%s\": %m", outfile);
                                208                 :                : 
 4190 rhaas@postgresql.org      209                 :CBC           2 :     return true;
                                210                 :                : }
                                211                 :                : 
                                212                 :                : /*
                                213                 :                :  * Start the log streaming
                                214                 :                :  */
                                215                 :                : static void
 3995 andres@anarazel.de        216                 :              6 : StreamLogicalLog(void)
                                217                 :                : {
                                218                 :                :     PGresult   *res;
 4190 rhaas@postgresql.org      219                 :              6 :     char       *copybuf = NULL;
 3117 tgl@sss.pgh.pa.us         220                 :              6 :     TimestampTz last_status = -1;
                                221                 :                :     int         i;
                                222                 :                :     PQExpBuffer query;
                                223                 :                :     XLogRecPtr  cur_record_lsn;
                                224                 :                : 
 4190 rhaas@postgresql.org      225                 :              6 :     output_written_lsn = InvalidXLogRecPtr;
                                226                 :              6 :     output_fsync_lsn = InvalidXLogRecPtr;
  779 michael@paquier.xyz       227                 :              6 :     cur_record_lsn = InvalidXLogRecPtr;
                                228                 :                : 
                                229                 :                :     /*
                                230                 :                :      * Connect in replication mode to the server
                                231                 :                :      */
 4190 rhaas@postgresql.org      232         [ -  + ]:              6 :     if (!conn)
 4190 rhaas@postgresql.org      233                 :UBC           0 :         conn = GetConnection();
 4190 rhaas@postgresql.org      234         [ -  + ]:CBC           6 :     if (!conn)
                                235                 :                :         /* Error message already written in GetConnection() */
 4190 rhaas@postgresql.org      236                 :UBC           0 :         return;
                                237                 :                : 
                                238                 :                :     /*
                                239                 :                :      * Start the replication
                                240                 :                :      */
 4190 rhaas@postgresql.org      241         [ -  + ]:CBC           6 :     if (verbose)
   61 alvherre@kurilemu.de      242                 :UNC           0 :         pg_log_info("starting log streaming at %X/%08X (slot %s)",
                                243                 :                :                     LSN_FORMAT_ARGS(startpos),
                                244                 :                :                     replication_slot);
                                245                 :                : 
                                246                 :                :     /* Initiate the replication stream at specified location */
 1385 tgl@sss.pgh.pa.us         247                 :CBC           6 :     query = createPQExpBuffer();
   61 alvherre@kurilemu.de      248                 :GNC           6 :     appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%08X",
 1656 peter@eisentraut.org      249                 :CBC           6 :                       replication_slot, LSN_FORMAT_ARGS(startpos));
                                250                 :                : 
                                251                 :                :     /* print options if there are any */
 4190 rhaas@postgresql.org      252         [ +  + ]:              6 :     if (noptions)
                                253                 :              3 :         appendPQExpBufferStr(query, " (");
                                254                 :                : 
                                255         [ +  + ]:             12 :     for (i = 0; i < noptions; i++)
                                256                 :                :     {
                                257                 :                :         /* separator */
                                258         [ +  + ]:              6 :         if (i > 0)
                                259                 :              3 :             appendPQExpBufferStr(query, ", ");
                                260                 :                : 
                                261                 :                :         /* write option name */
                                262                 :              6 :         appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
                                263                 :                : 
                                264                 :                :         /* write option value if specified */
                                265         [ +  - ]:              6 :         if (options[(i * 2) + 1] != NULL)
                                266                 :              6 :             appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
                                267                 :                :     }
                                268                 :                : 
                                269         [ +  + ]:              6 :     if (noptions)
                                270                 :              3 :         appendPQExpBufferChar(query, ')');
                                271                 :                : 
                                272                 :              6 :     res = PQexec(conn, query->data);
                                273         [ -  + ]:              6 :     if (PQresultStatus(res) != PGRES_COPY_BOTH)
                                274                 :                :     {
 2350 peter@eisentraut.org      275                 :UBC           0 :         pg_log_error("could not send replication command \"%s\": %s",
                                276                 :                :                      query->data, PQresultErrorMessage(res));
 4190 rhaas@postgresql.org      277                 :              0 :         PQclear(res);
                                278                 :              0 :         goto error;
                                279                 :                :     }
 4190 rhaas@postgresql.org      280                 :CBC           6 :     PQclear(res);
                                281                 :              6 :     resetPQExpBuffer(query);
                                282                 :                : 
                                283         [ -  + ]:              6 :     if (verbose)
 2350 peter@eisentraut.org      284                 :UBC           0 :         pg_log_info("streaming initiated");
                                285                 :                : 
 4190 rhaas@postgresql.org      286         [ +  + ]:CBC          68 :     while (!time_to_abort)
                                287                 :                :     {
                                288                 :                :         int         r;
                                289                 :                :         int         bytes_left;
                                290                 :                :         int         bytes_written;
                                291                 :                :         TimestampTz now;
                                292                 :                :         int         hdr_len;
                                293                 :                : 
  779 michael@paquier.xyz       294                 :             67 :         cur_record_lsn = InvalidXLogRecPtr;
                                295                 :                : 
 4190 rhaas@postgresql.org      296         [ +  + ]:             67 :         if (copybuf != NULL)
                                297                 :                :         {
                                298                 :             35 :             PQfreemem(copybuf);
                                299                 :             35 :             copybuf = NULL;
                                300                 :                :         }
                                301                 :                : 
                                302                 :                :         /*
                                303                 :                :          * Potentially send a status message to the primary.
                                304                 :                :          */
                                305                 :             67 :         now = feGetCurrentTimestamp();
                                306                 :                : 
                                307   [ +  +  +  + ]:            128 :         if (outfd != -1 &&
                                308                 :             61 :             feTimestampDifferenceExceeds(output_last_fsync, now,
                                309                 :                :                                          fsync_interval))
                                310                 :                :         {
                                311         [ -  + ]:              6 :             if (!OutputFsync(now))
 4190 rhaas@postgresql.org      312                 :UBC           0 :                 goto error;
                                313                 :                :         }
                                314                 :                : 
 4190 rhaas@postgresql.org      315   [ +  -  +  + ]:CBC         134 :         if (standby_message_timeout > 0 &&
                                316                 :             67 :             feTimestampDifferenceExceeds(last_status, now,
                                317                 :                :                                          standby_message_timeout))
                                318                 :                :         {
                                319                 :                :             /* Time to send feedback! */
                                320         [ -  + ]:              6 :             if (!sendFeedback(conn, now, true, false))
 4190 rhaas@postgresql.org      321                 :UBC           0 :                 goto error;
                                322                 :                : 
 4190 rhaas@postgresql.org      323                 :CBC           6 :             last_status = now;
                                324                 :                :         }
                                325                 :                : 
                                326                 :                :         /* got SIGHUP, close output file */
 4132 heikki.linnakangas@i      327   [ +  +  -  +  :             67 :         if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
                                              -  - ]
                                328                 :                :         {
 4132 heikki.linnakangas@i      329                 :UBC           0 :             now = feGetCurrentTimestamp();
                                330         [ #  # ]:              0 :             if (!OutputFsync(now))
                                331                 :              0 :                 goto error;
                                332                 :              0 :             close(outfd);
                                333                 :              0 :             outfd = -1;
                                334                 :                :         }
 4132 heikki.linnakangas@i      335                 :CBC          67 :         output_reopen = false;
                                336                 :                : 
                                337                 :                :         /* open the output file, if not open yet */
 4131                           338         [ +  + ]:             67 :         if (outfd == -1)
                                339                 :                :         {
                                340                 :                :             struct stat statbuf;
                                341                 :                : 
                                342         [ +  - ]:              6 :             if (strcmp(outfile, "-") == 0)
                                343                 :              6 :                 outfd = fileno(stdout);
                                344                 :                :             else
 4131 heikki.linnakangas@i      345                 :UBC           0 :                 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
                                346                 :                :                              S_IRUSR | S_IWUSR);
 4131 heikki.linnakangas@i      347         [ -  + ]:CBC           6 :             if (outfd == -1)
                                348                 :                :             {
 2350 peter@eisentraut.org      349                 :UBC           0 :                 pg_log_error("could not open log file \"%s\": %m", outfile);
 4131 heikki.linnakangas@i      350                 :              0 :                 goto error;
                                351                 :                :             }
                                352                 :                : 
 3714 andres@anarazel.de        353         [ -  + ]:CBC           6 :             if (fstat(outfd, &statbuf) != 0)
                                354                 :                :             {
 2350 peter@eisentraut.org      355                 :UBC           0 :                 pg_log_error("could not stat file \"%s\": %m", outfile);
 1500 michael@paquier.xyz       356                 :              0 :                 goto error;
                                357                 :                :             }
                                358                 :                : 
 3714 andres@anarazel.de        359   [ +  +  +  - ]:CBC           6 :             output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
                                360                 :                :         }
                                361                 :                : 
 4190 rhaas@postgresql.org      362                 :             67 :         r = PQgetCopyData(conn, &copybuf, 1);
                                363         [ +  + ]:             67 :         if (r == 0)
                                364                 :             26 :         {
                                365                 :                :             /*
                                366                 :                :              * In async mode, and no data available. We block on reading but
                                367                 :                :              * not more than the specified timeout, so that we can send a
                                368                 :                :              * response back to the client.
                                369                 :                :              */
                                370                 :                :             fd_set      input_mask;
 3117 tgl@sss.pgh.pa.us         371                 :             27 :             TimestampTz message_target = 0;
                                372                 :             27 :             TimestampTz fsync_target = 0;
                                373                 :                :             struct timeval timeout;
 4190 rhaas@postgresql.org      374                 :             27 :             struct timeval *timeoutptr = NULL;
                                375                 :                : 
 3469 peter_e@gmx.net           376         [ -  + ]:             27 :             if (PQsocket(conn) < 0)
                                377                 :                :             {
 2350 peter@eisentraut.org      378                 :UBC           0 :                 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
 3469 peter_e@gmx.net           379                 :              0 :                 goto error;
                                380                 :                :             }
                                381                 :                : 
 4190 rhaas@postgresql.org      382         [ +  + ]:CBC         459 :             FD_ZERO(&input_mask);
                                383                 :             27 :             FD_SET(PQsocket(conn), &input_mask);
                                384                 :                : 
                                385                 :                :             /* Compute when we need to wakeup to send a keepalive message. */
                                386         [ +  - ]:             27 :             if (standby_message_timeout)
                                387                 :             27 :                 message_target = last_status + (standby_message_timeout - 1) *
                                388                 :                :                     ((int64) 1000);
                                389                 :                : 
                                390                 :                :             /* Compute when we need to wakeup to fsync the output file. */
 4132 heikki.linnakangas@i      391   [ +  -  +  + ]:             27 :             if (fsync_interval > 0 && output_needs_fsync)
 4190 rhaas@postgresql.org      392                 :             19 :                 fsync_target = output_last_fsync + (fsync_interval - 1) *
                                393                 :                :                     ((int64) 1000);
                                394                 :                : 
                                395                 :                :             /* Now compute when to wakeup. */
                                396   [ -  +  -  - ]:             27 :             if (message_target > 0 || fsync_target > 0)
                                397                 :                :             {
                                398                 :                :                 TimestampTz targettime;
                                399                 :                :                 long        secs;
                                400                 :                :                 int         usecs;
                                401                 :                : 
                                402                 :             27 :                 targettime = message_target;
                                403                 :                : 
                                404   [ +  +  -  + ]:             27 :                 if (fsync_target > 0 && fsync_target < targettime)
 4190 rhaas@postgresql.org      405                 :UBC           0 :                     targettime = fsync_target;
                                406                 :                : 
 4190 rhaas@postgresql.org      407                 :CBC          27 :                 feTimestampDifference(now,
                                408                 :                :                                       targettime,
                                409                 :                :                                       &secs,
                                410                 :                :                                       &usecs);
                                411         [ -  + ]:             27 :                 if (secs <= 0)
 4190 rhaas@postgresql.org      412                 :UBC           0 :                     timeout.tv_sec = 1; /* Always sleep at least 1 sec */
                                413                 :                :                 else
 4190 rhaas@postgresql.org      414                 :CBC          27 :                     timeout.tv_sec = secs;
                                415                 :             27 :                 timeout.tv_usec = usecs;
                                416                 :             27 :                 timeoutptr = &timeout;
                                417                 :                :             }
                                418                 :                : 
                                419                 :             27 :             r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
                                420   [ +  -  +  +  :             27 :             if (r == 0 || (r < 0 && errno == EINTR))
                                              +  - ]
                                421                 :                :             {
                                422                 :                :                 /*
                                423                 :                :                  * Got a timeout or signal. Continue the loop and either
                                424                 :                :                  * deliver a status packet to the server or just go back into
                                425                 :                :                  * blocking.
                                426                 :                :                  */
                                427                 :             27 :                 continue;
                                428                 :                :             }
                                429         [ -  + ]:             26 :             else if (r < 0)
                                430                 :                :             {
 1597 peter@eisentraut.org      431                 :UBC           0 :                 pg_log_error("%s() failed: %m", "select");
 4190 rhaas@postgresql.org      432                 :              0 :                 goto error;
                                433                 :                :             }
                                434                 :                : 
                                435                 :                :             /* Else there is actually data on the socket */
 4190 rhaas@postgresql.org      436         [ -  + ]:CBC          26 :             if (PQconsumeInput(conn) == 0)
                                437                 :                :             {
 2350 peter@eisentraut.org      438                 :UBC           0 :                 pg_log_error("could not receive data from WAL stream: %s",
                                439                 :                :                              PQerrorMessage(conn));
 4190 rhaas@postgresql.org      440                 :              0 :                 goto error;
                                441                 :                :             }
 4190 rhaas@postgresql.org      442                 :CBC          26 :             continue;
                                443                 :                :         }
                                444                 :                : 
                                445                 :                :         /* End of copy stream */
                                446         [ -  + ]:             40 :         if (r == -1)
                                447                 :              5 :             break;
                                448                 :                : 
                                449                 :                :         /* Failure while reading the copy stream */
                                450         [ -  + ]:             40 :         if (r == -2)
                                451                 :                :         {
 2350 peter@eisentraut.org      452                 :UBC           0 :             pg_log_error("could not read COPY data: %s",
                                453                 :                :                          PQerrorMessage(conn));
 4190 rhaas@postgresql.org      454                 :              0 :             goto error;
                                455                 :                :         }
                                456                 :                : 
                                457                 :                :         /* Check the message type. */
   31 nathan@postgresql.or      458         [ +  + ]:GNC          40 :         if (copybuf[0] == PqReplMsg_Keepalive)
 4190 rhaas@postgresql.org      459                 :CBC           6 :         {
                                460                 :                :             int         pos;
                                461                 :                :             bool        replyRequested;
                                462                 :                :             XLogRecPtr  walEnd;
 3167 simon@2ndQuadrant.co      463                 :              7 :             bool        endposReached = false;
                                464                 :                : 
                                465                 :                :             /*
                                466                 :                :              * Parse the keepalive message, enclosed in the CopyData message.
                                467                 :                :              * We just check if the server requested a reply, and ignore the
                                468                 :                :              * rest.
                                469                 :                :              */
   31 nathan@postgresql.or      470                 :GNC           7 :             pos = 1;            /* skip msgtype PqReplMsg_Keepalive */
 4190 rhaas@postgresql.org      471                 :CBC           7 :             walEnd = fe_recvint64(&copybuf[pos]);
                                472                 :              7 :             output_written_lsn = Max(walEnd, output_written_lsn);
                                473                 :                : 
                                474                 :              7 :             pos += 8;           /* read walEnd */
                                475                 :                : 
                                476                 :              7 :             pos += 8;           /* skip sendTime */
                                477                 :                : 
                                478         [ -  + ]:              7 :             if (r < pos + 1)
                                479                 :                :             {
 2350 peter@eisentraut.org      480                 :UBC           0 :                 pg_log_error("streaming header too small: %d", r);
 4190 rhaas@postgresql.org      481                 :              0 :                 goto error;
                                482                 :                :             }
 4190 rhaas@postgresql.org      483                 :CBC           7 :             replyRequested = copybuf[pos];
                                484                 :                : 
 3167 simon@2ndQuadrant.co      485   [ +  +  +  + ]:              7 :             if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
                                486                 :                :             {
                                487                 :                :                 /*
                                488                 :                :                  * If there's nothing to read on the socket until a keepalive
                                489                 :                :                  * we know that the server has nothing to send us; and if
                                490                 :                :                  * walEnd has passed endpos, we know nothing else can have
                                491                 :                :                  * committed before endpos.  So we can bail out now.
                                492                 :                :                  */
                                493                 :              1 :                 endposReached = true;
                                494                 :                :             }
                                495                 :                : 
                                496                 :                :             /* Send a reply, if necessary */
                                497   [ +  -  +  + ]:              7 :             if (replyRequested || endposReached)
                                498                 :                :             {
                                499         [ -  + ]:              1 :                 if (!flushAndSendFeedback(conn, &now))
 4190 rhaas@postgresql.org      500                 :UBC           0 :                     goto error;
 4190 rhaas@postgresql.org      501                 :CBC           1 :                 last_status = now;
                                502                 :                :             }
                                503                 :                : 
 3167 simon@2ndQuadrant.co      504         [ +  + ]:              7 :             if (endposReached)
                                505                 :                :             {
  779 michael@paquier.xyz       506                 :              1 :                 stop_reason = STREAM_STOP_KEEPALIVE;
 3167 simon@2ndQuadrant.co      507                 :              1 :                 time_to_abort = true;
                                508                 :              1 :                 break;
                                509                 :                :             }
                                510                 :                : 
 4190 rhaas@postgresql.org      511                 :              6 :             continue;
                                512                 :                :         }
   31 nathan@postgresql.or      513         [ -  + ]:GNC          33 :         else if (copybuf[0] != PqReplMsg_WALData)
                                514                 :                :         {
 2350 peter@eisentraut.org      515                 :UBC           0 :             pg_log_error("unrecognized streaming header: \"%c\"",
                                516                 :                :                          copybuf[0]);
 4190 rhaas@postgresql.org      517                 :              0 :             goto error;
                                518                 :                :         }
                                519                 :                : 
                                520                 :                :         /*
                                521                 :                :          * Read the header of the WALData message, enclosed in the CopyData
                                522                 :                :          * message. We only need the WAL location field (dataStart), the rest
                                523                 :                :          * of the header is ignored.
                                524                 :                :          */
   31 nathan@postgresql.or      525                 :GNC          33 :         hdr_len = 1;            /* msgtype PqReplMsg_WALData */
 4190 rhaas@postgresql.org      526                 :CBC          33 :         hdr_len += 8;           /* dataStart */
                                527                 :             33 :         hdr_len += 8;           /* walEnd */
                                528                 :             33 :         hdr_len += 8;           /* sendTime */
                                529         [ -  + ]:             33 :         if (r < hdr_len + 1)
                                530                 :                :         {
 2350 peter@eisentraut.org      531                 :UBC           0 :             pg_log_error("streaming header too small: %d", r);
 4190 rhaas@postgresql.org      532                 :              0 :             goto error;
                                533                 :                :         }
                                534                 :                : 
                                535                 :                :         /* Extract WAL location for this block */
 3167 simon@2ndQuadrant.co      536                 :CBC          33 :         cur_record_lsn = fe_recvint64(&copybuf[1]);
                                537                 :                : 
                                538   [ +  -  -  + ]:             33 :         if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
                                539                 :                :         {
                                540                 :                :             /*
                                541                 :                :              * We've read past our endpoint, so prepare to go away being
                                542                 :                :              * cautious about what happens to our output data.
                                543                 :                :              */
 3167 simon@2ndQuadrant.co      544         [ #  # ]:UBC           0 :             if (!flushAndSendFeedback(conn, &now))
                                545                 :              0 :                 goto error;
  779 michael@paquier.xyz       546                 :              0 :             stop_reason = STREAM_STOP_END_OF_WAL;
 3167 simon@2ndQuadrant.co      547                 :              0 :             time_to_abort = true;
                                548                 :              0 :             break;
                                549                 :                :         }
                                550                 :                : 
 3167 simon@2ndQuadrant.co      551                 :CBC          33 :         output_written_lsn = Max(cur_record_lsn, output_written_lsn);
                                552                 :                : 
 4190 rhaas@postgresql.org      553                 :             33 :         bytes_left = r - hdr_len;
                                554                 :             33 :         bytes_written = 0;
                                555                 :                : 
                                556                 :                :         /* signal that a fsync is needed */
 4132 heikki.linnakangas@i      557                 :             33 :         output_needs_fsync = true;
                                558                 :                : 
 4190 rhaas@postgresql.org      559         [ +  + ]:             66 :         while (bytes_left)
                                560                 :                :         {
                                561                 :                :             int         ret;
                                562                 :                : 
                                563                 :             66 :             ret = write(outfd,
                                564                 :             33 :                         copybuf + hdr_len + bytes_written,
                                565                 :                :                         bytes_left);
                                566                 :                : 
                                567         [ -  + ]:             33 :             if (ret < 0)
                                568                 :                :             {
 1389 peter@eisentraut.org      569                 :UBC           0 :                 pg_log_error("could not write %d bytes to log file \"%s\": %m",
                                570                 :                :                              bytes_left, outfile);
 4190 rhaas@postgresql.org      571                 :              0 :                 goto error;
                                572                 :                :             }
                                573                 :                : 
                                574                 :                :             /* Write was successful, advance our position */
 4190 rhaas@postgresql.org      575                 :CBC          33 :             bytes_written += ret;
                                576                 :             33 :             bytes_left -= ret;
                                577                 :                :         }
                                578                 :                : 
                                579         [ -  + ]:             33 :         if (write(outfd, "\n", 1) != 1)
                                580                 :                :         {
 1389 peter@eisentraut.org      581                 :UBC           0 :             pg_log_error("could not write %d bytes to log file \"%s\": %m",
                                582                 :                :                          1, outfile);
 4190 rhaas@postgresql.org      583                 :              0 :             goto error;
                                584                 :                :         }
                                585                 :                : 
 3167 simon@2ndQuadrant.co      586   [ +  -  +  + ]:CBC          33 :         if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
                                587                 :                :         {
                                588                 :                :             /* endpos was exactly the record we just processed, we're done */
                                589         [ -  + ]:              4 :             if (!flushAndSendFeedback(conn, &now))
 3167 simon@2ndQuadrant.co      590                 :UBC           0 :                 goto error;
  779 michael@paquier.xyz       591                 :CBC           4 :             stop_reason = STREAM_STOP_END_OF_WAL;
 3167 simon@2ndQuadrant.co      592                 :              4 :             time_to_abort = true;
                                593                 :              4 :             break;
                                594                 :                :         }
                                595                 :                :     }
                                596                 :                : 
                                597                 :                :     /* Clean up connection state if stream has been aborted */
  779 michael@paquier.xyz       598         [ +  - ]:              6 :     if (time_to_abort)
                                599                 :              6 :         prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
                                600                 :                : 
 4190 rhaas@postgresql.org      601                 :              6 :     res = PQgetResult(conn);
 3167 simon@2ndQuadrant.co      602         [ +  - ]:              6 :     if (PQresultStatus(res) == PGRES_COPY_OUT)
                                603                 :                :     {
 1942 noah@leadboat.com         604                 :              6 :         PQclear(res);
                                605                 :                : 
                                606                 :                :         /*
                                607                 :                :          * We're doing a client-initiated clean exit and have sent CopyDone to
                                608                 :                :          * the server. Drain any messages, so we don't miss a last-minute
                                609                 :                :          * ErrorResponse. The walsender stops generating WALData records once
                                610                 :                :          * it sees CopyDone, so expect this to finish quickly. After CopyDone,
                                611                 :                :          * it's too late for sendFeedback(), even if this were to take a long
                                612                 :                :          * time. Hence, use synchronous-mode PQgetCopyData().
                                613                 :                :          */
                                614                 :                :         while (1)
                                615                 :              4 :         {
                                616                 :                :             int         r;
                                617                 :                : 
                                618         [ +  + ]:             10 :             if (copybuf != NULL)
                                619                 :                :             {
                                620                 :              9 :                 PQfreemem(copybuf);
                                621                 :              9 :                 copybuf = NULL;
                                622                 :                :             }
                                623                 :             10 :             r = PQgetCopyData(conn, &copybuf, 0);
                                624         [ +  + ]:             10 :             if (r == -1)
                                625                 :              6 :                 break;
                                626         [ -  + ]:              4 :             if (r == -2)
                                627                 :                :             {
 1942 noah@leadboat.com         628                 :UBC           0 :                 pg_log_error("could not read COPY data: %s",
                                629                 :                :                              PQerrorMessage(conn));
                                630                 :              0 :                 time_to_abort = false;  /* unclean exit */
                                631                 :              0 :                 goto error;
                                632                 :                :             }
                                633                 :                :         }
                                634                 :                : 
 1942 noah@leadboat.com         635                 :CBC           6 :         res = PQgetResult(conn);
                                636                 :                :     }
                                637         [ -  + ]:              6 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
                                638                 :                :     {
 2350 peter@eisentraut.org      639                 :UBC           0 :         pg_log_error("unexpected termination of replication stream: %s",
                                640                 :                :                      PQresultErrorMessage(res));
  165 dgustafsson@postgres      641                 :              0 :         PQclear(res);
 4190 rhaas@postgresql.org      642                 :              0 :         goto error;
                                643                 :                :     }
 4190 rhaas@postgresql.org      644                 :CBC           6 :     PQclear(res);
                                645                 :                : 
                                646   [ +  -  -  + ]:              6 :     if (outfd != -1 && strcmp(outfile, "-") != 0)
                                647                 :                :     {
 3117 tgl@sss.pgh.pa.us         648                 :UBC           0 :         TimestampTz t = feGetCurrentTimestamp();
                                649                 :                : 
                                650                 :                :         /* no need to jump to error on failure here, we're finishing anyway */
 4190 rhaas@postgresql.org      651                 :              0 :         OutputFsync(t);
                                652                 :                : 
                                653         [ #  # ]:              0 :         if (close(outfd) != 0)
 2350 peter@eisentraut.org      654                 :              0 :             pg_log_error("could not close file \"%s\": %m", outfile);
                                655                 :                :     }
 4190 rhaas@postgresql.org      656                 :CBC           6 :     outfd = -1;
                                657                 :              6 : error:
 4142 heikki.linnakangas@i      658         [ -  + ]:              6 :     if (copybuf != NULL)
                                659                 :                :     {
 4142 heikki.linnakangas@i      660                 :UBC           0 :         PQfreemem(copybuf);
                                661                 :              0 :         copybuf = NULL;
                                662                 :                :     }
 4190 rhaas@postgresql.org      663                 :CBC           6 :     destroyPQExpBuffer(query);
                                664                 :              6 :     PQfinish(conn);
                                665                 :              6 :     conn = NULL;
                                666                 :                : }
                                667                 :                : 
                                668                 :                : /*
                                669                 :                :  * Unfortunately we can't do sensible signal handling on windows...
                                670                 :                :  */
                                671                 :                : #ifndef WIN32
                                672                 :                : 
                                673                 :                : /*
                                674                 :                :  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
                                675                 :                :  * possible moment.
                                676                 :                :  */
                                677                 :                : static void
 1088 tgl@sss.pgh.pa.us         678                 :              1 : sigexit_handler(SIGNAL_ARGS)
                                679                 :                : {
  779 michael@paquier.xyz       680                 :              1 :     stop_reason = STREAM_STOP_SIGNAL;
 4190 rhaas@postgresql.org      681                 :              1 :     time_to_abort = true;
                                682                 :              1 : }
                                683                 :                : 
                                684                 :                : /*
                                685                 :                :  * Trigger the output file to be reopened.
                                686                 :                :  */
                                687                 :                : static void
 1088 tgl@sss.pgh.pa.us         688                 :UBC           0 : sighup_handler(SIGNAL_ARGS)
                                689                 :                : {
 4190 rhaas@postgresql.org      690                 :              0 :     output_reopen = true;
                                691                 :              0 : }
                                692                 :                : #endif
                                693                 :                : 
                                694                 :                : 
                                695                 :                : int
 4190 rhaas@postgresql.org      696                 :CBC          19 : main(int argc, char **argv)
                                697                 :                : {
                                698                 :                :     static struct option long_options[] = {
                                699                 :                : /* general options */
                                700                 :                :         {"file", required_argument, NULL, 'f'},
                                701                 :                :         {"fsync-interval", required_argument, NULL, 'F'},
                                702                 :                :         {"no-loop", no_argument, NULL, 'n'},
                                703                 :                :         {"enable-failover", no_argument, NULL, 5},
                                704                 :                :         {"enable-two-phase", no_argument, NULL, 't'},
                                705                 :                :         {"two-phase", no_argument, NULL, 't'},    /* deprecated */
                                706                 :                :         {"verbose", no_argument, NULL, 'v'},
                                707                 :                :         {"version", no_argument, NULL, 'V'},
                                708                 :                :         {"help", no_argument, NULL, '?'},
                                709                 :                : /* connection options */
                                710                 :                :         {"dbname", required_argument, NULL, 'd'},
                                711                 :                :         {"host", required_argument, NULL, 'h'},
                                712                 :                :         {"port", required_argument, NULL, 'p'},
                                713                 :                :         {"username", required_argument, NULL, 'U'},
                                714                 :                :         {"no-password", no_argument, NULL, 'w'},
                                715                 :                :         {"password", no_argument, NULL, 'W'},
                                716                 :                : /* replication options */
                                717                 :                :         {"startpos", required_argument, NULL, 'I'},
                                718                 :                :         {"endpos", required_argument, NULL, 'E'},
                                719                 :                :         {"option", required_argument, NULL, 'o'},
                                720                 :                :         {"plugin", required_argument, NULL, 'P'},
                                721                 :                :         {"status-interval", required_argument, NULL, 's'},
                                722                 :                :         {"slot", required_argument, NULL, 'S'},
                                723                 :                : /* action */
                                724                 :                :         {"create-slot", no_argument, NULL, 1},
                                725                 :                :         {"start", no_argument, NULL, 2},
                                726                 :                :         {"drop-slot", no_argument, NULL, 3},
                                727                 :                :         {"if-not-exists", no_argument, NULL, 4},
                                728                 :                :         {NULL, 0, NULL, 0}
                                729                 :                :     };
                                730                 :                :     int         c;
                                731                 :                :     int         option_index;
                                732                 :                :     uint32      hi,
                                733                 :                :                 lo;
                                734                 :                :     char       *db_name;
                                735                 :                : 
 2350 peter@eisentraut.org      736                 :             19 :     pg_logging_init(argv[0]);
 4190 rhaas@postgresql.org      737                 :             19 :     progname = get_progname(argv[0]);
 3540 alvherre@alvh.no-ip.      738                 :             19 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
                                739                 :                : 
 4190 rhaas@postgresql.org      740         [ +  + ]:             19 :     if (argc > 1)
                                741                 :                :     {
                                742   [ +  +  -  + ]:             18 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
                                743                 :                :         {
                                744                 :              1 :             usage();
                                745                 :              1 :             exit(0);
                                746                 :                :         }
                                747         [ +  - ]:             17 :         else if (strcmp(argv[1], "-V") == 0 ||
                                748         [ +  + ]:             17 :                  strcmp(argv[1], "--version") == 0)
                                749                 :                :         {
                                750                 :              1 :             puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
                                751                 :              1 :             exit(0);
                                752                 :                :         }
                                753                 :                :     }
                                754                 :                : 
  999 peter@eisentraut.org      755                 :             86 :     while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
 4190 rhaas@postgresql.org      756         [ +  + ]:             86 :                             long_options, &option_index)) != -1)
                                757                 :                :     {
                                758   [ +  -  +  +  :             70 :         switch (c)
                                     -  +  +  -  -  
                                     -  -  -  -  +  
                                     +  -  -  +  +  
                                        +  +  -  + ]
                                759                 :                :         {
                                760                 :                : /* general options */
                                761                 :              7 :             case 'f':
                                762                 :              7 :                 outfile = pg_strdup(optarg);
                                763                 :              7 :                 break;
 4122 andres@anarazel.de        764                 :UBC           0 :             case 'F':
 1505 michael@paquier.xyz       765         [ #  # ]:              0 :                 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
                                766                 :                :                                       INT_MAX / 1000,
                                767                 :                :                                       &fsync_interval))
 4122 andres@anarazel.de        768                 :              0 :                     exit(1);
 1505 michael@paquier.xyz       769                 :              0 :                 fsync_interval *= 1000;
 4122 andres@anarazel.de        770                 :              0 :                 break;
 4190 rhaas@postgresql.org      771                 :CBC           6 :             case 'n':
                                772                 :              6 :                 noloop = 1;
                                773                 :              6 :                 break;
 1529 akapila@postgresql.o      774                 :              2 :             case 't':
                                775                 :              2 :                 two_phase = true;
                                776                 :              2 :                 break;
  999 peter@eisentraut.org      777                 :UBC           0 :             case 'v':
                                778                 :              0 :                 verbose++;
                                779                 :              0 :                 break;
  155 msawada@postgresql.o      780                 :CBC           1 :             case 5:
                                781                 :              1 :                 failover = true;
                                782                 :              1 :                 break;
                                783                 :                : /* connection options */
 4190 rhaas@postgresql.org      784                 :             13 :             case 'd':
                                785                 :             13 :                 dbname = pg_strdup(optarg);
                                786                 :             13 :                 break;
 4190 rhaas@postgresql.org      787                 :UBC           0 :             case 'h':
                                788                 :              0 :                 dbhost = pg_strdup(optarg);
                                789                 :              0 :                 break;
                                790                 :              0 :             case 'p':
                                791                 :              0 :                 dbport = pg_strdup(optarg);
                                792                 :              0 :                 break;
                                793                 :              0 :             case 'U':
                                794                 :              0 :                 dbuser = pg_strdup(optarg);
                                795                 :              0 :                 break;
                                796                 :              0 :             case 'w':
                                797                 :              0 :                 dbgetpassword = -1;
                                798                 :              0 :                 break;
                                799                 :              0 :             case 'W':
                                800                 :              0 :                 dbgetpassword = 1;
                                801                 :              0 :                 break;
                                802                 :                : /* replication options */
 4122 andres@anarazel.de        803                 :              0 :             case 'I':
   61 alvherre@kurilemu.de      804         [ #  # ]:UNC           0 :                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
 1247 tgl@sss.pgh.pa.us         805                 :UBC           0 :                     pg_fatal("could not parse start position \"%s\"", optarg);
 4122 andres@anarazel.de        806                 :              0 :                 startpos = ((uint64) hi) << 32 | lo;
                                807                 :              0 :                 break;
 3167 simon@2ndQuadrant.co      808                 :CBC           6 :             case 'E':
   61 alvherre@kurilemu.de      809         [ -  + ]:GNC           6 :                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
 1247 tgl@sss.pgh.pa.us         810                 :UBC           0 :                     pg_fatal("could not parse end position \"%s\"", optarg);
 3167 simon@2ndQuadrant.co      811                 :CBC           6 :                 endpos = ((uint64) hi) << 32 | lo;
                                812                 :              6 :                 break;
 4190 rhaas@postgresql.org      813                 :              6 :             case 'o':
                                814                 :                :                 {
 4141 bruce@momjian.us          815                 :              6 :                     char       *data = pg_strdup(optarg);
                                816                 :              6 :                     char       *val = strchr(data, '=');
                                817                 :                : 
 4190 rhaas@postgresql.org      818         [ +  - ]:              6 :                     if (val != NULL)
                                819                 :                :                     {
                                820                 :                :                         /* remove =; separate data from val */
                                821                 :              6 :                         *val = '\0';
                                822                 :              6 :                         val++;
                                823                 :                :                     }
                                824                 :                : 
                                825                 :              6 :                     noptions += 1;
 4141 bruce@momjian.us          826                 :              6 :                     options = pg_realloc(options, sizeof(char *) * noptions * 2);
                                827                 :                : 
 4190 rhaas@postgresql.org      828                 :              6 :                     options[(noptions - 1) * 2] = data;
                                829                 :              6 :                     options[(noptions - 1) * 2 + 1] = val;
                                830                 :                :                 }
                                831                 :                : 
                                832                 :              6 :                 break;
 4190 rhaas@postgresql.org      833                 :UBC           0 :             case 'P':
                                834                 :              0 :                 plugin = pg_strdup(optarg);
                                835                 :              0 :                 break;
                                836                 :              0 :             case 's':
 1505 michael@paquier.xyz       837         [ #  # ]:              0 :                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
                                838                 :                :                                       INT_MAX / 1000,
                                839                 :                :                                       &standby_message_timeout))
 4190 rhaas@postgresql.org      840                 :              0 :                     exit(1);
 1505 michael@paquier.xyz       841                 :              0 :                 standby_message_timeout *= 1000;
 4190 rhaas@postgresql.org      842                 :              0 :                 break;
 4190 rhaas@postgresql.org      843                 :CBC          15 :             case 'S':
                                844                 :             15 :                 replication_slot = pg_strdup(optarg);
                                845                 :             15 :                 break;
                                846                 :                : /* action */
                                847                 :              3 :             case 1:
                                848                 :              3 :                 do_create_slot = true;
                                849                 :              3 :                 break;
                                850                 :              8 :             case 2:
                                851                 :              8 :                 do_start_slot = true;
                                852                 :              8 :                 break;
                                853                 :              2 :             case 3:
                                854                 :              2 :                 do_drop_slot = true;
                                855                 :              2 :                 break;
 3709 andres@anarazel.de        856                 :UBC           0 :             case 4:
                                857                 :              0 :                 slot_exists_ok = true;
                                858                 :              0 :                 break;
                                859                 :                : 
 4190 rhaas@postgresql.org      860                 :CBC           1 :             default:
                                861                 :                :                 /* getopt_long already emitted a complaint */
 1247 tgl@sss.pgh.pa.us         862                 :              1 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4190 rhaas@postgresql.org      863                 :              1 :                 exit(1);
                                864                 :                :         }
                                865                 :                :     }
                                866                 :                : 
                                867                 :                :     /*
                                868                 :                :      * Any non-option arguments?
                                869                 :                :      */
                                870         [ -  + ]:             16 :     if (optind < argc)
                                871                 :                :     {
 2350 peter@eisentraut.org      872                 :UBC           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
                                873                 :                :                      argv[optind]);
 1247 tgl@sss.pgh.pa.us         874                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4190 rhaas@postgresql.org      875                 :              0 :         exit(1);
                                876                 :                :     }
                                877                 :                : 
                                878                 :                :     /*
                                879                 :                :      * Required arguments
                                880                 :                :      */
 4190 rhaas@postgresql.org      881         [ +  + ]:CBC          16 :     if (replication_slot == NULL)
                                882                 :                :     {
 2350 peter@eisentraut.org      883                 :              1 :         pg_log_error("no slot specified");
 1247 tgl@sss.pgh.pa.us         884                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4190 rhaas@postgresql.org      885                 :              1 :         exit(1);
                                886                 :                :     }
                                887                 :                : 
                                888   [ +  +  +  + ]:             15 :     if (do_start_slot && outfile == NULL)
                                889                 :                :     {
 2350 peter@eisentraut.org      890                 :              1 :         pg_log_error("no target file specified");
 1247 tgl@sss.pgh.pa.us         891                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4190 rhaas@postgresql.org      892                 :              1 :         exit(1);
                                893                 :                :     }
                                894                 :                : 
                                895   [ +  +  +  + ]:             14 :     if (!do_drop_slot && dbname == NULL)
                                896                 :                :     {
 2350 peter@eisentraut.org      897                 :              1 :         pg_log_error("no database specified");
 1247 tgl@sss.pgh.pa.us         898                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4190 rhaas@postgresql.org      899                 :              1 :         exit(1);
                                900                 :                :     }
                                901                 :                : 
                                902   [ +  +  +  +  :             13 :     if (!do_drop_slot && !do_create_slot && !do_start_slot)
                                              +  + ]
                                903                 :                :     {
 2350 peter@eisentraut.org      904                 :              1 :         pg_log_error("at least one action needs to be specified");
 1247 tgl@sss.pgh.pa.us         905                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4190 rhaas@postgresql.org      906                 :              1 :         exit(1);
                                907                 :                :     }
                                908                 :                : 
                                909   [ +  +  +  -  :             12 :     if (do_drop_slot && (do_create_slot || do_start_slot))
                                              -  + ]
                                910                 :                :     {
 2350 peter@eisentraut.org      911                 :UBC           0 :         pg_log_error("cannot use --create-slot or --start together with --drop-slot");
 1247 tgl@sss.pgh.pa.us         912                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4190 rhaas@postgresql.org      913                 :              0 :         exit(1);
                                914                 :                :     }
                                915                 :                : 
 4043 andres@anarazel.de        916   [ -  +  -  -  :CBC          12 :     if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
                                              -  - ]
                                917                 :                :     {
 2350 peter@eisentraut.org      918                 :UBC           0 :         pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
 1247 tgl@sss.pgh.pa.us         919                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4190 rhaas@postgresql.org      920                 :              0 :         exit(1);
                                921                 :                :     }
                                922                 :                : 
 3167 simon@2ndQuadrant.co      923   [ +  +  -  + ]:CBC          12 :     if (endpos != InvalidXLogRecPtr && !do_start_slot)
                                924                 :                :     {
 2350 peter@eisentraut.org      925                 :UBC           0 :         pg_log_error("--endpos may only be specified with --start");
 1247 tgl@sss.pgh.pa.us         926                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3167 simon@2ndQuadrant.co      927                 :              0 :         exit(1);
                                928                 :                :     }
                                929                 :                : 
  155 msawada@postgresql.o      930         [ +  + ]:CBC          12 :     if (!do_create_slot)
                                931                 :                :     {
                                932         [ +  + ]:              9 :         if (two_phase)
                                933                 :                :         {
   69 peter@eisentraut.org      934                 :              1 :             pg_log_error("%s may only be specified with --create-slot", "--enable-two-phase");
  155 msawada@postgresql.o      935                 :              1 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
                                936                 :              1 :             exit(1);
                                937                 :                :         }
                                938                 :                : 
                                939         [ -  + ]:              8 :         if (failover)
                                940                 :                :         {
   69 peter@eisentraut.org      941                 :UBC           0 :             pg_log_error("%s may only be specified with --create-slot", "--enable-failover");
  155 msawada@postgresql.o      942                 :              0 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
                                943                 :              0 :             exit(1);
                                944                 :                :         }
                                945                 :                :     }
                                946                 :                : 
                                947                 :                :     /*
                                948                 :                :      * Obtain a connection to server.  Notably, if we need a password, we want
                                949                 :                :      * to collect it from the user immediately.
                                950                 :                :      */
 3993 andres@anarazel.de        951                 :CBC          11 :     conn = GetConnection();
                                952         [ -  + ]:             11 :     if (!conn)
                                953                 :                :         /* Error message already written in GetConnection() */
 3993 andres@anarazel.de        954                 :UBC           0 :         exit(1);
 2443 peter@eisentraut.org      955                 :CBC          11 :     atexit(disconnect_atexit);
                                956                 :                : 
                                957                 :                :     /*
                                958                 :                :      * Trap signals.  (Don't do this until after the initial password prompt,
                                959                 :                :      * if one is needed, in GetConnection.)
                                960                 :                :      */
                                961                 :                : #ifndef WIN32
 1088 dgustafsson@postgres      962                 :             11 :     pqsignal(SIGINT, sigexit_handler);
                                963                 :             11 :     pqsignal(SIGTERM, sigexit_handler);
 1385 tgl@sss.pgh.pa.us         964                 :             11 :     pqsignal(SIGHUP, sighup_handler);
                                965                 :                : #endif
                                966                 :                : 
                                967                 :                :     /*
                                968                 :                :      * Run IDENTIFY_SYSTEM to check the connection type for each action.
                                969                 :                :      * --create-slot and --start actions require a database-specific
                                970                 :                :      * replication connection because they handle logical replication slots.
                                971                 :                :      * --drop-slot can remove replication slots from any replication
                                972                 :                :      * connection without this restriction.
                                973                 :                :      */
 3993 andres@anarazel.de        974         [ -  + ]:             11 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
 2443 peter@eisentraut.org      975                 :UBC           0 :         exit(1);
                                976                 :                : 
  165 fujii@postgresql.org      977   [ +  +  -  + ]:CBC          11 :     if (!do_drop_slot && db_name == NULL)
 1247 tgl@sss.pgh.pa.us         978                 :UBC           0 :         pg_fatal("could not establish database-specific replication connection");
                                979                 :                : 
                                980                 :                :     /*
                                981                 :                :      * Set umask so that directories/files are created with the same
                                982                 :                :      * permissions as directories/files in the source data directory.
                                983                 :                :      *
                                984                 :                :      * pg_mode_mask is set to owner-only by default and then updated in
                                985                 :                :      * GetConnection() where we get the mode from the server-side with
                                986                 :                :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
                                987                 :                :      */
 2709 sfrost@snowman.net        988                 :CBC          11 :     umask(pg_mode_mask);
                                989                 :                : 
                                990                 :                :     /* Drop a replication slot. */
 4190 rhaas@postgresql.org      991         [ +  + ]:             11 :     if (do_drop_slot)
                                992                 :                :     {
                                993         [ -  + ]:              2 :         if (verbose)
 2350 peter@eisentraut.org      994                 :UBC           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
                                995                 :                : 
 3993 andres@anarazel.de        996         [ -  + ]:CBC           2 :         if (!DropReplicationSlot(conn, replication_slot))
 2443 peter@eisentraut.org      997                 :UBC           0 :             exit(1);
                                998                 :                :     }
                                999                 :                : 
                               1000                 :                :     /* Create a replication slot. */
 4190 rhaas@postgresql.org     1001         [ +  + ]:CBC          11 :     if (do_create_slot)
                               1002                 :                :     {
                               1003         [ -  + ]:              3 :         if (verbose)
 2350 peter@eisentraut.org     1004                 :UBC           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
                               1005                 :                : 
 2902 peter_e@gmx.net          1006         [ -  + ]:CBC           3 :         if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
                               1007                 :                :                                    false, false, slot_exists_ok, two_phase,
                               1008                 :                :                                    failover))
 2443 peter@eisentraut.org     1009                 :UBC           0 :             exit(1);
 3709 andres@anarazel.de       1010                 :CBC           3 :         startpos = InvalidXLogRecPtr;
                               1011                 :                :     }
                               1012                 :                : 
 4190 rhaas@postgresql.org     1013         [ +  + ]:             11 :     if (!do_start_slot)
 2443 peter@eisentraut.org     1014                 :              5 :         exit(0);
                               1015                 :                : 
                               1016                 :                :     /* Stream loop */
                               1017                 :                :     while (true)
                               1018                 :                :     {
 3993 andres@anarazel.de       1019                 :              6 :         StreamLogicalLog();
 4190 rhaas@postgresql.org     1020         [ +  - ]:              6 :         if (time_to_abort)
                               1021                 :                :         {
                               1022                 :                :             /*
                               1023                 :                :              * We've been Ctrl-C'ed or reached an exit limit condition. That's
                               1024                 :                :              * not an error, so exit without an errorcode.
                               1025                 :                :              */
 2443 peter@eisentraut.org     1026                 :              6 :             exit(0);
                               1027                 :                :         }
 4190 rhaas@postgresql.org     1028         [ #  # ]:UBC           0 :         else if (noloop)
 1247 tgl@sss.pgh.pa.us        1029                 :              0 :             pg_fatal("disconnected");
                               1030                 :                :         else
                               1031                 :                :         {
                               1032                 :                :             /* translator: check source for value for %d */
 2350 peter@eisentraut.org     1033                 :              0 :             pg_log_info("disconnected; waiting %d seconds to try again",
                               1034                 :                :                         RECONNECT_SLEEP_TIME);
 4190 rhaas@postgresql.org     1035                 :              0 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
                               1036                 :                :         }
                               1037                 :                :     }
                               1038                 :                : }
                               1039                 :                : 
                               1040                 :                : /*
                               1041                 :                :  * Fsync our output data, and send a feedback message to the server.  Returns
                               1042                 :                :  * true if successful, false otherwise.
                               1043                 :                :  *
                               1044                 :                :  * If successful, *now is updated to the current timestamp just before sending
                               1045                 :                :  * feedback.
                               1046                 :                :  */
                               1047                 :                : static bool
 3167 simon@2ndQuadrant.co     1048                 :CBC           5 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
                               1049                 :                : {
                               1050                 :                :     /* flush data to disk, so that we send a recent flush pointer */
                               1051         [ -  + ]:              5 :     if (!OutputFsync(*now))
 3167 simon@2ndQuadrant.co     1052                 :UBC           0 :         return false;
 3167 simon@2ndQuadrant.co     1053                 :CBC           5 :     *now = feGetCurrentTimestamp();
                               1054         [ -  + ]:              5 :     if (!sendFeedback(conn, *now, true, false))
 3167 simon@2ndQuadrant.co     1055                 :UBC           0 :         return false;
                               1056                 :                : 
 3167 simon@2ndQuadrant.co     1057                 :CBC           5 :     return true;
                               1058                 :                : }
                               1059                 :                : 
                               1060                 :                : /*
                               1061                 :                :  * Try to inform the server about our upcoming demise, but don't wait around or
                               1062                 :                :  * retry on failure.
                               1063                 :                :  */
                               1064                 :                : static void
  779 michael@paquier.xyz      1065                 :              6 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
                               1066                 :                :                    XLogRecPtr lsn)
                               1067                 :                : {
 3167 simon@2ndQuadrant.co     1068                 :              6 :     (void) PQputCopyEnd(conn, NULL);
                               1069                 :              6 :     (void) PQflush(conn);
                               1070                 :                : 
                               1071         [ -  + ]:              6 :     if (verbose)
                               1072                 :                :     {
  779 michael@paquier.xyz      1073   [ #  #  #  #  :UBC           0 :         switch (reason)
                                                 # ]
                               1074                 :                :         {
                               1075                 :              0 :             case STREAM_STOP_SIGNAL:
                               1076                 :              0 :                 pg_log_info("received interrupt signal, exiting");
                               1077                 :              0 :                 break;
                               1078                 :              0 :             case STREAM_STOP_KEEPALIVE:
   61 alvherre@kurilemu.de     1079                 :UNC           0 :                 pg_log_info("end position %X/%08X reached by keepalive",
                               1080                 :                :                             LSN_FORMAT_ARGS(endpos));
  779 michael@paquier.xyz      1081                 :UBC           0 :                 break;
                               1082                 :              0 :             case STREAM_STOP_END_OF_WAL:
                               1083         [ #  # ]:              0 :                 Assert(!XLogRecPtrIsInvalid(lsn));
   61 alvherre@kurilemu.de     1084                 :UNC           0 :                 pg_log_info("end position %X/%08X reached by WAL record at %X/%08X",
                               1085                 :                :                             LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
  779 michael@paquier.xyz      1086                 :UBC           0 :                 break;
                               1087                 :              0 :             case STREAM_STOP_NONE:
                               1088                 :              0 :                 Assert(false);
                               1089                 :                :                 break;
                               1090                 :                :         }
                               1091                 :                :     }
 3167 simon@2ndQuadrant.co     1092                 :CBC           6 : }
        

Generated by: LCOV version 2.4-beta