LCOV - differential code coverage report
Current view: top level - src/backend/storage/aio - read_stream.c (source / functions)
Current: c70b6db34ffeab48beef1fb4ce61bcad3772b8dd vs 06473f5a344df8c9594ead90a609b86f6724cff8
Current Date: 2025-09-06 07:49:51 +0900
Baseline: lcov-20250906-005545-baseline
Baseline Date: 2025-09-05 08:21:35 +0100

             Coverage  Total  Hit  LBC  UBC  CBC
Lines:         93.3 %    312  291   10   11  291
Functions:     84.6 %     13   11    1    1   11
Branches:      77.3 %    264  204    7   53  204

Legend: Lines: hit, not hit
Branches: + taken, - not taken, # not executed

Line coverage date bins:
(7,30] days: 100.0 % 7 7 7
(30,360] days: 88.0 % 108 95 2 11 95
(360..) days: 95.9 % 197 189 8 189
Function coverage date bins:
(30,360] days: 66.7 % 3 2 1 2
(360..) days: 90.0 % 10 9 1 9
Branch coverage date bins:
(7,30] days: 78.6 % 14 11 3 11
(30,360] days: 75.6 % 90 68 3 19 68
(360..) days: 78.1 % 160 125 4 31 125

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * read_stream.c
                                  4                 :                :  *    Mechanism for accessing buffered relation data with look-ahead
                                  5                 :                :  *
                                  6                 :                :  * Code that needs to access relation data typically pins blocks one at a
                                  7                 :                :  * time, often in a predictable order that might be sequential or data-driven.
                                  8                 :                :  * Calling the simple ReadBuffer() function for each block is inefficient,
                                  9                 :                :  * because blocks that are not yet in the buffer pool require I/O operations
                                 10                 :                :  * that are small and might stall waiting for storage.  This mechanism looks
                                 11                 :                :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
                                 12                 :                :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
                                 13                 :                :  * distance.
                                 14                 :                :  *
                                 15                 :                :  * A user-provided callback generates a stream of block numbers that is used
                                 16                 :                :  * to form reads of up to io_combine_limit, by attempting to merge them with a
                                 17                 :                :  * pending read.  When that isn't possible, the existing pending read is sent
                                 18                 :                :  * to StartReadBuffers() so that a new one can begin to form.
                                 19                 :                :  *
                                 20                 :                :  * The algorithm for controlling the look-ahead distance is based on recent
                                 21                 :                :  * cache hit and miss history.  When no I/O is necessary, there is no benefit
                                 22                 :                :  * in looking ahead more than one block.  This is the default initial
                                 23                 :                :  * assumption, but when blocks needing I/O are streamed, the distance is
                                 24                 :                :  * increased rapidly to try to benefit from I/O combining and concurrency.  It
                                 25                 :                :  * is reduced gradually when cached blocks are streamed.
                                 26                 :                :  *
                                 27                 :                :  * The main data structure is a circular queue of buffers of size
                                 28                 :                :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
                                 29                 :                :  * returned by read_stream_next_buffer().  Each buffer also has an optional
                                  30                 :                :  * variable-sized object that is passed from the callback to the consumer of
                                 31                 :                :  * buffers.
                                 32                 :                :  *
                                 33                 :                :  * Parallel to the queue of buffers, there is a circular queue of in-progress
                                 34                 :                :  * I/Os that have been started with StartReadBuffers(), and for which
                                 35                 :                :  * WaitReadBuffers() must be called before returning the buffer.
                                 36                 :                :  *
                                 37                 :                :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
                                 38                 :                :  * successive calls, then these data structures might appear as follows:
                                 39                 :                :  *
                                 40                 :                :  *                          buffers buf/data       ios
                                 41                 :                :  *
                                 42                 :                :  *                          +----+  +-----+       +--------+
                                 43                 :                :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
                                 44                 :                :  *                          +----+  +-----+  |    +--------+
                                 45                 :                :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
                                 46                 :                :  *                          +----+  +-----+  | |  +--------+
                                 47                 :                :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
                                 48                 :                :  *                          +----+  +-----+    |  +--------+
                                 49                 :                :  *                          | 43 |  |  ?  |    |  |        |
                                 50                 :                :  *                          +----+  +-----+    |  +--------+
                                 51                 :                :  *                          | 44 |  |  ?  |    |  |        |
                                 52                 :                :  *                          +----+  +-----+    |  +--------+
                                 53                 :                :  *                          | 60 |  |  ?  |<---+
                                 54                 :                :  *                          +----+  +-----+
                                 55                 :                :  *     next_buffer_index -> |    |  |     |
                                 56                 :                :  *                          +----+  +-----+
                                 57                 :                :  *
                                 58                 :                :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
                                 59                 :                :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
                                 60                 :                :  * the range 42..44 requires an I/O wait before its buffers are returned, as
                                 61                 :                :  * does block 60.
                                 62                 :                :  *
                                 63                 :                :  *
                                 64                 :                :  * Portions Copyright (c) 2024-2025, PostgreSQL Global Development Group
                                 65                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                 66                 :                :  *
                                 67                 :                :  * IDENTIFICATION
                                 68                 :                :  *    src/backend/storage/aio/read_stream.c
                                 69                 :                :  *
                                 70                 :                :  *-------------------------------------------------------------------------
                                 71                 :                :  */
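The header comment describes how block numbers from the callback are merged into a pending read of up to io_combine_limit blocks, and how a block that cannot be merged causes the pending read to be started. The following is a minimal sketch of just that grouping rule, assuming stand-in types; `SketchRead`, `sketch_combine_reads` and the `BlockNumber` typedef are hypothetical, and the sketch ignores buffer hits, the look-ahead distance and buffer limits, which the real code also takes into account.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for PostgreSQL's BlockNumber type. */
typedef uint32_t BlockNumber;
#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

/* A combined read: nblocks consecutive blocks starting at blocknum. */
typedef struct SketchRead
{
    BlockNumber blocknum;
    int         nblocks;
} SketchRead;

/*
 * Group a sequence of block numbers into combined reads of at most
 * combine_limit blocks.  A block is merged into the pending read when it
 * immediately follows it and the read is not yet full; otherwise the pending
 * read is "started" (appended to out[]) and a new one begins.
 * Returns the number of reads written to out[].
 */
static int
sketch_combine_reads(const BlockNumber *blocks, int nblocks_in,
                     int combine_limit, SketchRead *out)
{
    int         nreads = 0;
    SketchRead  pending = {InvalidBlockNumber, 0};

    for (int i = 0; i < nblocks_in; i++)
    {
        BlockNumber b = blocks[i];

        if (pending.nblocks > 0 &&
            pending.nblocks < combine_limit &&
            b == pending.blocknum + (BlockNumber) pending.nblocks)
        {
            pending.nblocks++;          /* extend the pending read */
            continue;
        }
        if (pending.nblocks > 0)
            out[nreads++] = pending;    /* start the pending read */
        pending.blocknum = b;           /* begin forming a new one */
        pending.nblocks = 1;
    }
    if (pending.nblocks > 0)
        out[nreads++] = pending;        /* flush at end of stream */
    return nreads;
}
```

With the block numbers 10, 42, 43, 44, 60 from the diagram and a generous limit, this yields the reads 10..10, 42..44 and 60..60; with a limit of 2, the run 42..44 is split into 42..43 and 44..44.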
                                 72                 :                : #include "postgres.h"
                                 73                 :                : 
                                 74                 :                : #include "miscadmin.h"
                                 75                 :                : #include "storage/aio.h"
                                 76                 :                : #include "storage/fd.h"
                                 77                 :                : #include "storage/smgr.h"
                                 78                 :                : #include "storage/read_stream.h"
                                 79                 :                : #include "utils/memdebug.h"
                                 80                 :                : #include "utils/rel.h"
                                 81                 :                : #include "utils/spccache.h"
                                 82                 :                : 
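The header comment says the look-ahead distance starts at one block, grows rapidly when blocks need I/O, and shrinks gradually when cached blocks are streamed. A small model of that policy follows. It assumes "rapidly" means doubling (capped at a maximum such as max_pinned_buffers) and "gradually" means decrementing by one; those constants are assumptions, and `SketchDistance` and its functions are hypothetical names, not part of read_stream.c.

```c
#include <assert.h>

/*
 * Hypothetical model of the adaptive look-ahead distance.
 * ASSUMPTION: growth is modeled as doubling and decay as decrementing by one;
 * the real conditions and constants in read_stream.c may differ.
 */
typedef struct SketchDistance
{
    int         distance;       /* current look-ahead distance */
    int         max_distance;   /* e.g. the stream's max_pinned_buffers */
} SketchDistance;

static void
sketch_distance_init(SketchDistance *d, int max_distance)
{
    d->distance = 1;            /* initial assumption: no I/O needed */
    d->max_distance = max_distance;
}

/* Update the distance after a block is streamed; needed_io says if it missed. */
static void
sketch_distance_update(SketchDistance *d, int needed_io)
{
    if (needed_io)
    {
        d->distance *= 2;                   /* ramp up quickly on misses */
        if (d->distance > d->max_distance)
            d->distance = d->max_distance;
    }
    else if (d->distance > 1)
        d->distance--;                      /* back off slowly on hits */
}
```

The asymmetry means a short burst of misses quickly opens up enough look-ahead for I/O combining and concurrency, while a long run of hits is needed before the stream falls back to looking only one block ahead.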
                                 83                 :                : typedef struct InProgressIO
                                 84                 :                : {
                                 85                 :                :     int16       buffer_index;
                                 86                 :                :     ReadBuffersOperation op;
                                 87                 :                : } InProgressIO;
                                 88                 :                : 
                                 89                 :                : /*
                                 90                 :                :  * State for managing a stream of reads.
                                 91                 :                :  */
                                 92                 :                : struct ReadStream
                                 93                 :                : {
                                 94                 :                :     int16       max_ios;
                                 95                 :                :     int16       io_combine_limit;
                                 96                 :                :     int16       ios_in_progress;
                                 97                 :                :     int16       queue_size;
                                 98                 :                :     int16       max_pinned_buffers;
                                 99                 :                :     int16       forwarded_buffers;
                                100                 :                :     int16       pinned_buffers;
                                101                 :                :     int16       distance;
                                102                 :                :     int16       initialized_buffers;
                                103                 :                :     int         read_buffers_flags;
                                104                 :                :     bool        sync_mode;      /* using io_method=sync */
                                105                 :                :     bool        batch_mode;     /* READ_STREAM_USE_BATCHING */
                                106                 :                :     bool        advice_enabled;
                                107                 :                :     bool        temporary;
                                108                 :                : 
                                109                 :                :     /*
                                110                 :                :      * One-block buffer to support 'ungetting' a block number, to resolve flow
                                111                 :                :      * control problems when I/Os are split.
                                112                 :                :      */
                                113                 :                :     BlockNumber buffered_blocknum;
                                114                 :                : 
                                115                 :                :     /*
                                116                 :                :      * The callback that will tell us which block numbers to read, and an
                                 117                 :                :      * opaque pointer that will be passed to it for its own purposes.
                                118                 :                :      */
                                119                 :                :     ReadStreamBlockNumberCB callback;
                                120                 :                :     void       *callback_private_data;
                                121                 :                : 
                                122                 :                :     /* Next expected block, for detecting sequential access. */
                                123                 :                :     BlockNumber seq_blocknum;
                                124                 :                :     BlockNumber seq_until_processed;
                                125                 :                : 
                                126                 :                :     /* The read operation we are currently preparing. */
                                127                 :                :     BlockNumber pending_read_blocknum;
                                128                 :                :     int16       pending_read_nblocks;
                                129                 :                : 
                                130                 :                :     /* Space for buffers and optional per-buffer private data. */
                                131                 :                :     size_t      per_buffer_data_size;
                                132                 :                :     void       *per_buffer_data;
                                133                 :                : 
                                134                 :                :     /* Read operations that have been started but not waited for yet. */
                                135                 :                :     InProgressIO *ios;
                                136                 :                :     int16       oldest_io_index;
                                137                 :                :     int16       next_io_index;
                                138                 :                : 
                                139                 :                :     bool        fast_path;
                                140                 :                : 
                                141                 :                :     /* Circular queue of buffers. */
                                142                 :                :     int16       oldest_buffer_index;    /* Next pinned buffer to return */
                                143                 :                :     int16       next_buffer_index;  /* Index of next buffer to pin */
                                144                 :                :     Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
                                145                 :                : };
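The buffers[] array is used as a circular queue: oldest_buffer_index is the next pinned buffer to return and next_buffer_index is the next slot to fill, with both indexes wrapping around at the end of the array. The sketch below shows that indexing with plain integers in place of buffers. `SketchQueue`, `SKETCH_QUEUE_SIZE` and the helper functions are hypothetical; the sketch keeps a count to tell a full queue from an empty one and does not model the extra space the real queue reserves.

```c
#include <assert.h>

#define SKETCH_QUEUE_SIZE 4     /* hypothetical fixed size for illustration */

typedef struct SketchQueue
{
    int         oldest_index;   /* next entry to hand to the consumer */
    int         next_index;     /* next free slot to fill */
    int         count;          /* plays the role of pinned_buffers */
    int         slots[SKETCH_QUEUE_SIZE];
} SketchQueue;

/* Advance an index by one slot, wrapping around at the end of the array. */
static inline int
sketch_advance(int index)
{
    return (index + 1 == SKETCH_QUEUE_SIZE) ? 0 : index + 1;
}

/* Producer side: fill the slot at next_index. */
static void
sketch_push(SketchQueue *q, int value)
{
    assert(q->count < SKETCH_QUEUE_SIZE);
    q->slots[q->next_index] = value;
    q->next_index = sketch_advance(q->next_index);
    q->count++;
}

/* Consumer side: take the entry at oldest_index. */
static int
sketch_pop(SketchQueue *q)
{
    int         value;

    assert(q->count > 0);
    value = q->slots[q->oldest_index];
    q->oldest_index = sketch_advance(q->oldest_index);
    q->count--;
    return value;
}
```

Pushing 10, 42 and 43, popping 10, then pushing 44 and 60 makes next_index wrap back to slot 1, while the consumer still sees the blocks in the order they were queued.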
                                146                 :                : 
                                147                 :                : /*
                                148                 :                :  * Return a pointer to the per-buffer data by index.
                                149                 :                :  */
                                150                 :                : static inline void *
  521 tmunro@postgresql.or      151                 :CBC     3040952 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
                                152                 :                : {
                                153                 :        6081904 :     return (char *) stream->per_buffer_data +
                                154                 :        3040952 :         stream->per_buffer_data_size * buffer_index;
                                155                 :                : }
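get_per_buffer_data() locates a slot's private data by byte arithmetic on a single allocation: slot i starts per_buffer_data_size * i bytes from the base pointer. The standalone sketch below reproduces that computation; `SketchPerBuffer` and `sketch_get_per_buffer_data` are hypothetical names, and using sizeof() of the caller's struct as the element size is an assumption that keeps every slot suitably aligned.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical holder for per-buffer data, mirroring the two struct fields. */
typedef struct SketchPerBuffer
{
    size_t      per_buffer_data_size;   /* size of one caller-defined object */
    void       *per_buffer_data;        /* one contiguous array of objects */
} SketchPerBuffer;

/* Return a pointer to the object for queue slot buffer_index. */
static void *
sketch_get_per_buffer_data(SketchPerBuffer *s, int buffer_index)
{
    /* Cast to char * so the offset is counted in bytes. */
    return (char *) s->per_buffer_data +
        s->per_buffer_data_size * buffer_index;
}
```

A caller that stores, say, a small struct per block allocates one array of those structs and casts the returned pointer back to the struct type.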
                                156                 :                : 
                                157                 :                : /*
                                158                 :                :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
                                159                 :                :  * blocks [current_blocknum, last_exclusive).
                                160                 :                :  */
                                161                 :                : BlockNumber
  368 noah@leadboat.com         162                 :         334631 : block_range_read_stream_cb(ReadStream *stream,
                                163                 :                :                            void *callback_private_data,
                                164                 :                :                            void *per_buffer_data)
                                165                 :                : {
                                166                 :         334631 :     BlockRangeReadStreamPrivate *p = callback_private_data;
                                167                 :                : 
                                168         [ +  + ]:         334631 :     if (p->current_blocknum < p->last_exclusive)
                                169                 :         270277 :         return p->current_blocknum++;
                                170                 :                : 
                                171                 :          64354 :     return InvalidBlockNumber;
                                172                 :                : }
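block_range_read_stream_cb() returns each block in [current_blocknum, last_exclusive) once and then InvalidBlockNumber to signal the end of the stream. The sketch below copies that logic into a self-contained function so its behavior can be checked without a backend; the `BlockNumber` typedef, `SketchBlockRange` and `sketch_block_range_next` are hypothetical stand-ins, and the stream and per_buffer_data arguments, which this callback does not use, are dropped.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for PostgreSQL types, so the sketch compiles alone. */
typedef uint32_t BlockNumber;
#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

typedef struct SketchBlockRange
{
    BlockNumber current_blocknum;   /* next block to hand out */
    BlockNumber last_exclusive;     /* one past the last block */
} SketchBlockRange;

/* Same logic as block_range_read_stream_cb(), minus the unused arguments. */
static BlockNumber
sketch_block_range_next(void *callback_private_data)
{
    SketchBlockRange *p = callback_private_data;

    if (p->current_blocknum < p->last_exclusive)
        return p->current_blocknum++;
    return InvalidBlockNumber;      /* end of stream, on every later call too */
}
```

For the range [5, 8) the callback yields 5, 6 and 7, then keeps returning InvalidBlockNumber.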
                                173                 :                : 
                                174                 :                : /*
                                175                 :                :  * Ask the callback which block it would like us to read next, with a one block
                                176                 :                :  * buffer in front to allow read_stream_unget_block() to work.
                                177                 :                :  */
                                178                 :                : static inline BlockNumber
  521 tmunro@postgresql.or      179                 :        4318544 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
                                180                 :                : {
                                181                 :                :     BlockNumber blocknum;
                                182                 :                : 
  371                           183                 :        4318544 :     blocknum = stream->buffered_blocknum;
                                184         [ -  + ]:        4318544 :     if (blocknum != InvalidBlockNumber)
  371 tmunro@postgresql.or      185                 :LBC         (2) :         stream->buffered_blocknum = InvalidBlockNumber;
                                186                 :                :     else
                                187                 :                :     {
                                188                 :                :         /*
                                189                 :                :          * Tell Valgrind that the per-buffer data is undefined.  That replaces
                                190                 :                :          * the "noaccess" state that was set when the consumer moved past this
                                191                 :                :          * entry last time around the queue, and should also catch callbacks
                                192                 :                :          * that fail to initialize data that the buffer consumer later
                                193                 :                :          * accesses.  On the first go around, it is undefined already.
                                194                 :                :          */
                                195                 :                :         VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
                                196                 :                :                                     stream->per_buffer_data_size);
  371 tmunro@postgresql.or      197                 :CBC     4318544 :         blocknum = stream->callback(stream,
                                198                 :                :                                     stream->callback_private_data,
                                199                 :                :                                     per_buffer_data);
                                200                 :                :     }
                                201                 :                : 
                                202                 :        4318544 :     return blocknum;
                                203                 :                : }
                                204                 :                : 
                                205                 :                : /*
                                206                 :                :  * In order to deal with buffer shortages and I/O limits after short reads, we
                                207                 :                :  * sometimes need to defer handling of a block we've already consumed from the
                                208                 :                :  * registered callback until later.
                                209                 :                :  */
                                210                 :                : static inline void
  521 tmunro@postgresql.or      211                 :LBC         (2) : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
                                212                 :                : {
                                213                 :                :     /* We shouldn't ever unget more than one block. */
  371                           214         [ #  # ]:            (2) :     Assert(stream->buffered_blocknum == InvalidBlockNumber);
                                215         [ #  # ]:            (2) :     Assert(blocknum != InvalidBlockNumber);
                                216                 :            (2) :     stream->buffered_blocknum = blocknum;
  521                           217                 :            (2) : }
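read_stream_get_block() and read_stream_unget_block() together form a one-slot pushback buffer in front of the callback: an ungot block is returned by the next get instead of calling the callback again. The sketch below isolates that pattern with a toy callback that counts upward; `SketchStream`, `sketch_callback` and the other names are hypothetical, and the Valgrind annotation and per-buffer data are omitted.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for PostgreSQL types. */
typedef uint32_t BlockNumber;
#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

typedef struct SketchStream
{
    BlockNumber buffered_blocknum;  /* InvalidBlockNumber when empty */
    BlockNumber next;               /* toy callback state */
} SketchStream;

/* Toy callback: returns consecutive block numbers. */
static BlockNumber
sketch_callback(SketchStream *s)
{
    return s->next++;
}

static BlockNumber
sketch_get_block(SketchStream *s)
{
    BlockNumber blocknum = s->buffered_blocknum;

    if (blocknum != InvalidBlockNumber)
        s->buffered_blocknum = InvalidBlockNumber;  /* consume pushed-back block */
    else
        blocknum = sketch_callback(s);              /* otherwise ask the callback */
    return blocknum;
}

static void
sketch_unget_block(SketchStream *s, BlockNumber blocknum)
{
    /* Only one block of pushback is supported. */
    assert(s->buffered_blocknum == InvalidBlockNumber);
    assert(blocknum != InvalidBlockNumber);
    s->buffered_blocknum = blocknum;
}
```

Because the callback may have side effects (such as advancing its own cursor), pushing the block back is simpler than asking the callback to rewind, which is why a single deferred block is enough to resolve the flow-control case described above.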
                                218                 :                : 
                                219                 :                : /*
                                220                 :                :  * Start as much of the current pending read as we can.  If we have to split it
                                221                 :                :  * because of the per-backend buffer limit, or the buffer manager decides to
                                222                 :                :  * split it, then the pending read is adjusted to hold the remaining portion.
                                223                 :                :  *
                                224                 :                :  * We can always start a read of at least size one if we have no progress yet.
                                225                 :                :  * Otherwise it's possible that we can't start a read at all because of a lack
                                226                 :                :  * of buffers, and then false is returned.  Buffer shortages also reduce the
                                227                 :                :  * distance to a level that prevents look-ahead until buffers are released.
                                228                 :                :  */
                                229                 :                : static bool
  175 tmunro@postgresql.or      230                 :CBC     1441194 : read_stream_start_pending_read(ReadStream *stream)
                                231                 :                : {
                                232                 :                :     bool        need_wait;
                                233                 :                :     int         requested_nblocks;
                                234                 :                :     int         nblocks;
                                235                 :                :     int         flags;
                                236                 :                :     int         forwarded;
                                237                 :                :     int16       io_index;
                                238                 :                :     int16       overflow;
                                239                 :                :     int16       buffer_index;
                                240                 :                :     int         buffer_limit;
                                241                 :                : 
                                242                 :                :     /* This should only be called with a pending read. */
  521                           243         [ -  + ]:        1441194 :     Assert(stream->pending_read_nblocks > 0);
  177                           244         [ -  + ]:        1441194 :     Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
                                245                 :                : 
                                246                 :                :     /* We had better not exceed the per-stream buffer limit with this read. */
  521                           247         [ -  + ]:        1441194 :     Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
                                248                 :                :            stream->max_pinned_buffers);
                                249                 :                : 
                                250                 :                : #ifdef USE_ASSERT_CHECKING
                                251                 :                :     /* We had better not be overwriting an existing pinned buffer. */
                                252         [ +  + ]:        1441194 :     if (stream->pinned_buffers > 0)
                                253         [ -  + ]:          12104 :         Assert(stream->next_buffer_index != stream->oldest_buffer_index);
                                254                 :                :     else
                                255         [ -  + ]:        1429090 :         Assert(stream->next_buffer_index == stream->oldest_buffer_index);
                                256                 :                : 
                                257                 :                :     /*
                                258                 :                :      * Pinned buffers forwarded by a preceding StartReadBuffers() call that
                                259                 :                :      * had to split the operation should match the leading blocks of this
                                260                 :                :      * following StartReadBuffers() call.
                                261                 :                :      */
   28                           262         [ -  + ]:        1441194 :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
                                263         [ +  + ]:        1442536 :     for (int i = 0; i < stream->forwarded_buffers; ++i)
                                264         [ -  + ]:           1342 :         Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
                                265                 :                :                stream->pending_read_blocknum + i);
                                266                 :                : 
                                267                 :                :     /*
                                268                 :                :      * Check that we've cleared the queue/overflow entries corresponding to
                                269                 :                :      * the rest of the blocks covered by this read, unless it's the first go
                                270                 :                :      * around and we haven't even initialized them yet.
                                271                 :                :      */
                                272         [ +  + ]:        3346438 :     for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
                                273   [ +  +  -  + ]:        1905244 :         Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
                                274                 :                :                stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
                                275                 :                : #endif
                                276                 :                : 
                                277                 :                :     /* Do we need to issue read-ahead advice? */
  160 andres@anarazel.de        278                 :        1441194 :     flags = stream->read_buffers_flags;
  175 tmunro@postgresql.or      279         [ +  + ]:        1441194 :     if (stream->advice_enabled)
                                280                 :                :     {
                                281         [ +  + ]:           1669 :         if (stream->pending_read_blocknum == stream->seq_blocknum)
                                282                 :                :         {
                                283                 :                :             /*
                                284                 :                :              * Sequential:  Issue advice until the preadv() calls have caught
                                285                 :                :              * up with the first advice issued for this sequential region, and
                                286                 :                :              * then stay out of the way of the kernel's own read-ahead.
                                287                 :                :              */
                                288         [ +  + ]:             23 :             if (stream->seq_until_processed != InvalidBlockNumber)
  160 andres@anarazel.de        289                 :              1 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
                                290                 :                :         }
                                291                 :                :         else
                                292                 :                :         {
                                293                 :                :             /*
                                294                 :                :              * Random jump:  Note the starting location of a new potential
                                295                 :                :              * sequential region and start issuing advice.  Skip it this time
                                296                 :                :              * if the preadv() follows immediately, eg first block in stream.
                                297                 :                :              */
  175 tmunro@postgresql.or      298                 :           1646 :             stream->seq_until_processed = stream->pending_read_blocknum;
                                299         [ +  + ]:           1646 :             if (stream->pinned_buffers > 0)
  160 andres@anarazel.de        300                 :             24 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
                                301                 :                :         }
                                302                 :                :     }
                                303                 :                : 
                                304                 :                :     /*
                                305                 :                :      * How many more buffers is this backend allowed?
                                306                 :                :      *
                                307                 :                :      * Forwarded buffers are already pinned and map to the leading blocks of
                                308                 :                :      * the pending read (the remaining portion of an earlier short read that
                                309                 :                :      * we're about to continue).  They are not counted in pinned_buffers, but
                                310                 :                :      * they are counted as pins already held by this backend according to the
                                311                 :                :      * buffer manager, so they must be added to the limit it grants us.
                                312                 :                :      */
  176 tmunro@postgresql.or      313         [ +  + ]:        1441194 :     if (stream->temporary)
                                314         [ +  - ]:          11906 :         buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
                                315                 :                :     else
                                316         [ +  - ]:        1429288 :         buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
  169                           317         [ -  + ]:        1441194 :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
                                318                 :                : 
                                319                 :        1441194 :     buffer_limit += stream->forwarded_buffers;
  152 andres@anarazel.de        320                 :        1441194 :     buffer_limit = Min(buffer_limit, PG_INT16_MAX);
                                321                 :                : 
  176 tmunro@postgresql.or      322   [ +  +  +  - ]:        1441194 :     if (buffer_limit == 0 && stream->pinned_buffers == 0)
                                323                 :         600143 :         buffer_limit = 1;       /* guarantee progress */
                                324                 :                : 
                                325                 :                :     /* Does the per-backend limit affect this read? */
                                326                 :        1441194 :     nblocks = stream->pending_read_nblocks;
                                327         [ +  + ]:        1441194 :     if (buffer_limit < nblocks)
                                328                 :                :     {
                                329                 :                :         int16       new_distance;
                                330                 :                : 
                                331                 :                :         /* Shrink distance: no more look-ahead until buffers are released. */
                                332                 :           2276 :         new_distance = stream->pinned_buffers + buffer_limit;
                                333         [ +  + ]:           2276 :         if (stream->distance > new_distance)
                                334                 :           1695 :             stream->distance = new_distance;
                                335                 :                : 
                                336                 :                :         /* Unless we have nothing to give the consumer, stop here. */
                                337         [ +  + ]:           2276 :         if (stream->pinned_buffers > 0)
                                338                 :           1001 :             return false;
                                339                 :                : 
                                340                 :                :         /* A short read is required to make progress. */
                                341                 :           1275 :         nblocks = buffer_limit;
                                342                 :                :     }
                                343                 :                : 
                                344                 :                :     /*
                                345                 :                :      * We say how many blocks we want to read, but it may be smaller on return
                                346                 :                :      * if the buffer manager decides to shorten the read.  Initialize buffers
                                347                 :                :      * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
                                348                 :                :      * and keep the original nblocks number so we can check for forwarded
                                349                 :                :      * buffers as output, below.
                                350                 :                :      */
  521                           351                 :        1440193 :     buffer_index = stream->next_buffer_index;
                                352                 :        1440193 :     io_index = stream->next_io_index;
  169                           353         [ +  + ]:        2342099 :     while (stream->initialized_buffers < buffer_index + nblocks)
                                354                 :         901906 :         stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
                                355                 :        1440193 :     requested_nblocks = nblocks;
  521                           356                 :        1440193 :     need_wait = StartReadBuffers(&stream->ios[io_index].op,
                                357                 :        1440193 :                                  &stream->buffers[buffer_index],
                                358                 :                :                                  stream->pending_read_blocknum,
                                359                 :                :                                  &nblocks,
                                360                 :                :                                  flags);
                                361                 :        1440187 :     stream->pinned_buffers += nblocks;
                                362                 :                : 
                                363                 :                :     /* Remember whether we need to wait before returning this buffer. */
                                364         [ +  + ]:        1440187 :     if (!need_wait)
                                365                 :                :     {
                                366                 :                :         /* Look-ahead distance decays, no I/O necessary. */
                                367         [ +  + ]:         896721 :         if (stream->distance > 1)
                                368                 :          17232 :             stream->distance--;
                                369                 :                :     }
                                370                 :                :     else
                                371                 :                :     {
                                372                 :                :         /*
                                373                 :                :          * Remember to call WaitReadBuffers() before returning head buffer.
                                374                 :                :          * Look-ahead distance will be adjusted after waiting.
                                375                 :                :          */
                                376                 :         543466 :         stream->ios[io_index].buffer_index = buffer_index;
                                377         [ +  + ]:         543466 :         if (++stream->next_io_index == stream->max_ios)
                                378                 :          22308 :             stream->next_io_index = 0;
                                379         [ -  + ]:         543466 :         Assert(stream->ios_in_progress < stream->max_ios);
                                380                 :         543466 :         stream->ios_in_progress++;
                                381                 :         543466 :         stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
                                382                 :                :     }
                                383                 :                : 
                                384                 :                :     /*
                                385                 :                :      * How many pins were acquired but forwarded to the next call?  These need
                                386                 :                :      * to be passed to the next StartReadBuffers() call by leaving them
                                387                 :                :      * exactly where they are in the queue, or released if the stream ends
                                388                 :                :      * early.  We need the number for accounting purposes, since they are not
                                389                 :                :      * counted in stream->pinned_buffers but we already hold them.
                                390                 :                :      */
  169                           391                 :        1440187 :     forwarded = 0;
                                392         [ +  + ]:        1441530 :     while (nblocks + forwarded < requested_nblocks &&
                                393         [ +  + ]:          52443 :            stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
                                394                 :           1343 :         forwarded++;
                                395                 :        1440187 :     stream->forwarded_buffers = forwarded;
                                396                 :                : 
                                397                 :                :     /*
                                398                 :                :      * We gave a contiguous range of buffer space to StartReadBuffers(), but
                                399                 :                :      * we want it to wrap around at queue_size.  Copy overflowing buffers to
                                400                 :                :      * the front of the array where they'll be consumed, but also leave a copy
                                401                 :                :      * in the overflow zone which the I/O operation has a pointer to (it needs
                                402                 :                :      * a contiguous array).  Both copies will be cleared when the buffers are
                                403                 :                :      * handed to the consumer.
                                404                 :                :      */
                                405                 :        1440187 :     overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
  521                           406         [ +  + ]:        1440187 :     if (overflow > 0)
                                407                 :                :     {
  169                           408         [ -  + ]:            306 :         Assert(overflow < stream->queue_size);    /* can't overlap */
                                409                 :            306 :         memcpy(&stream->buffers[0],
                                410                 :            306 :                &stream->buffers[stream->queue_size],
                                411                 :                :                sizeof(stream->buffers[0]) * overflow);
                                412                 :                :     }
                                413                 :                : 
                                414                 :                :     /* Compute location of start of next read, without using % operator. */
  521                           415                 :        1440187 :     buffer_index += nblocks;
                                416         [ +  + ]:        1440187 :     if (buffer_index >= stream->queue_size)
                                417                 :         278815 :         buffer_index -= stream->queue_size;
                                418   [ +  -  -  + ]:        1440187 :     Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
                                419                 :        1440187 :     stream->next_buffer_index = buffer_index;
                                420                 :                : 
                                421                 :                :     /* Adjust the pending read to cover the remaining portion, if any. */
                                422                 :        1440187 :     stream->pending_read_blocknum += nblocks;
                                423                 :        1440187 :     stream->pending_read_nblocks -= nblocks;
                                424                 :                : 
  176                           425                 :        1440187 :     return true;
                                426                 :                : }
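The comments in read_stream_start_pending_read() describe two pieces of index arithmetic: wrapping buffer_index at queue_size without the % operator, and copying buffers that spilled into the overflow zone back to the front of the array. The sketch below is a minimal standalone model of that arithmetic under assumed sizes. QUEUE_SIZE, IO_COMBINE_LIMIT, advance_index() and fill_read() are hypothetical names, and plain ints stand in for Buffer values. It leaves out pins, forwarded buffers and I/O.

```c
#include <assert.h>
#include <string.h>

/* Assumed sizes, for illustration only. */
#define QUEUE_SIZE 8
#define IO_COMBINE_LIMIT 4

/* Ring of QUEUE_SIZE slots followed by an overflow zone of
 * IO_COMBINE_LIMIT - 1 slots, so a read that starts near the end of the
 * ring still sees one contiguous array.  Ints stand in for Buffers. */
static int buffers[QUEUE_SIZE + IO_COMBINE_LIMIT - 1];

/* Advance index by n slots, wrapping at QUEUE_SIZE without using %.
 * Requires 0 <= index < QUEUE_SIZE and 0 <= n <= QUEUE_SIZE. */
static int advance_index(int index, int n)
{
    index += n;
    if (index >= QUEUE_SIZE)
        index -= QUEUE_SIZE;
    assert(index >= 0 && index < QUEUE_SIZE);
    return index;
}

/* Write nblocks consecutive values starting at slot index, as if one
 * multi-block read had filled them, then copy any slots that spilled past
 * the ring to the front so consumers find them in circular order.  The
 * spilled copies are left in place as well.  Returns the next start index. */
static int fill_read(int index, int nblocks, int first_value)
{
    assert(index >= 0 && index < QUEUE_SIZE);
    assert(nblocks >= 1 && nblocks <= IO_COMBINE_LIMIT);
    for (int i = 0; i < nblocks; i++)
        buffers[index + i] = first_value + i;

    int overflow = (index + nblocks) - QUEUE_SIZE;
    if (overflow > 0)
    {
        assert(overflow < QUEUE_SIZE);  /* front copy can't overlap */
        memcpy(&buffers[0], &buffers[QUEUE_SIZE],
               sizeof(buffers[0]) * overflow);
    }
    return advance_index(index, nblocks);
}
```

For example, fill_read(6, 4, 100) fills slots 6 to 9, copies the two spilled values (102 and 103) into slots 0 and 1, and returns 2 as the start of the next read. Sizing the overflow zone at IO_COMBINE_LIMIT - 1 is what guarantees the contiguous write never runs off the end of the array.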
                                427                 :                : 
                                428                 :                : static void
  175                           429                 :        2566751 : read_stream_look_ahead(ReadStream *stream)
                                430                 :                : {
                                431                 :                :     /*
                                432                 :                :      * Allow amortizing the cost of submitting IO over multiple IOs. This
                                433                 :                :      * requires that we don't do any operations that could lead to a deadlock
                                434                 :                :      * with staged-but-unsubmitted IO. The callback needs to opt-in to being
                                435                 :                :      * careful.
                                436                 :                :      */
  160 andres@anarazel.de        437         [ +  + ]:        2566751 :     if (stream->batch_mode)
                                438                 :        2483048 :         pgaio_enter_batchmode();
                                439                 :                : 
  521 tmunro@postgresql.or      440         [ +  - ]:        4170359 :     while (stream->ios_in_progress < stream->max_ios &&
                                441         [ +  + ]:        4170359 :            stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
                                442                 :                :     {
                                443                 :                :         BlockNumber blocknum;
                                444                 :                :         int16       buffer_index;
                                445                 :                :         void       *per_buffer_data;
                                446                 :                : 
  177                           447         [ +  + ]:        2476918 :         if (stream->pending_read_nblocks == stream->io_combine_limit)
                                448                 :                :         {
  175                           449                 :           3550 :             read_stream_start_pending_read(stream);
  521                           450                 :           3550 :             continue;
                                451                 :                :         }
                                452                 :                : 
                                453                 :                :         /*
                                454                 :                :          * See which block the callback wants next in the stream.  We need to
                                455                 :                :          * compute the index of the Nth block of the pending read including
                                456                 :                :          * wrap-around, but we don't want to use the expensive % operator.
                                457                 :                :          */
                                458                 :        2473368 :         buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
                                459         [ +  + ]:        2473368 :         if (buffer_index >= stream->queue_size)
                                460                 :           1694 :             buffer_index -= stream->queue_size;
                                461   [ +  -  -  + ]:        2473368 :         Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
                                462                 :        2473368 :         per_buffer_data = get_per_buffer_data(stream, buffer_index);
                                463                 :        2473368 :         blocknum = read_stream_get_block(stream, per_buffer_data);
                                464         [ +  + ]:        2473368 :         if (blocknum == InvalidBlockNumber)
                                465                 :                :         {
                                466                 :                :             /* End of stream. */
                                467                 :         873310 :             stream->distance = 0;
                                468                 :         873310 :             break;
                                469                 :                :         }
                                470                 :                : 
                                471                 :                :         /* Can we merge it with the pending read? */
                                472         [ +  + ]:        1600058 :         if (stream->pending_read_nblocks > 0 &&
                                473         [ +  + ]:         212623 :             stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
                                474                 :                :         {
                                475                 :         212579 :             stream->pending_read_nblocks++;
                                476                 :         212579 :             continue;
                                477                 :                :         }
                                478                 :                : 
                                479                 :                :         /* We have to start the pending read before we can build another. */
  517                           480         [ +  + ]:        1387532 :         while (stream->pending_read_nblocks > 0)
                                481                 :                :         {
  175                           482         [ +  - ]:             53 :             if (!read_stream_start_pending_read(stream) ||
  176                           483         [ -  + ]:             53 :                 stream->ios_in_progress == stream->max_ios)
                                484                 :                :             {
                                485                 :                :                 /* We've hit the buffer or I/O limit.  Rewind and stop here. */
  521 tmunro@postgresql.or      486                 :LBC         (2) :                 read_stream_unget_block(stream, blocknum);
  160 andres@anarazel.de        487         [ #  # ]:            (2) :                 if (stream->batch_mode)
                                488                 :            (2) :                     pgaio_exit_batchmode();
  521 tmunro@postgresql.or      489                 :            (2) :                 return;
                                490                 :                :             }
                                491                 :                :         }
                                492                 :                : 
                                493                 :                :         /* This is the start of a new pending read. */
  521 tmunro@postgresql.or      494                 :CBC     1387479 :         stream->pending_read_blocknum = blocknum;
                                495                 :        1387479 :         stream->pending_read_nblocks = 1;
                                496                 :                :     }
                                497                 :                : 
                                498                 :                :     /*
                                499                 :                :      * We don't start the pending read just because we've hit the distance
                                500                 :                :      * limit, preferring to give it another chance to grow to full
                                501                 :                :      * io_combine_limit size once more buffers have been consumed.  However,
                                502                 :                :      * if we've already reached io_combine_limit, or we've reached the
                                503                 :                :      * distance limit and there isn't anything pinned yet, or the callback has
                                504                 :                :      * signaled end-of-stream, we start the read immediately.  Note that the
                                505                 :                :      * pending read can exceed the distance goal, if the latter was reduced
                                506                 :                :      * after hitting the per-backend buffer limit.
                                507                 :                :      */
                                508         [ +  + ]:        2566751 :     if (stream->pending_read_nblocks > 0 &&
  177                           509         [ +  + ]:        1474999 :         (stream->pending_read_nblocks == stream->io_combine_limit ||
  176                           510         [ +  + ]:        1467386 :          (stream->pending_read_nblocks >= stream->distance &&
  521                           511         [ +  + ]:        1429978 :           stream->pinned_buffers == 0) ||
                                512         [ +  + ]:          43164 :          stream->distance == 0) &&
                                513         [ +  - ]:        1437591 :         stream->ios_in_progress < stream->max_ios)
  175                           514                 :        1437591 :         read_stream_start_pending_read(stream);
                                515                 :                : 
                                516                 :                :     /*
                                517                 :                :      * There should always be something pinned when we leave this function,
                                518                 :                :      * whether started by this call or not, unless we've hit the end of the
                                519                 :                :      * stream.  In the worst case we can always make progress one buffer at a
                                520                 :                :      * time.
                                521                 :                :      */
  176                           522   [ +  +  -  + ]:        2566745 :     Assert(stream->pinned_buffers > 0 || stream->distance == 0);
                                523                 :                : 
  160 andres@anarazel.de        524         [ +  + ]:        2566745 :     if (stream->batch_mode)
                                525                 :        2483042 :         pgaio_exit_batchmode();
                                526                 :                : }
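The merge rule in read_stream_look_ahead() grows the pending read while the callback keeps returning the next consecutive block, and starts it once it reaches io_combine_limit, when a non-consecutive block arrives, or at end of stream. The sketch below models only that coalescing step. It is hypothetical: coalesce_reads(), PendingRead and INVALID_BLOCK are illustrative names, block numbers come from an array rather than a callback, and the distance, pin and max_ios limits are ignored.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t BlockNumber;
/* Hypothetical end-of-stream marker, mirroring InvalidBlockNumber. */
#define INVALID_BLOCK ((BlockNumber) 0xFFFFFFFF)

typedef struct
{
    BlockNumber start;   /* first block of the read */
    int         nblocks; /* number of consecutive blocks */
} PendingRead;

/* Coalesce the block numbers in blocks[] (terminated by INVALID_BLOCK) into
 * reads of consecutive blocks, each at most io_combine_limit long.  Stores
 * at most max_reads reads in out[] and returns how many were stored. */
static int coalesce_reads(const BlockNumber *blocks, int io_combine_limit,
                          PendingRead *out, int max_reads)
{
    PendingRead pending = {0, 0};
    int nreads = 0;

    assert(io_combine_limit > 0);
    for (int i = 0; blocks[i] != INVALID_BLOCK; i++)
    {
        BlockNumber blocknum = blocks[i];

        /* Merge with the pending read if it is not full and blocknum follows it. */
        if (pending.nblocks > 0 && pending.nblocks < io_combine_limit &&
            pending.start + pending.nblocks == blocknum)
        {
            pending.nblocks++;
            continue;
        }

        /* Otherwise emit ("start") the pending read and begin a new one. */
        if (pending.nblocks > 0)
        {
            if (nreads == max_reads)
                return nreads;   /* out of space: stop here */
            out[nreads++] = pending;
        }
        pending.start = blocknum;
        pending.nblocks = 1;
    }

    /* End of stream: emit whatever is still pending. */
    if (pending.nblocks > 0 && nreads < max_reads)
        out[nreads++] = pending;
    return nreads;
}
```

With io_combine_limit set to 4, the sequence 10, 11, 12, 13, 14, 20, 21 coalesces into three reads: blocks 10 to 13, block 14 alone (the first read was already full), and blocks 20 to 21.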
                                527                 :                : 
                                528                 :                : /*
                                529                 :                :  * Create a new read stream object that can be used to perform the equivalent
                                530                 :                :  * of a series of ReadBuffer() calls for one fork of one relation.
                                531                 :                :  * Internally, it generates larger vectored reads where possible by looking
                                532                 :                :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
                                533                 :                :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
                                534                 :                :  * write extra data for each block into the space provided to it.  It will
                                535                 :                :  * also receive callback_private_data for its own purposes.
                                536                 :                :  */
                                537                 :                : static ReadStream *
  413 noah@leadboat.com         538                 :         437347 : read_stream_begin_impl(int flags,
                                539                 :                :                        BufferAccessStrategy strategy,
                                540                 :                :                        Relation rel,
                                541                 :                :                        SMgrRelation smgr,
                                542                 :                :                        char persistence,
                                543                 :                :                        ForkNumber forknum,
                                544                 :                :                        ReadStreamBlockNumberCB callback,
                                545                 :                :                        void *callback_private_data,
                                546                 :                :                        size_t per_buffer_data_size)
                                547                 :                : {
                                548                 :                :     ReadStream *stream;
                                549                 :                :     size_t      size;
                                550                 :                :     int16       queue_size;
                                551                 :                :     int16       queue_overflow;
                                552                 :                :     int         max_ios;
                                553                 :                :     int         strategy_pin_limit;
                                554                 :                :     uint32      max_pinned_buffers;
                                555                 :                :     uint32      max_possible_buffer_limit;
                                556                 :                :     Oid         tablespace_id;
                                557                 :                : 
                                558                 :                :     /*
                                559                 :                :      * Decide how many I/Os we will allow to run at the same time.  That
                                560                 :                :      * currently means advice to the kernel to tell it that we will soon read.
                                561                 :                :      * This number also affects how far we look ahead for opportunities to
                                562                 :                :      * start more I/Os.
                                563                 :                :      */
  521 tmunro@postgresql.or      564                 :         437347 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
                                565   [ +  +  +  + ]:         437347 :     if (!OidIsValid(MyDatabaseId) ||
  413 noah@leadboat.com         566   [ +  +  +  + ]:         511819 :         (rel && IsCatalogRelation(rel)) ||
  521 tmunro@postgresql.or      567                 :         141025 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
                                568                 :                :     {
                                569                 :                :         /*
                                570                 :                :          * Avoid circularity while trying to look up tablespace settings or
                                571                 :                :          * before spccache.c is ready.
                                572                 :                :          */
                                573                 :         351169 :         max_ios = effective_io_concurrency;
                                574                 :                :     }
                                575         [ +  + ]:          86178 :     else if (flags & READ_STREAM_MAINTENANCE)
                                576                 :           6251 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
                                577                 :                :     else
                                578                 :          79927 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
                                579                 :                : 
                                580                 :                :     /* Cap to INT16_MAX to avoid overflowing below */
                                581                 :         437347 :     max_ios = Min(max_ios, PG_INT16_MAX);
                                582                 :                : 
                                583                 :                :     /*
                                584                 :                :      * If starting a multi-block I/O near the end of the queue, we might
                                585                 :                :      * temporarily need extra space for overflowing buffers before they are
                                586                 :                :      * moved to regular circular position.  This is the maximum extra space we
                                587                 :                :      * could need.
                                588                 :                :      */
  177                           589                 :         437347 :     queue_overflow = io_combine_limit - 1;
                                590                 :                : 
                                591                 :                :     /*
                                592                 :                :      * Choose the maximum number of buffers we're prepared to pin.  We try to
                                593                 :                :      * pin fewer if we can, though.  We add one so that we can make progress
                                594                 :                :      * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
                                595                 :                :      * this also allows an extra full I/O's worth of buffers: after an I/O
                                596                 :                :      * finishes we don't want to have to wait for its buffers to be consumed
                                597                 :                :      * before starting a new one.
                                598                 :                :      *
                                599                 :                :      * Be careful not to allow int16 to overflow.  That is possible with the
                                600                 :                :      * current GUC range limits, so this is an artificial limit of ~32k
                                601                 :                :      * buffers and we'd need to adjust the types to exceed that.  We also have
                                602                 :                :      * to allow for the spare entry and the overflow space.
                                603                 :                :      */
  191                           604                 :         437347 :     max_pinned_buffers = (max_ios + 1) * io_combine_limit;
  521                           605                 :         437347 :     max_pinned_buffers = Min(max_pinned_buffers,
                                606                 :                :                              PG_INT16_MAX - queue_overflow - 1);
                                607                 :                : 
                                608                 :                :     /* Give the strategy a chance to limit the number of buffers we pin. */
  518                           609                 :         437347 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
                                610                 :         437347 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
                                611                 :                : 
                                612                 :                :     /*
                                613                 :                :      * Also limit our queue to the maximum number of pins we could ever be
                                614                 :                :      * allowed to acquire according to the buffer manager.  We may not really
                                615                 :                :      * be able to use them all due to other pins held by this backend, but
                                616                 :                :      * we'll check that later in read_stream_start_pending_read().
                                617                 :                :      */
  521                           618         [ +  + ]:         437347 :     if (SmgrIsTemp(smgr))
  176                           619                 :           6789 :         max_possible_buffer_limit = GetLocalPinLimit();
                                620                 :                :     else
                                621                 :         430558 :         max_possible_buffer_limit = GetPinLimit();
                                622                 :         437347 :     max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
                                623                 :                : 
                                624                 :                :     /*
                                625                 :                :      * The limit might be zero on a system configured with too few buffers for
                                626                 :                :      * the number of connections.  We need at least one to make progress.
                                627                 :                :      */
                                628         [ +  + ]:         437347 :     max_pinned_buffers = Max(1, max_pinned_buffers);
                                629                 :                : 
                                630                 :                :     /*
                                631                 :                :      * We need one extra entry for buffers and per-buffer data, because users
                                632                 :                :      * of per-buffer data have access to the object until the next call to
                                633                 :                :      * read_stream_next_buffer(), so we need a gap between the head and tail
                                634                 :                :      * of the queue so that we don't clobber it.
                                635                 :                :      */
  521                           636                 :         437347 :     queue_size = max_pinned_buffers + 1;
                                637                 :                : 
                                638                 :                :     /*
                                639                 :                :      * Allocate the object, the buffers, the ios and per_buffer_data space in
                                640                 :                :      * one big chunk.  Though we have queue_size buffers, we want to be able
                                641                 :                :      * to assume that all the buffers for a single read are contiguous (i.e.
                                642                 :                :      * don't wrap around halfway through), so we allow temporary overflows of
                                643                 :                :      * up to the maximum possible overflow size.
                                644                 :                :      */
                                645                 :         437347 :     size = offsetof(ReadStream, buffers);
  177                           646                 :         437347 :     size += sizeof(Buffer) * (queue_size + queue_overflow);
  521                           647         [ +  + ]:         437347 :     size += sizeof(InProgressIO) * Max(1, max_ios);
                                648                 :         437347 :     size += per_buffer_data_size * queue_size;
                                649                 :         437347 :     size += MAXIMUM_ALIGNOF * 2;
                                650                 :         437347 :     stream = (ReadStream *) palloc(size);
                                651                 :         437347 :     memset(stream, 0, offsetof(ReadStream, buffers));
                                652                 :         437347 :     stream->ios = (InProgressIO *)
  177                           653                 :         437347 :         MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
  521                           654         [ +  + ]:         437347 :     if (per_buffer_data_size > 0)
                                655                 :          21694 :         stream->per_buffer_data = (void *)
                                656         [ +  + ]:          21694 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
                                657                 :                : 
  160 andres@anarazel.de        658                 :         437347 :     stream->sync_mode = io_method == IOMETHOD_SYNC;
                                659                 :         437347 :     stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
                                660                 :                : 
                                661                 :                : #ifdef USE_PREFETCH
                                662                 :                : 
                                663                 :                :     /*
                                664                 :                :      * Read-ahead advice simulating asynchronous I/O with synchronous calls.
                                665                 :                :      * Issue advice only if AIO is not used, direct I/O isn't enabled, the
                                666                 :                :      * caller hasn't promised sequential access (overriding our detection
                                667                 :                :      * heuristics), and max_ios hasn't been set to zero.
                                668                 :                :      */
                                669         [ +  + ]:         437347 :     if (stream->sync_mode &&
                                670         [ +  - ]:           2878 :         (io_direct_flags & IO_DIRECT_DATA) == 0 &&
  521 tmunro@postgresql.or      671   [ +  +  +  - ]:           2878 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
                                672                 :                :         max_ios > 0)
                                673                 :            692 :         stream->advice_enabled = true;
                                674                 :                : #endif
                                675                 :                : 
                                676                 :                :     /*
                                677                 :                :      * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
                                678                 :                :      * we still need to allocate space to combine and run one I/O.  Bump it up
                                679                 :                :      * to one, and remember to ask for synchronous I/O only.
                                680                 :                :      */
                                681         [ +  + ]:         437347 :     if (max_ios == 0)
                                682                 :                :     {
                                683                 :              7 :         max_ios = 1;
  160 andres@anarazel.de        684                 :              7 :         stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
                                685                 :                :     }
                                686                 :                : 
                                687                 :                :     /*
                                688                 :                :      * Capture stable values for these two GUC-derived numbers for the
                                689                 :                :      * lifetime of this stream, so we don't have to worry about the GUCs
                                690                 :                :      * changing underneath us beyond this point.
                                691                 :                :      */
  521 tmunro@postgresql.or      692                 :         437347 :     stream->max_ios = max_ios;
  177                           693                 :         437347 :     stream->io_combine_limit = io_combine_limit;
                                694                 :                : 
  521                           695                 :         437347 :     stream->per_buffer_data_size = per_buffer_data_size;
                                696                 :         437347 :     stream->max_pinned_buffers = max_pinned_buffers;
                                697                 :         437347 :     stream->queue_size = queue_size;
                                698                 :         437347 :     stream->callback = callback;
                                699                 :         437347 :     stream->callback_private_data = callback_private_data;
  371                           700                 :         437347 :     stream->buffered_blocknum = InvalidBlockNumber;
  175                           701                 :         437347 :     stream->seq_blocknum = InvalidBlockNumber;
                                702                 :         437347 :     stream->seq_until_processed = InvalidBlockNumber;
  176                           703                 :         437347 :     stream->temporary = SmgrIsTemp(smgr);
                                704                 :                : 
                                705                 :                :     /*
                                706                 :                :      * Skip the initial ramp-up phase if the caller says we're going to be
                                707                 :                :      * reading the whole relation.  This way we start out assuming we'll be
                                708                 :                :      * doing full io_combine_limit sized reads.
                                709                 :                :      */
  521                           710         [ +  + ]:         437347 :     if (flags & READ_STREAM_FULL)
  177                           711                 :          64494 :         stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
                                712                 :                :     else
  521                           713                 :         372853 :         stream->distance = 1;
                                714                 :                : 
                                715                 :                :     /*
                                716                 :                :      * Since we always access the same relation, we can initialize parts of
                                717                 :                :      * the ReadBuffersOperation objects and leave them that way, to avoid
                                718                 :                :      * wasting CPU cycles writing to them for each read.
                                719                 :                :      */
                                720         [ +  + ]:        7456316 :     for (int i = 0; i < max_ios; ++i)
                                721                 :                :     {
                                722                 :        7018969 :         stream->ios[i].op.rel = rel;
  413 noah@leadboat.com         723                 :        7018969 :         stream->ios[i].op.smgr = smgr;
                                724                 :        7018969 :         stream->ios[i].op.persistence = persistence;
  521 tmunro@postgresql.or      725                 :        7018969 :         stream->ios[i].op.forknum = forknum;
                                726                 :        7018969 :         stream->ios[i].op.strategy = strategy;
                                727                 :                :     }
                                728                 :                : 
                                729                 :         437347 :     return stream;
                                730                 :                : }
                                731                 :                : 
                                732                 :                : /*
                                733                 :                :  * Create a new read stream for reading a relation.
                                734                 :                :  * See read_stream_begin_impl() for the detailed explanation.
                                735                 :                :  */
                                736                 :                : ReadStream *
  413 noah@leadboat.com         737                 :         378525 : read_stream_begin_relation(int flags,
                                738                 :                :                            BufferAccessStrategy strategy,
                                739                 :                :                            Relation rel,
                                740                 :                :                            ForkNumber forknum,
                                741                 :                :                            ReadStreamBlockNumberCB callback,
                                742                 :                :                            void *callback_private_data,
                                743                 :                :                            size_t per_buffer_data_size)
                                744                 :                : {
                                745                 :         378525 :     return read_stream_begin_impl(flags,
                                746                 :                :                                   strategy,
                                747                 :                :                                   rel,
                                748                 :                :                                   RelationGetSmgr(rel),
                                749                 :         378525 :                                   rel->rd_rel->relpersistence,
                                750                 :                :                                   forknum,
                                751                 :                :                                   callback,
                                752                 :                :                                   callback_private_data,
                                753                 :                :                                   per_buffer_data_size);
                                754                 :                : }
                                755                 :                : 
                                756                 :                : /*
                                757                 :                :  * Create a new read stream for reading a SMgr relation.
                                758                 :                :  * See read_stream_begin_impl() for the detailed explanation.
                                759                 :                :  */
                                760                 :                : ReadStream *
                                761                 :          58822 : read_stream_begin_smgr_relation(int flags,
                                762                 :                :                                 BufferAccessStrategy strategy,
                                763                 :                :                                 SMgrRelation smgr,
                                764                 :                :                                 char smgr_persistence,
                                765                 :                :                                 ForkNumber forknum,
                                766                 :                :                                 ReadStreamBlockNumberCB callback,
                                767                 :                :                                 void *callback_private_data,
                                768                 :                :                                 size_t per_buffer_data_size)
                                769                 :                : {
                                770                 :          58822 :     return read_stream_begin_impl(flags,
                                771                 :                :                                   strategy,
                                772                 :                :                                   NULL,
                                773                 :                :                                   smgr,
                                774                 :                :                                   smgr_persistence,
                                775                 :                :                                   forknum,
                                776                 :                :                                   callback,
                                777                 :                :                                   callback_private_data,
                                778                 :                :                                   per_buffer_data_size);
                                779                 :                : }
                                780                 :                : 
                                781                 :                : /*
                                782                 :                :  * Pull one pinned buffer out of a stream.  Each call returns successive
                                783                 :                :  * blocks in the order specified by the callback.  If per_buffer_data_size was
                                784                 :                :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
                                785                 :                :  * per-buffer data that the callback had a chance to populate, which remains
                                786                 :                :  * valid until the next call to read_stream_next_buffer().  When the stream
                                787                 :                :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
                                788                 :                :  * the stream early at any time by calling read_stream_end().
                                789                 :                :  */
                                790                 :                : Buffer
  521 tmunro@postgresql.or      791                 :        5306077 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                                792                 :                : {
                                793                 :                :     Buffer      buffer;
                                794                 :                :     int16       oldest_buffer_index;
                                795                 :                : 
                                796                 :                : #ifndef READ_STREAM_DISABLE_FAST_PATH
                                797                 :                : 
                                798                 :                :     /*
                                799                 :                :      * A fast path for all-cached scans.  This is the same as the usual
                                800                 :                :      * algorithm, but it is specialized for no I/O and no per-buffer data, so
                                801                 :                :      * we can skip the queue management code, stay in the same buffer slot and
                                802                 :                :      * use singular StartReadBuffer().
                                803                 :                :      */
                                804         [ +  + ]:        5306077 :     if (likely(stream->fast_path))
                                805                 :                :     {
                                806                 :                :         BlockNumber next_blocknum;
                                807                 :                : 
                                808                 :                :         /* Fast path assumptions. */
                                809         [ -  + ]:        1845176 :         Assert(stream->ios_in_progress == 0);
  169                           810         [ -  + ]:        1845176 :         Assert(stream->forwarded_buffers == 0);
  521                           811         [ -  + ]:        1845176 :         Assert(stream->pinned_buffers == 1);
                                812         [ -  + ]:        1845176 :         Assert(stream->distance == 1);
  518                           813         [ -  + ]:        1845176 :         Assert(stream->pending_read_nblocks == 0);
  521                           814         [ -  + ]:        1845176 :         Assert(stream->per_buffer_data_size == 0);
  169                           815         [ -  + ]:        1845176 :         Assert(stream->initialized_buffers > stream->oldest_buffer_index);
                                816                 :                : 
                                817                 :                :         /* We're going to return the buffer we pinned last time. */
  521                           818                 :        1845176 :         oldest_buffer_index = stream->oldest_buffer_index;
                                819         [ -  + ]:        1845176 :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
                                820                 :                :                stream->next_buffer_index);
                                821                 :        1845176 :         buffer = stream->buffers[oldest_buffer_index];
                                822         [ -  + ]:        1845176 :         Assert(buffer != InvalidBuffer);
                                823                 :                : 
                                824                 :                :         /* Choose the next block to pin. */
  371                           825                 :        1845176 :         next_blocknum = read_stream_get_block(stream, NULL);
                                826                 :                : 
  518                           827         [ +  + ]:        1845176 :         if (likely(next_blocknum != InvalidBlockNumber))
                                828                 :                :         {
  160 andres@anarazel.de        829                 :        1764749 :             int         flags = stream->read_buffers_flags;
                                830                 :                : 
                                831         [ +  + ]:        1764749 :             if (stream->advice_enabled)
                                832                 :            554 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
                                833                 :                : 
                                834                 :                :             /*
                                835                 :                :              * Pin a buffer for the next call.  Same buffer entry, and
                                836                 :                :              * arbitrary I/O entry (they're all free).  We don't have to
                                837                 :                :              * adjust pinned_buffers because we're transferring one to caller
                                838                 :                :              * but pinning one more.
                                839                 :                :              *
                                840                 :                :              * In the fast path we don't need to check the pin limit.  We're
                                841                 :                :              * always allowed at least one pin so that progress can be made,
                                842                 :                :              * and that's all we need here.  Although two pins are momentarily
                                843                 :                :              * held at the same time, the model used here is that the stream
                                844                 :                :              * holds only one, and the other now belongs to the caller.
                                845                 :                :              */
  518 tmunro@postgresql.or      846         [ +  + ]:        1764749 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
                                847                 :                :                                         &stream->buffers[oldest_buffer_index],
                                848                 :                :                                         next_blocknum,
                                849                 :                :                                         flags)))
                                850                 :                :             {
                                851                 :                :                 /* Fast return. */
                                852                 :        1751047 :                 return buffer;
                                853                 :                :             }
                                854                 :                : 
                                855                 :                :             /* Next call must wait for I/O for the newly pinned buffer. */
  521                           856                 :          13702 :             stream->oldest_io_index = 0;
                                857                 :          13702 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
                                858                 :          13702 :             stream->ios_in_progress = 1;
                                859                 :          13702 :             stream->ios[0].buffer_index = oldest_buffer_index;
                                860                 :          13702 :             stream->seq_blocknum = next_blocknum + 1;
                                861                 :                :         }
                                862                 :                :         else
                                863                 :                :         {
                                864                 :                :             /* No more blocks, end of stream. */
  518                           865                 :          80427 :             stream->distance = 0;
                                866                 :          80427 :             stream->oldest_buffer_index = stream->next_buffer_index;
                                867                 :          80427 :             stream->pinned_buffers = 0;
  169                           868                 :          80427 :             stream->buffers[oldest_buffer_index] = InvalidBuffer;
                                869                 :                :         }
                                870                 :                : 
  518                           871                 :          94129 :         stream->fast_path = false;
  521                           872                 :          94129 :         return buffer;
                                873                 :                :     }
                                874                 :                : #endif
                                875                 :                : 
                                876         [ +  + ]:        3460901 :     if (unlikely(stream->pinned_buffers == 0))
                                877                 :                :     {
                                878         [ -  + ]:        2509805 :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
                                879                 :                : 
                                880                 :                :         /* End of stream reached?  */
                                881         [ +  + ]:        2509805 :         if (stream->distance == 0)
                                882                 :        1461752 :             return InvalidBuffer;
                                883                 :                : 
                                884                 :                :         /*
                                885                 :                :          * The usual order of operations is that we look ahead at the bottom
                                886                 :                :          * of this function after potentially finishing an I/O and making
                                887                 :                :          * space for more, but if we're just starting up we'll need to crank
                                888                 :                :          * the handle to get started.
                                889                 :                :          */
  175                           890                 :        1048053 :         read_stream_look_ahead(stream);
                                891                 :                : 
                                892                 :                :         /* End of stream reached? */
  521                           893         [ +  + ]:        1048053 :         if (stream->pinned_buffers == 0)
                                894                 :                :         {
                                895         [ -  + ]:         480432 :             Assert(stream->distance == 0);
                                896                 :         480432 :             return InvalidBuffer;
                                897                 :                :         }
                                898                 :                :     }
                                899                 :                : 
                                900                 :                :     /* Grab the oldest pinned buffer and associated per-buffer data. */
                                901         [ -  + ]:        1518717 :     Assert(stream->pinned_buffers > 0);
                                902                 :        1518717 :     oldest_buffer_index = stream->oldest_buffer_index;
                                903   [ +  -  -  + ]:        1518717 :     Assert(oldest_buffer_index >= 0 &&
                                904                 :                :            oldest_buffer_index < stream->queue_size);
                                905                 :        1518717 :     buffer = stream->buffers[oldest_buffer_index];
                                906         [ +  + ]:        1518717 :     if (per_buffer_data)
                                907                 :         283785 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
                                908                 :                : 
                                909         [ -  + ]:        1518717 :     Assert(BufferIsValid(buffer));
                                910                 :                : 
                                911                 :                :     /* Do we have to wait for an associated I/O first? */
                                912         [ +  + ]:        1518717 :     if (stream->ios_in_progress > 0 &&
                                913         [ +  + ]:         639156 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
                                914                 :                :     {
                                915                 :         556869 :         int16       io_index = stream->oldest_io_index;
                                916                 :                :         int32       distance;   /* wider temporary value, clamped below */
                                917                 :                : 
                                918                 :                :         /* Sanity check that we still agree on the buffers. */
                                919         [ -  + ]:         556869 :         Assert(stream->ios[io_index].op.buffers ==
                                920                 :                :                &stream->buffers[oldest_buffer_index]);
                                921                 :                : 
                                922                 :         556869 :         WaitReadBuffers(&stream->ios[io_index].op);
                                923                 :                : 
                                924         [ -  + ]:         556850 :         Assert(stream->ios_in_progress > 0);
                                925                 :         556850 :         stream->ios_in_progress--;
                                926         [ +  + ]:         556850 :         if (++stream->oldest_io_index == stream->max_ios)
                                927                 :          22308 :             stream->oldest_io_index = 0;
                                928                 :                : 
                                929                 :                :         /* Look-ahead distance ramps up rapidly after we do I/O. */
  174                           930                 :         556850 :         distance = stream->distance * 2;
                                931                 :         556850 :         distance = Min(distance, stream->max_pinned_buffers);
                                932                 :         556850 :         stream->distance = distance;
                                933                 :                : 
                                934                 :                :         /*
                                935                 :                :          * If we've reached the first block of a sequential region we're
                                936                 :                :          * issuing advice for, cancel that until the next jump.  The kernel
                                937                 :                :          * will see the sequential preadv() pattern starting here.
                                938                 :                :          */
  175                           939         [ +  + ]:         556850 :         if (stream->advice_enabled &&
                                940         [ +  + ]:            273 :             stream->ios[io_index].op.blocknum == stream->seq_until_processed)
                                941                 :            251 :             stream->seq_until_processed = InvalidBlockNumber;
                                942                 :                :     }
                                943                 :                : 
                                944                 :                :     /*
                                945                 :                :      * We must zap this queue entry, or else it would appear as a forwarded
                                946                 :                :      * buffer.  If it's potentially in the overflow zone (ie from a
                                947                 :                :      * multi-block I/O that wrapped around the queue), also zap the copy.
                                948                 :                :      */
  521                           949                 :        1518698 :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
  169                           950         [ +  + ]:        1518698 :     if (oldest_buffer_index < stream->io_combine_limit - 1)
                                951                 :        1174314 :         stream->buffers[stream->queue_size + oldest_buffer_index] =
                                952                 :                :             InvalidBuffer;
                                953                 :                : 
                                954                 :                : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
                                955                 :                : 
                                956                 :                :     /*
                                957                 :                :      * The caller will get access to the per-buffer data, until the next call.
                                958                 :                :      * We wipe the one before, which is never occupied because queue_size
                                959                 :                :      * allowed one extra element.  This will hopefully trip up client code
                                960                 :                :      * that is holding a dangling pointer to it.
                                961                 :                :      */
  521                           962         [ +  + ]:        1518698 :     if (stream->per_buffer_data)
                                963                 :                :     {
                                964                 :                :         void       *per_buffer_data;
                                965                 :                : 
  203                           966         [ +  + ]:         567598 :         per_buffer_data = get_per_buffer_data(stream,
                                967                 :                :                                               oldest_buffer_index == 0 ?
                                968                 :          71462 :                                               stream->queue_size - 1 :
                                969                 :         212337 :                                               oldest_buffer_index - 1);
                                970                 :                : 
                                971                 :                : #if defined(CLOBBER_FREED_MEMORY)
                                972                 :                :         /* This also tells Valgrind the memory is "noaccess". */
                                973                 :         283799 :         wipe_mem(per_buffer_data, stream->per_buffer_data_size);
                                974                 :                : #elif defined(USE_VALGRIND)
                                975                 :                :         /* Tell it ourselves. */
                                976                 :                :         VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
                                977                 :                :                                    stream->per_buffer_data_size);
                                978                 :                : #endif
                                979                 :                :     }
                                980                 :                : #endif
                                981                 :                : 
                                982                 :                :     /* Pin transferred to caller. */
  521                           983         [ -  + ]:        1518698 :     Assert(stream->pinned_buffers > 0);
                                984                 :        1518698 :     stream->pinned_buffers--;
                                985                 :                : 
                                986                 :                :     /* Advance oldest buffer, with wrap-around. */
                                987                 :        1518698 :     stream->oldest_buffer_index++;
                                988         [ +  + ]:        1518698 :     if (stream->oldest_buffer_index == stream->queue_size)
                                989                 :         271751 :         stream->oldest_buffer_index = 0;
                                990                 :                : 
                                991                 :                :     /* Prepare for the next call. */
  175                           992                 :        1518698 :     read_stream_look_ahead(stream);
                                993                 :                : 
                                994                 :                : #ifndef READ_STREAM_DISABLE_FAST_PATH
                                995                 :                :     /* See if we can take the fast path for all-cached scans next time. */
  521                           996         [ +  + ]:        1518692 :     if (stream->ios_in_progress == 0 &&
  169                           997         [ +  + ]:         962000 :         stream->forwarded_buffers == 0 &&
  521                           998         [ +  + ]:         961485 :         stream->pinned_buffers == 1 &&
                                999         [ +  + ]:         418462 :         stream->distance == 1 &&
  518                          1000         [ +  + ]:         350227 :         stream->pending_read_nblocks == 0 &&
  521                          1001         [ +  + ]:         348828 :         stream->per_buffer_data_size == 0)
                               1002                 :                :     {
                               1003                 :                :         /*
                               1004                 :                :          * The fast path spins on one buffer entry repeatedly instead of
                               1005                 :                :          * rotating through the whole queue and clearing the entries behind
                               1006                 :                :          * it.  If the buffer it starts with happened to be forwarded between
                               1007                 :                :          * StartReadBuffers() calls and also wrapped around the circular queue
                               1008                 :                :          * partway through, then a copy also exists in the overflow zone, and
                               1009                 :                :          * it won't clear it out as the regular path would.  Do that now, so
                               1010                 :                :          * it doesn't need code for that.
                               1011                 :                :          */
   28                          1012         [ +  + ]:         182315 :         if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
                               1013                 :         180908 :             stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
                               1014                 :                :                 InvalidBuffer;
                               1015                 :                : 
  521                          1016                 :         182315 :         stream->fast_path = true;
                               1017                 :                :     }
                               1018                 :                : #endif
                               1019                 :                : 
                               1020                 :        1518692 :     return buffer;
                               1021                 :                : }
                               1022                 :                : 
                               1023                 :                : /*
                               1024                 :                :  * Transitional support for code that would like to perform or skip reads
                               1025                 :                :  * itself, without using the stream.  Returns, and consumes, the next block
                               1026                 :                :  * number that would be read by the stream's look-ahead algorithm, or
                               1027                 :                :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
                               1028                 :                :  * strategy that would be used to read it.
                               1029                 :                :  */
                               1030                 :                : BlockNumber
  353 tmunro@postgresql.or     1031                 :UBC           0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
                               1032                 :                : {
                               1033                 :              0 :     *strategy = stream->ios[0].op.strategy;
                               1034                 :              0 :     return read_stream_get_block(stream, NULL);
                               1035                 :                : }
                               1036                 :                : 
                               1037                 :                : /*
                               1038                 :                :  * Reset a read stream by releasing any queued up buffers, allowing the stream
                               1039                 :                :  * to be used again for different blocks.  This can be used to clear an
                               1040                 :                :  * end-of-stream condition and start again, or to throw away blocks that were
                               1041                 :                :  * speculatively read and read some different blocks instead.
                               1042                 :                :  */
                               1043                 :                : void
  521 tmunro@postgresql.or     1044                 :CBC     1048012 : read_stream_reset(ReadStream *stream)
                               1045                 :                : {
                               1046                 :                :     int16       index;
                               1047                 :                :     Buffer      buffer;
                               1048                 :                : 
                               1049                 :                :     /* Stop looking ahead. */
                               1050                 :        1048012 :     stream->distance = 0;
                               1051                 :                : 
                               1052                 :                :     /* Forget buffered block number and fast path state. */
  371                          1053                 :        1048012 :     stream->buffered_blocknum = InvalidBlockNumber;
  518                          1054                 :        1048012 :     stream->fast_path = false;
                               1055                 :                : 
                               1056                 :                :     /* Unpin anything that wasn't consumed. */
  521                          1057         [ +  + ]:        1161859 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
                               1058                 :         113847 :         ReleaseBuffer(buffer);
                               1059                 :                : 
                               1060                 :                :     /* Unpin any unused forwarded buffers. */
  169                          1061                 :        1048012 :     index = stream->next_buffer_index;
                               1062         [ +  + ]:        1048012 :     while (index < stream->initialized_buffers &&
                               1063         [ -  + ]:         227709 :            (buffer = stream->buffers[index]) != InvalidBuffer)
                               1064                 :                :     {
  169 tmunro@postgresql.or     1065         [ #  # ]:UBC           0 :         Assert(stream->forwarded_buffers > 0);
                               1066                 :              0 :         stream->forwarded_buffers--;
                               1067                 :              0 :         ReleaseBuffer(buffer);
                               1068                 :                : 
                               1069                 :              0 :         stream->buffers[index] = InvalidBuffer;
                               1070         [ #  # ]:              0 :         if (index < stream->io_combine_limit - 1)
                               1071                 :              0 :             stream->buffers[stream->queue_size + index] = InvalidBuffer;
                               1072                 :                : 
                               1073         [ #  # ]:              0 :         if (++index == stream->queue_size)
                               1074                 :              0 :             index = 0;
                               1075                 :                :     }
                               1076                 :                : 
  169 tmunro@postgresql.or     1077         [ -  + ]:CBC     1048012 :     Assert(stream->forwarded_buffers == 0);
  521                          1078         [ -  + ]:        1048012 :     Assert(stream->pinned_buffers == 0);
                               1079         [ -  + ]:        1048012 :     Assert(stream->ios_in_progress == 0);
                               1080                 :                : 
                               1081                 :                :     /* Start off assuming data is cached. */
                               1082                 :        1048012 :     stream->distance = 1;
                               1083                 :        1048012 : }
                               1084                 :                : 
                               1085                 :                : /*
                               1086                 :                :  * Release and free a read stream.
                               1087                 :                :  */
                               1088                 :                : void
                               1089                 :         435069 : read_stream_end(ReadStream *stream)
                               1090                 :                : {
                               1091                 :         435069 :     read_stream_reset(stream);
                               1092                 :         435069 :     pfree(stream);
                               1093                 :         435069 : }
        

Generated by: LCOV version 2.4-beta