LCOV - differential code coverage report
Current view: top level - src/fe_utils - parallel_slot.c (source / functions) Coverage Total Hit UBC GNC CBC DCB
Current: 580b5c2f397fbb2f74c2661cfe53203ed6acead0 vs 481783e69f144936f1ebbc3259809bee518c6c0c Lines: 89.6 % 164 147 17 2 145 4
Current Date: 2025-12-15 09:47:30 +0900 Functions: 100.0 % 14 14 2 12
Baseline: lcov-20251215-010234-baseline Branches: 73.7 % 114 84 30 84
Baseline Date: 2025-12-11 14:11:25 +0900 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(1,7] days: 100.0 % 2 2 2
(360..) days: 89.5 % 162 145 17 145
Function coverage date bins:
(360..) days: 100.0 % 14 14 2 12
Branch coverage date bins:
(360..) days: 73.7 % 114 84 30 84

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  *  parallel_slot.c
                                  4                 :                :  *      Parallel support for front-end parallel database connections
                                  5                 :                :  *
                                  6                 :                :  *
                                  7                 :                :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
                                  8                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                  9                 :                :  *
                                 10                 :                :  * src/fe_utils/parallel_slot.c
                                 11                 :                :  *
                                 12                 :                :  *-------------------------------------------------------------------------
                                 13                 :                :  */
                                 14                 :                : 
                                 15                 :                : #if defined(WIN32) && FD_SETSIZE < 1024
                                 16                 :                : #error FD_SETSIZE needs to have been increased
                                 17                 :                : #endif
                                 18                 :                : 
                                 19                 :                : #include "postgres_fe.h"
                                 20                 :                : 
                                 21                 :                : #include <sys/select.h>
                                 22                 :                : 
                                 23                 :                : #include "common/logging.h"
                                 24                 :                : #include "fe_utils/cancel.h"
                                 25                 :                : #include "fe_utils/parallel_slot.h"
                                 26                 :                : #include "fe_utils/query_utils.h"
                                 27                 :                : 
                                 28                 :                : #define ERRCODE_UNDEFINED_TABLE  "42P01"
                                 29                 :                : 
                                 30                 :                : static int  select_loop(int maxFd, fd_set *workerset);
                                 31                 :                : static bool processQueryResult(ParallelSlot *slot, PGresult *result);
                                 32                 :                : 
                                 33                 :                : /*
                                 34                 :                :  * Process (and delete) a query result.  Returns true if there's no problem,
                                 35                 :                :  * false otherwise. It's up to the handler to decide what constitutes a
                                 36                 :                :  * problem.
                                 37                 :                :  */
                                 38                 :                : static bool
 1774 rhaas@postgresql.org       39                 :CBC       13084 : processQueryResult(ParallelSlot *slot, PGresult *result)
                                 40                 :                : {
                                 41         [ -  + ]:          13084 :     Assert(slot->handler != NULL);
                                 42                 :                : 
                                 43                 :                :     /* On failure, the handler should return NULL after freeing the result */
                                 44         [ +  + ]:          13084 :     if (!slot->handler(result, slot->connection, slot->handler_context))
                                 45                 :              6 :         return false;
                                 46                 :                : 
                                 47                 :                :     /* Ok, we have to free it ourself */
                                 48                 :          13078 :     PQclear(result);
                                 49                 :          13078 :     return true;
                                 50                 :                : }
                                 51                 :                : 
                                 52                 :                : /*
                                 53                 :                :  * Consume all the results generated for the given connection until
                                 54                 :                :  * nothing remains.  If at least one error is encountered, return false.
                                 55                 :                :  * Note that this will block if the connection is busy.
                                 56                 :                :  */
                                 57                 :                : static bool
                                 58                 :            252 : consumeQueryResult(ParallelSlot *slot)
                                 59                 :                : {
                                 60                 :            252 :     bool        ok = true;
                                 61                 :                :     PGresult   *result;
                                 62                 :                : 
                                 63                 :            252 :     SetCancelConn(slot->connection);
                                 64         [ +  + ]:            503 :     while ((result = PQgetResult(slot->connection)) != NULL)
                                 65                 :                :     {
                                 66         [ +  + ]:            251 :         if (!processQueryResult(slot, result))
                                 67                 :              6 :             ok = false;
                                 68                 :                :     }
                                 69                 :            252 :     ResetCancelConn();
                                 70                 :            252 :     return ok;
                                 71                 :                : }
                                 72                 :                : 
                                 73                 :                : /*
                                 74                 :                :  * Wait until a file descriptor from the given set becomes readable.
                                 75                 :                :  *
                                 76                 :                :  * Returns the number of ready descriptors, or -1 on failure (including
                                 77                 :                :  * getting a cancel request).
                                 78                 :                :  */
                                 79                 :                : static int
 2017 tgl@sss.pgh.pa.us          80                 :          12881 : select_loop(int maxFd, fd_set *workerset)
                                 81                 :                : {
                                 82                 :                :     int         i;
 2341 michael@paquier.xyz        83                 :          12881 :     fd_set      saveSet = *workerset;
                                 84                 :                : 
                                 85         [ -  + ]:          12881 :     if (CancelRequested)
 2341 michael@paquier.xyz        86                 :UBC           0 :         return -1;
                                 87                 :                : 
                                 88                 :                :     for (;;)
                                 89                 :              0 :     {
                                 90                 :                :         /*
                                 91                 :                :          * On Windows, we need to check once in a while for cancel requests;
                                 92                 :                :          * on other platforms we rely on select() returning when interrupted.
                                 93                 :                :          */
                                 94                 :                :         struct timeval *tvp;
                                 95                 :                : #ifdef WIN32
                                 96                 :                :         struct timeval tv = {0, 1000000};
                                 97                 :                : 
                                 98                 :                :         tvp = &tv;
                                 99                 :                : #else
 2341 michael@paquier.xyz       100                 :CBC       12881 :         tvp = NULL;
                                101                 :                : #endif
                                102                 :                : 
                                103                 :          12881 :         *workerset = saveSet;
                                104                 :          12881 :         i = select(maxFd + 1, workerset, NULL, NULL, tvp);
                                105                 :                : 
                                106                 :                : #ifdef WIN32
                                107                 :                :         if (i == SOCKET_ERROR)
                                108                 :                :         {
                                109                 :                :             i = -1;
                                110                 :                : 
                                111                 :                :             if (WSAGetLastError() == WSAEINTR)
                                112                 :                :                 errno = EINTR;
                                113                 :                :         }
                                114                 :                : #endif
                                115                 :                : 
                                116   [ -  +  -  - ]:          12881 :         if (i < 0 && errno == EINTR)
 2341 michael@paquier.xyz       117                 :UBC           0 :             continue;           /* ignore this */
 2341 michael@paquier.xyz       118   [ +  -  -  + ]:CBC       12881 :         if (i < 0 || CancelRequested)
 2017 tgl@sss.pgh.pa.us         119                 :UBC           0 :             return -1;          /* but not this */
 2341 michael@paquier.xyz       120         [ -  + ]:CBC       12881 :         if (i == 0)
 2341 michael@paquier.xyz       121                 :UBC           0 :             continue;           /* timeout (Win32 only) */
 2341 michael@paquier.xyz       122                 :CBC       12881 :         break;
                                123                 :                :     }
                                124                 :                : 
                                125                 :          12881 :     return i;
                                126                 :                : }
                                127                 :                : 
                                128                 :                : /*
                                129                 :                :  * Return the offset of a suitable idle slot, or -1 if none are available.  If
                                130                 :                :  * the given dbname is not null, only idle slots connected to the given
                                131                 :                :  * database are considered suitable, otherwise all idle connected slots are
                                132                 :                :  * considered suitable.
                                133                 :                :  */
                                134                 :                : static int
 1740 rhaas@postgresql.org      135                 :          25965 : find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
                                136                 :                : {
                                137                 :                :     int         i;
                                138                 :                : 
                                139         [ +  + ]:          38982 :     for (i = 0; i < sa->numslots; i++)
                                140                 :                :     {
                                141         [ +  + ]:          26080 :         if (sa->slots[i].inUse)
                                142                 :          12996 :             continue;
                                143                 :                : 
                                144         [ +  + ]:          13084 :         if (sa->slots[i].connection == NULL)
                                145                 :              7 :             continue;
                                146                 :                : 
                                147         [ +  + ]:          13077 :         if (dbname == NULL ||
                                148         [ +  + ]:           7732 :             strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
                                149                 :          13063 :             return i;
                                150                 :                :     }
                                151                 :          12902 :     return -1;
                                152                 :                : }
                                153                 :                : 
                                154                 :                : /*
                                155                 :                :  * Return the offset of the first slot without a database connection, or -1 if
                                156                 :                :  * all slots are connected.
                                157                 :                :  */
                                158                 :                : static int
                                159                 :          13081 : find_unconnected_slot(const ParallelSlotArray *sa)
                                160                 :                : {
                                161                 :                :     int         i;
                                162                 :                : 
                                163         [ +  + ]:          26051 :     for (i = 0; i < sa->numslots; i++)
                                164                 :                :     {
                                165         [ +  + ]:          13156 :         if (sa->slots[i].inUse)
                                166                 :          12956 :             continue;
                                167                 :                : 
                                168         [ +  + ]:            200 :         if (sa->slots[i].connection == NULL)
                                169                 :            186 :             return i;
                                170                 :                :     }
                                171                 :                : 
                                172                 :          12895 :     return -1;
                                173                 :                : }
                                174                 :                : 
                                175                 :                : /*
                                176                 :                :  * Return the offset of the first idle slot, or -1 if all slots are busy.
                                177                 :                :  */
                                178                 :                : static int
                                179                 :          12895 : find_any_idle_slot(const ParallelSlotArray *sa)
                                180                 :                : {
                                181                 :                :     int         i;
                                182                 :                : 
                                183         [ +  + ]:          25847 :     for (i = 0; i < sa->numslots; i++)
                                184         [ +  + ]:          12966 :         if (!sa->slots[i].inUse)
                                185                 :             14 :             return i;
                                186                 :                : 
                                187                 :          12881 :     return -1;
                                188                 :                : }
                                189                 :                : 
                                190                 :                : /*
                                191                 :                :  * Wait for any slot's connection to have query results, consume the results,
                                192                 :                :  * and update the slot's status as appropriate.  Returns true on success,
                                193                 :                :  * false on cancellation, on error, or if no slots are connected.
                                194                 :                :  */
                                195                 :                : static bool
                                196                 :          12881 : wait_on_slots(ParallelSlotArray *sa)
                                197                 :                : {
                                198                 :                :     int         i;
                                199                 :                :     fd_set      slotset;
                                200                 :          12881 :     int         maxFd = 0;
                                201                 :          12881 :     PGconn     *cancelconn = NULL;
                                202                 :                : 
                                203                 :                :     /* We must reconstruct the fd_set for each call to select_loop */
                                204         [ +  + ]:         218977 :     FD_ZERO(&slotset);
                                205                 :                : 
                                206         [ +  + ]:          25833 :     for (i = 0; i < sa->numslots; i++)
                                207                 :                :     {
                                208                 :                :         int         sock;
                                209                 :                : 
                                210                 :                :         /* We shouldn't get here if we still have slots without connections */
                                211         [ -  + ]:          12952 :         Assert(sa->slots[i].connection != NULL);
                                212                 :                : 
                                213                 :          12952 :         sock = PQsocket(sa->slots[i].connection);
                                214                 :                : 
                                215                 :                :         /*
                                216                 :                :          * We don't really expect any connections to lose their sockets after
                                217                 :                :          * startup, but just in case, cope by ignoring them.
                                218                 :                :          */
                                219         [ -  + ]:          12952 :         if (sock < 0)
 1740 rhaas@postgresql.org      220                 :UBC           0 :             continue;
                                221                 :                : 
                                222                 :                :         /* Keep track of the first valid connection we see. */
 1740 rhaas@postgresql.org      223         [ +  + ]:CBC       12952 :         if (cancelconn == NULL)
                                224                 :          12881 :             cancelconn = sa->slots[i].connection;
                                225                 :                : 
                                226                 :          12952 :         FD_SET(sock, &slotset);
                                227         [ +  - ]:          12952 :         if (sock > maxFd)
                                228                 :          12952 :             maxFd = sock;
                                229                 :                :     }
                                230                 :                : 
                                231                 :                :     /*
                                232                 :                :      * If we get this far with no valid connections, processing cannot
                                233                 :                :      * continue.
                                234                 :                :      */
                                235         [ -  + ]:          12881 :     if (cancelconn == NULL)
 1740 rhaas@postgresql.org      236                 :UBC           0 :         return false;
                                237                 :                : 
 1206 michael@paquier.xyz       238                 :CBC       12881 :     SetCancelConn(cancelconn);
 1740 rhaas@postgresql.org      239                 :          12881 :     i = select_loop(maxFd, &slotset);
                                240                 :          12881 :     ResetCancelConn();
                                241                 :                : 
                                242                 :                :     /* failure? */
                                243         [ -  + ]:          12881 :     if (i < 0)
 1740 rhaas@postgresql.org      244                 :UBC           0 :         return false;
                                245                 :                : 
 1740 rhaas@postgresql.org      246         [ +  + ]:CBC       25833 :     for (i = 0; i < sa->numslots; i++)
                                247                 :                :     {
                                248                 :                :         int         sock;
                                249                 :                : 
                                250                 :          12952 :         sock = PQsocket(sa->slots[i].connection);
                                251                 :                : 
                                252   [ +  -  +  + ]:          12952 :         if (sock >= 0 && FD_ISSET(sock, &slotset))
                                253                 :                :         {
                                254                 :                :             /* select() says input is available, so consume it */
                                255                 :          12882 :             PQconsumeInput(sa->slots[i].connection);
                                256                 :                :         }
                                257                 :                : 
                                258                 :                :         /* Collect result(s) as long as any are available */
                                259         [ +  + ]:          25785 :         while (!PQisBusy(sa->slots[i].connection))
                                260                 :                :         {
                                261                 :          25666 :             PGresult   *result = PQgetResult(sa->slots[i].connection);
                                262                 :                : 
                                263         [ +  + ]:          25666 :             if (result != NULL)
                                264                 :                :             {
                                265                 :                :                 /* Handle and discard the command result */
                                266         [ -  + ]:          12833 :                 if (!processQueryResult(&sa->slots[i], result))
 1740 rhaas@postgresql.org      267                 :UBC           0 :                     return false;
                                268                 :                :             }
                                269                 :                :             else
                                270                 :                :             {
                                271                 :                :                 /* This connection has become idle */
    6 nathan@postgresql.or      272                 :GNC       12833 :                 ParallelSlotSetIdle(&sa->slots[i]);
 1740 rhaas@postgresql.org      273                 :CBC       12833 :                 break;
                                274                 :                :             }
                                275                 :                :         }
                                276                 :                :     }
                                277                 :          12881 :     return true;
                                278                 :                : }
                                279                 :                : 
                                280                 :                : /*
                                281                 :                :  * Open a new database connection using the stored connection parameters and
                                282                 :                :  * optionally a given dbname if not null, execute the stored initial command if
                                283                 :                :  * any, and associate the new connection with the given slot.
                                284                 :                :  */
                                285                 :                : static void
                                286                 :             21 : connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
                                287                 :                : {
                                288                 :                :     const char *old_override;
                                289                 :             21 :     ParallelSlot *slot = &sa->slots[slotno];
                                290                 :                : 
                                291                 :             21 :     old_override = sa->cparams->override_dbname;
                                292         [ +  + ]:             21 :     if (dbname)
                                293                 :             17 :         sa->cparams->override_dbname = dbname;
                                294                 :             21 :     slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
                                295                 :             21 :     sa->cparams->override_dbname = old_override;
                                296                 :                : 
                                297                 :                :     /*
                                298                 :                :      * POSIX defines FD_SETSIZE as the highest file descriptor acceptable to
                                299                 :                :      * FD_SET() and allied macros.  Windows defines it as a ceiling on the
                                300                 :                :      * count of file descriptors in the set, not a ceiling on the value of
                                301                 :                :      * each file descriptor; see
                                302                 :                :      * https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
                                303                 :                :      * and
                                304                 :                :      * https://learn.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-fd_set.
                                305                 :                :      * We can't ignore that, because Windows starts file descriptors at a
                                306                 :                :      * higher value, delays reuse, and skips values.  With less than ten
                                307                 :                :      * concurrent file descriptors, opened and closed rapidly, one can reach
                                308                 :                :      * file descriptor 1024.
                                309                 :                :      *
                                310                 :                :      * Doing a hard exit here is a bit grotty, but it doesn't seem worth
                                311                 :                :      * complicating the API to make it less grotty.
                                312                 :                :      */
                                313                 :                : #ifdef WIN32
                                314                 :                :     if (slotno >= FD_SETSIZE)
                                315                 :                :     {
                                316                 :                :         pg_log_error("too many jobs for this platform: %d", slotno);
                                317                 :                :         exit(1);
                                318                 :                :     }
                                319                 :                : #else
                                320                 :                :     {
  793 noah@leadboat.com         321                 :             21 :         int         fd = PQsocket(slot->connection);
                                322                 :                : 
                                323         [ -  + ]:             21 :         if (fd >= FD_SETSIZE)
                                324                 :                :         {
  793 noah@leadboat.com         325                 :UBC           0 :             pg_log_error("socket file descriptor out of range for select(): %d",
                                326                 :                :                          fd);
                                327                 :              0 :             pg_log_error_hint("Try fewer jobs.");
                                328                 :              0 :             exit(1);
                                329                 :                :         }
                                330                 :                :     }
                                331                 :                : #endif
                                332                 :                : 
                                333                 :                :     /* Setup the connection using the supplied command, if any. */
 1740 rhaas@postgresql.org      334         [ -  + ]:CBC          21 :     if (sa->initcmd)
 1740 rhaas@postgresql.org      335                 :UBC           0 :         executeCommand(slot->connection, sa->initcmd, sa->echo);
 2341 michael@paquier.xyz       336                 :CBC          21 : }
                                337                 :                : 
                                338                 :                : /*
                                339                 :                :  * ParallelSlotsGetIdle
                                340                 :                :  *      Return a connection slot that is ready to execute a command.
                                341                 :                :  *
                                342                 :                :  * The slot returned is chosen as follows:
                                343                 :                :  *
                                344                 :                :  * If any idle slot already has an open connection, and if either dbname is
                                345                 :                :  * null or the existing connection is to the given database, that slot will be
                                346                 :                :  * returned allowing the connection to be reused.
                                347                 :                :  *
                                348                 :                :  * Otherwise, if any idle slot is not yet connected to any database, the slot
                                349                 :                :  * will be returned with its connection opened using the stored cparams and
                                350                 :                :  * optionally the given dbname if not null.
                                351                 :                :  *
                                352                 :                :  * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
                                353                 :                :  * after having its connection disconnected and reconnected using the stored
                                354                 :                :  * cparams and optionally the given dbname if not null.
                                355                 :                :  *
                                356                 :                :  * Otherwise, if any slots have connections that are busy, we loop on select()
                                357                 :                :  * until one socket becomes available.  When this happens, we read the whole
                                358                 :                :  * set and mark as free all sockets that become available.  We then select a
                                359                 :                :  * slot using the same rules as above.
                                360                 :                :  *
                                361                 :                :  * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
                                362                 :                :  *
                                363                 :                :  * For any connection created, if the stored initcmd is not null, it will be
                                364                 :                :  * executed as a command on the newly formed connection before the slot is
                                365                 :                :  * returned.
                                366                 :                :  *
                                367                 :                :  * If an error occurs, NULL is returned.
                                368                 :                :  */
                                369                 :                : ParallelSlot *
 1740 rhaas@postgresql.org      370                 :          13084 : ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
                                371                 :                : {
                                372                 :                :     int         offset;
                                373                 :                : 
                                374         [ -  + ]:          13084 :     Assert(sa);
                                375         [ +  - ]:          13084 :     Assert(sa->numslots > 0);
                                376                 :                : 
                                377                 :                :     while (1)
                                378                 :                :     {
                                379                 :                :         /* First choice: a slot already connected to the desired database. */
                                380                 :          25965 :         offset = find_matching_idle_slot(sa, dbname);
                                381         [ +  + ]:          25965 :         if (offset >= 0)
                                382                 :                :         {
                                383                 :          13063 :             sa->slots[offset].inUse = true;
                                384                 :          13063 :             return &sa->slots[offset];
                                385                 :                :         }
                                386                 :                : 
                                387                 :                :         /* Second choice: a slot not connected to any database. */
                                388                 :          12902 :         offset = find_unconnected_slot(sa);
                                389         [ +  + ]:          12902 :         if (offset >= 0)
                                390                 :                :         {
                                391                 :              7 :             connect_slot(sa, offset, dbname);
                                392                 :              7 :             sa->slots[offset].inUse = true;
                                393                 :              7 :             return &sa->slots[offset];
                                394                 :                :         }
                                395                 :                : 
                                396                 :                :         /* Third choice: a slot connected to the wrong database. */
                                397                 :          12895 :         offset = find_any_idle_slot(sa);
                                398         [ +  + ]:          12895 :         if (offset >= 0)
                                399                 :                :         {
                                400                 :             14 :             disconnectDatabase(sa->slots[offset].connection);
                                401                 :             14 :             sa->slots[offset].connection = NULL;
                                402                 :             14 :             connect_slot(sa, offset, dbname);
                                403                 :             14 :             sa->slots[offset].inUse = true;
                                404                 :             14 :             return &sa->slots[offset];
                                405                 :                :         }
                                406                 :                : 
                                407                 :                :         /*
                                408                 :                :          * Fourth choice: block until one or more slots become available. If
                                409                 :                :          * any slots hit a fatal error, we'll find out about that here and
                                410                 :                :          * return NULL.
                                411                 :                :          */
                                412         [ -  + ]:          12881 :         if (!wait_on_slots(sa))
 1740 rhaas@postgresql.org      413                 :UBC           0 :             return NULL;
                                414                 :                :     }
                                415                 :                : }
                                416                 :                : 
                                417                 :                : /*
                                418                 :                :  * ParallelSlotsSetup
                                419                 :                :  *      Prepare a set of parallel slots but do not connect to any database.
                                420                 :                :  *
                                421                 :                :  * This creates and initializes a set of slots, marking all parallel slots as
                                422                 :                :  * free and ready to use.  Establishing connections is delayed until requesting
                                423                 :                :  * a free slot.  The cparams, progname, echo, and initcmd are stored for later
                                424                 :                :  * use and must remain valid for the lifetime of the returned array.
                                425                 :                :  */
                                426                 :                : ParallelSlotArray *
 1740 rhaas@postgresql.org      427                 :CBC         182 : ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
                                428                 :                :                    bool echo, const char *initcmd)
                                429                 :                : {
                                430                 :                :     ParallelSlotArray *sa;
                                431                 :                : 
                                432         [ -  + ]:            182 :     Assert(numslots > 0);
                                433         [ -  + ]:            182 :     Assert(cparams != NULL);
                                434         [ -  + ]:            182 :     Assert(progname != NULL);
                                435                 :                : 
                                436                 :            182 :     sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
                                437                 :            182 :                                        numslots * sizeof(ParallelSlot));
                                438                 :                : 
                                439                 :            182 :     sa->numslots = numslots;
                                440                 :            182 :     sa->cparams = cparams;
                                441                 :            182 :     sa->progname = progname;
                                442                 :            182 :     sa->echo = echo;
                                443                 :            182 :     sa->initcmd = initcmd;
                                444                 :                : 
                                445                 :            182 :     return sa;
                                446                 :                : }
                                447                 :                : 
                                448                 :                : /*
                                449                 :                :  * ParallelSlotsAdoptConn
                                450                 :                :  *      Assign an open connection to the slots array for reuse.
                                451                 :                :  *
                                452                 :                :  * This turns over ownership of an open connection to a slots array.  The
                                453                 :                :  * caller should not further use or close the connection.  All the connection's
                                454                 :                :  * parameters (user, host, port, etc.) except possibly dbname should match
                                455                 :                :  * those of the slots array's cparams, as given in ParallelSlotsSetup.  If
                                456                 :                :  * these parameters differ, subsequent behavior is undefined.
                                457                 :                :  */
                                458                 :                : void
                                459                 :            179 : ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
                                460                 :                : {
                                461                 :                :     int         offset;
                                462                 :                : 
                                463                 :            179 :     offset = find_unconnected_slot(sa);
                                464         [ +  - ]:            179 :     if (offset >= 0)
                                465                 :            179 :         sa->slots[offset].connection = conn;
                                466                 :                :     else
 1740 rhaas@postgresql.org      467                 :UBC           0 :         disconnectDatabase(conn);
 2341 michael@paquier.xyz       468                 :CBC         179 : }
                                469                 :                : 
                                470                 :                : /*
                                471                 :                :  * ParallelSlotsTerminate
                                472                 :                :  *      Clean up a set of parallel slots
                                473                 :                :  *
                                474                 :                :  * Iterate through all connections in a given set of ParallelSlots and
                                475                 :                :  * terminate all connections.
                                476                 :                :  */
                                477                 :                : void
 1740 rhaas@postgresql.org      478                 :            182 : ParallelSlotsTerminate(ParallelSlotArray *sa)
                                479                 :                : {
                                480                 :                :     int         i;
                                481                 :                : 
                                482         [ +  + ]:            368 :     for (i = 0; i < sa->numslots; i++)
                                483                 :                :     {
                                484                 :            186 :         PGconn     *conn = sa->slots[i].connection;
                                485                 :                : 
 2341 michael@paquier.xyz       486         [ -  + ]:            186 :         if (conn == NULL)
 2341 michael@paquier.xyz       487                 :UBC           0 :             continue;
                                488                 :                : 
 2341 michael@paquier.xyz       489                 :CBC         186 :         disconnectDatabase(conn);
                                490                 :                :     }
                                491                 :            182 : }
                                492                 :                : 
                                493                 :                : /*
                                494                 :                :  * ParallelSlotsWaitCompletion
                                495                 :                :  *
                                496                 :                :  * Wait for all connections to finish, returning false if at least one
                                497                 :                :  * error has been found on the way.
                                498                 :                :  */
                                499                 :                : bool
 1740 rhaas@postgresql.org      500                 :            247 : ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
                                501                 :                : {
                                502                 :                :     int         i;
                                503                 :                : 
                                504         [ +  + ]:            493 :     for (i = 0; i < sa->numslots; i++)
                                505                 :                :     {
                                506         [ -  + ]:            252 :         if (sa->slots[i].connection == NULL)
 1740 rhaas@postgresql.org      507                 :UBC           0 :             continue;
 1740 rhaas@postgresql.org      508         [ +  + ]:CBC         252 :         if (!consumeQueryResult(&sa->slots[i]))
 1774                           509                 :              6 :             return false;
                                510                 :                :         /* Mark connection as idle */
    6 nathan@postgresql.or      511                 :GNC         246 :         ParallelSlotSetIdle(&sa->slots[i]);
                                512                 :                :     }
                                513                 :                : 
 1774 rhaas@postgresql.org      514                 :CBC         241 :     return true;
                                515                 :                : }
                                516                 :                : 
                                517                 :                : /*
                                518                 :                :  * TableCommandResultHandler
                                519                 :                :  *
                                520                 :                :  * ParallelSlotResultHandler for results of commands (not queries) against
                                521                 :                :  * tables.
                                522                 :                :  *
                                523                 :                :  * Requires that the result status is either PGRES_COMMAND_OK or an error about
                                524                 :                :  * a missing table.  This is useful for utilities that compile a list of tables
                                525                 :                :  * to process and then run commands (vacuum, reindex, or whatever) against
                                526                 :                :  * those tables, as there is a race condition between the time the list is
                                527                 :                :  * compiled and the time the command attempts to open the table.
                                528                 :                :  *
                                529                 :                :  * For missing tables, logs an error but allows processing to continue.
                                530                 :                :  *
                                531                 :                :  * For all other errors, logs an error and terminates further processing.
                                532                 :                :  *
                                533                 :                :  * res: PGresult from the query executed on the slot's connection
                                534                 :                :  * conn: connection belonging to the slot
                                535                 :                :  * context: unused
                                536                 :                :  */
                                537                 :                : bool
                                538                 :           5349 : TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
                                539                 :                : {
 1740                           540         [ -  + ]:           5349 :     Assert(res != NULL);
                                541         [ -  + ]:           5349 :     Assert(conn != NULL);
                                542                 :                : 
                                543                 :                :     /*
                                544                 :                :      * If it's an error, report it.  Errors about a missing table are harmless
                                545                 :                :      * so we continue processing; but die for other errors.
                                546                 :                :      */
 1774                           547         [ +  + ]:           5349 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
                                548                 :                :     {
                                549                 :              6 :         char       *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
                                550                 :                : 
                                551                 :              6 :         pg_log_error("processing of database \"%s\" failed: %s",
                                552                 :                :                      PQdb(conn), PQerrorMessage(conn));
                                553                 :                : 
                                554   [ +  -  +  - ]:              6 :         if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
                                555                 :                :         {
                                556                 :              6 :             PQclear(res);
 2341 michael@paquier.xyz       557                 :              6 :             return false;
                                558                 :                :         }
                                559                 :                :     }
                                560                 :                : 
                                561                 :           5343 :     return true;
                                562                 :                : }
        

Generated by: LCOV version 2.4-beta