LCOV - differential code coverage report
Current view:  top level - src/backend/storage/aio - read_stream.c (source / functions)
Current:       0e5ff9b9b45a657aea12440478dc002e9b01f138 vs 0123ce131fca454009439dfa3b2266d1d40737d7
Current Date:  2026-03-14 14:10:32 -0400
Baseline:      lcov-20260315-024220-baseline
Baseline Date: 2026-03-14 15:27:56 +0100
Legend:        Lines:    hit, not hit
               Branches: + taken, - not taken, # not executed

                      Coverage  Total   Hit   UNC   UBC   GNC   CBC
Lines:                  91.3 %    321   293     7    21     2   291
Functions:              73.3 %     15    11     2     2     2     9
Branches:               77.3 %    264   204          60         204

Line coverage date bins:
  (7,30] days:          22.2 %      9     2     7           2
  (30,360] days:        82.5 %     57    47          10          47
  (360..) days:         95.7 %    255   244          11         244
Function coverage date bins:
  (7,30] days:           0.0 %      2     0     2
  (360..) days:         84.6 %     13    11           2     2     9
Branch coverage date bins:
  (30,360] days:        67.9 %     56    38          18          38
  (360..) days:         79.8 %    208   166          42         166

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * read_stream.c
                                  4                 :                :  *    Mechanism for accessing buffered relation data with look-ahead
                                  5                 :                :  *
                                  6                 :                :  * Code that needs to access relation data typically pins blocks one at a
                                  7                 :                :  * time, often in a predictable order that might be sequential or data-driven.
                                  8                 :                :  * Calling the simple ReadBuffer() function for each block is inefficient,
                                  9                 :                :  * because blocks that are not yet in the buffer pool require I/O operations
                                 10                 :                :  * that are small and might stall waiting for storage.  This mechanism looks
                                 11                 :                :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
                                 12                 :                :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
                                 13                 :                :  * distance.
                                 14                 :                :  *
                                 15                 :                :  * A user-provided callback generates a stream of block numbers that is used
                                 16                 :                :  * to form reads of up to io_combine_limit, by attempting to merge them with a
                                 17                 :                :  * pending read.  When that isn't possible, the existing pending read is sent
                                 18                 :                :  * to StartReadBuffers() so that a new one can begin to form.
                                 19                 :                :  *
                                 20                 :                :  * The algorithm for controlling the look-ahead distance is based on recent
                                 21                 :                :  * cache hit and miss history.  When no I/O is necessary, there is no benefit
                                 22                 :                :  * in looking ahead more than one block.  This is the default initial
                                 23                 :                :  * assumption, but when blocks needing I/O are streamed, the distance is
                                 24                 :                :  * increased rapidly to try to benefit from I/O combining and concurrency.  It
                                 25                 :                :  * is reduced gradually when cached blocks are streamed.
                                 26                 :                :  *
                                 27                 :                :  * The main data structure is a circular queue of buffers of size
                                 28                 :                :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
                                 29                 :                :  * returned by read_stream_next_buffer().  Each buffer also has an optional
                                 30                 :                :  * variable sized object that is passed from the callback to the consumer of
                                 31                 :                :  * buffers.
                                 32                 :                :  *
                                 33                 :                :  * Parallel to the queue of buffers, there is a circular queue of in-progress
                                 34                 :                :  * I/Os that have been started with StartReadBuffers(), and for which
                                 35                 :                :  * WaitReadBuffers() must be called before returning the buffer.
                                 36                 :                :  *
                                 37                 :                :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
                                 38                 :                :  * successive calls, then these data structures might appear as follows:
                                 39                 :                :  *
                                 40                 :                :  *                          buffers buf/data       ios
                                 41                 :                :  *
                                 42                 :                :  *                          +----+  +-----+       +--------+
                                 43                 :                :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
                                 44                 :                :  *                          +----+  +-----+  |    +--------+
                                 45                 :                :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
                                 46                 :                :  *                          +----+  +-----+  | |  +--------+
                                 47                 :                :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
                                 48                 :                :  *                          +----+  +-----+    |  +--------+
                                 49                 :                :  *                          | 43 |  |  ?  |    |  |        |
                                 50                 :                :  *                          +----+  +-----+    |  +--------+
                                 51                 :                :  *                          | 44 |  |  ?  |    |  |        |
                                 52                 :                :  *                          +----+  +-----+    |  +--------+
                                 53                 :                :  *                          | 60 |  |  ?  |<---+
                                 54                 :                :  *                          +----+  +-----+
                                 55                 :                :  *     next_buffer_index -> |    |  |     |
                                 56                 :                :  *                          +----+  +-----+
                                 57                 :                :  *
                                 58                 :                :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
                                 59                 :                :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
                                 60                 :                :  * the range 42..44 requires an I/O wait before its buffers are returned, as
                                 61                 :                :  * does block 60.
                                 62                 :                :  *
                                 63                 :                :  *
                                 64                 :                :  * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
                                 65                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                 66                 :                :  *
                                 67                 :                :  * IDENTIFICATION
                                 68                 :                :  *    src/backend/storage/aio/read_stream.c
                                 69                 :                :  *
                                 70                 :                :  *-------------------------------------------------------------------------
                                 71                 :                :  */
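The header comment above says the look-ahead distance is increased rapidly when streamed blocks need I/O and reduced gradually when they are already cached. A minimal sketch of one such policy follows. The function name, the doubling step, and the decrement-by-one step are assumptions chosen for illustration; the part of the listing shown here does not include the actual adjustment code.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical helper, not part of read_stream.c.
 *
 * Grow the look-ahead distance quickly (doubling, capped at max_distance)
 * when the last block needed I/O, and shrink it slowly (by one, never
 * below 1) when the last block was already cached.
 */
static int
sketch_adjust_distance(int distance, bool needed_io, int max_distance)
{
    if (needed_io)
    {
        distance *= 2;
        if (distance > max_distance)
            distance = max_distance;
    }
    else if (distance > 1)
        distance--;

    return distance;
}
```

Under these assumed rules a run of misses reaches the cap in a logarithmic number of steps, while a run of hits walks the distance back toward 1 one block at a time.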
                                 72                 :                : #include "postgres.h"
                                 73                 :                : 
                                 74                 :                : #include "miscadmin.h"
                                 75                 :                : #include "storage/aio.h"
                                 76                 :                : #include "storage/fd.h"
                                 77                 :                : #include "storage/smgr.h"
                                 78                 :                : #include "storage/read_stream.h"
                                 79                 :                : #include "utils/memdebug.h"
                                 80                 :                : #include "utils/rel.h"
                                 81                 :                : #include "utils/spccache.h"
                                 82                 :                : 
                                 83                 :                : typedef struct InProgressIO
                                 84                 :                : {
                                 85                 :                :     int16       buffer_index;
                                 86                 :                :     ReadBuffersOperation op;
                                 87                 :                : } InProgressIO;
                                 88                 :                : 
                                 89                 :                : /*
                                 90                 :                :  * State for managing a stream of reads.
                                 91                 :                :  */
                                 92                 :                : struct ReadStream
                                 93                 :                : {
                                 94                 :                :     int16       max_ios;
                                 95                 :                :     int16       io_combine_limit;
                                 96                 :                :     int16       ios_in_progress;
                                 97                 :                :     int16       queue_size;
                                 98                 :                :     int16       max_pinned_buffers;
                                 99                 :                :     int16       forwarded_buffers;
                                100                 :                :     int16       pinned_buffers;
                                101                 :                :     int16       distance;
                                102                 :                :     int16       initialized_buffers;
                                103                 :                :     int16       resume_distance;
                                104                 :                :     int         read_buffers_flags;
                                105                 :                :     bool        sync_mode;      /* using io_method=sync */
                                106                 :                :     bool        batch_mode;     /* READ_STREAM_USE_BATCHING */
                                107                 :                :     bool        advice_enabled;
                                108                 :                :     bool        temporary;
                                109                 :                : 
                                110                 :                :     /*
                                111                 :                :      * One-block buffer to support 'ungetting' a block number, to resolve flow
                                112                 :                :      * control problems when I/Os are split.
                                113                 :                :      */
                                114                 :                :     BlockNumber buffered_blocknum;
                                115                 :                : 
                                116                 :                :     /*
                                117                 :                :      * The callback that will tell us which block numbers to read, and an
                                118                 :                :      * opaque pointer that will be passed to it for its own purposes.
                                119                 :                :      */
                                120                 :                :     ReadStreamBlockNumberCB callback;
                                121                 :                :     void       *callback_private_data;
                                122                 :                : 
                                123                 :                :     /* Next expected block, for detecting sequential access. */
                                124                 :                :     BlockNumber seq_blocknum;
                                125                 :                :     BlockNumber seq_until_processed;
                                126                 :                : 
                                127                 :                :     /* The read operation we are currently preparing. */
                                128                 :                :     BlockNumber pending_read_blocknum;
                                129                 :                :     int16       pending_read_nblocks;
                                130                 :                : 
                                131                 :                :     /* Space for buffers and optional per-buffer private data. */
                                132                 :                :     size_t      per_buffer_data_size;
                                133                 :                :     void       *per_buffer_data;
                                134                 :                : 
                                135                 :                :     /* Read operations that have been started but not waited for yet. */
                                136                 :                :     InProgressIO *ios;
                                137                 :                :     int16       oldest_io_index;
                                138                 :                :     int16       next_io_index;
                                139                 :                : 
                                140                 :                :     bool        fast_path;
                                141                 :                : 
                                142                 :                :     /* Circular queue of buffers. */
                                143                 :                :     int16       oldest_buffer_index;    /* Next pinned buffer to return */
                                144                 :                :     int16       next_buffer_index;  /* Index of next buffer to pin */
                                145                 :                :     Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
                                146                 :                : };
                                147                 :                : 
                                148                 :                : /*
                                149                 :                :  * Return a pointer to the per-buffer data by index.
                                150                 :                :  */
                                151                 :                : static inline void *
  711 tmunro@postgresql.or      152                 :CBC     4060425 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
                                153                 :                : {
                                154                 :        8120850 :     return (char *) stream->per_buffer_data +
                                155                 :        4060425 :         stream->per_buffer_data_size * buffer_index;
                                156                 :                : }
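get_per_buffer_data() treats per_buffer_data as a flat array of fixed-size slots and finds a slot by byte offset. The same pointer arithmetic can be sketched on its own as follows; SketchSlot and sketch_slot_at are hypothetical names used only for this illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical per-buffer payload; real callers choose their own layout. */
typedef struct SketchSlot
{
    int         tag;
    double      weight;
} SketchSlot;

/*
 * Return the address of slot 'index' in a flat array whose slots are
 * 'slot_size' bytes each.  The cast to char * makes the offset count bytes.
 */
static void *
sketch_slot_at(void *base, size_t slot_size, int index)
{
    return (char *) base + slot_size * (size_t) index;
}
```

When slot_size equals sizeof(SketchSlot), the result for index i is the same address as &slots[i] for an ordinary array.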
                                157                 :                : 
                                158                 :                : /*
                                159                 :                :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
                                160                 :                :  * blocks [current_blocknum, last_exclusive).
                                161                 :                :  */
                                162                 :                : BlockNumber
  558 noah@leadboat.com         163                 :         394733 : block_range_read_stream_cb(ReadStream *stream,
                                164                 :                :                            void *callback_private_data,
                                165                 :                :                            void *per_buffer_data)
                                166                 :                : {
                                167                 :         394733 :     BlockRangeReadStreamPrivate *p = callback_private_data;
                                168                 :                : 
                                169         [ +  + ]:         394733 :     if (p->current_blocknum < p->last_exclusive)
                                170                 :         324854 :         return p->current_blocknum++;
                                171                 :                : 
                                172                 :          69879 :     return InvalidBlockNumber;
                                173                 :                : }
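The callback above hands out each block number in [current_blocknum, last_exclusive) once and then signals the end of the stream. The following standalone sketch shows that contract from the consumer's side. All Sketch* names are stand-ins invented for this example, and the callback signature is simplified to a single private-data argument.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-ins for BlockNumber and InvalidBlockNumber, for illustration only. */
typedef uint32_t SketchBlockNumber;
#define SKETCH_INVALID_BLOCK ((SketchBlockNumber) 0xFFFFFFFF)

/* Stand-in for the callback's private state. */
typedef struct SketchBlockRange
{
    SketchBlockNumber current_blocknum;
    SketchBlockNumber last_exclusive;
} SketchBlockRange;

/* Yield the next block in the half-open range, or the end-of-stream marker. */
static SketchBlockNumber
sketch_block_range_cb(void *callback_private_data)
{
    SketchBlockRange *p = callback_private_data;

    if (p->current_blocknum < p->last_exclusive)
        return p->current_blocknum++;

    return SKETCH_INVALID_BLOCK;
}

/* Call the callback until it reports end-of-stream; return the block count. */
static int
sketch_drain_range(SketchBlockRange *range)
{
    int         nblocks = 0;

    while (sketch_block_range_cb(range) != SKETCH_INVALID_BLOCK)
        nblocks++;

    return nblocks;
}
```

Because the upper bound is exclusive, a range with current_blocknum equal to last_exclusive yields no blocks at all.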
                                174                 :                : 
                                175                 :                : /*
                                176                 :                :  * Ask the callback which block it would like us to read next, with a one block
                                177                 :                :  * buffer in front to allow read_stream_unget_block() to work.
                                178                 :                :  */
                                179                 :                : static inline BlockNumber
  711 tmunro@postgresql.or      180                 :        5497737 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
                                181                 :                : {
                                182                 :                :     BlockNumber blocknum;
                                183                 :                : 
  561                           184                 :        5497737 :     blocknum = stream->buffered_blocknum;
                                185         [ -  + ]:        5497737 :     if (blocknum != InvalidBlockNumber)
  561 tmunro@postgresql.or      186                 :UBC           0 :         stream->buffered_blocknum = InvalidBlockNumber;
                                187                 :                :     else
                                188                 :                :     {
                                189                 :                :         /*
                                190                 :                :          * Tell Valgrind that the per-buffer data is undefined.  That replaces
                                191                 :                :          * the "noaccess" state that was set when the consumer moved past this
                                192                 :                :          * entry last time around the queue, and should also catch callbacks
                                193                 :                :          * that fail to initialize data that the buffer consumer later
                                194                 :                :          * accesses.  On the first go around, it is undefined already.
                                195                 :                :          */
                                196                 :                :         VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
                                197                 :                :                                     stream->per_buffer_data_size);
  561 tmunro@postgresql.or      198                 :CBC     5497737 :         blocknum = stream->callback(stream,
                                199                 :                :                                     stream->callback_private_data,
                                200                 :                :                                     per_buffer_data);
                                201                 :                :     }
                                202                 :                : 
                                203                 :        5497737 :     return blocknum;
                                204                 :                : }
                                205                 :                : 
                                206                 :                : /*
                                207                 :                :  * In order to deal with buffer shortages and I/O limits after short reads, we
                                208                 :                :  * sometimes need to defer handling of a block we've already consumed from the
                                209                 :                :  * registered callback until later.
                                210                 :                :  */
                                211                 :                : static inline void
  711 tmunro@postgresql.or      212                 :UBC           0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
                                213                 :                : {
                                214                 :                :     /* We shouldn't ever unget more than one block. */
  561                           215         [ #  # ]:              0 :     Assert(stream->buffered_blocknum == InvalidBlockNumber);
                                216         [ #  # ]:              0 :     Assert(blocknum != InvalidBlockNumber);
                                217                 :              0 :     stream->buffered_blocknum = blocknum;
  711                           218                 :              0 : }
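read_stream_get_block() and read_stream_unget_block() together form a one-slot pushback buffer in front of the callback. The unget path shows zero hits in this coverage run, so a small standalone sketch of the pairing may help when reading it. The names below are hypothetical, and the stand-in "callback" is just a counter.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in for InvalidBlockNumber; marks the pushback slot as empty. */
#define SKETCH_INVALID UINT32_MAX

typedef struct SketchSource
{
    uint32_t    buffered;   /* one-slot pushback, SKETCH_INVALID if empty */
    uint32_t    next;       /* next block the stand-in callback produces */
} SketchSource;

/* Prefer the pushed-back block; otherwise ask the stand-in callback. */
static uint32_t
sketch_get(SketchSource *src)
{
    uint32_t    blocknum = src->buffered;

    if (blocknum != SKETCH_INVALID)
        src->buffered = SKETCH_INVALID;
    else
        blocknum = src->next++;

    return blocknum;
}

/* Push one block back; the slot must be empty, as in the asserts above. */
static void
sketch_unget(SketchSource *src, uint32_t blocknum)
{
    assert(src->buffered == SKETCH_INVALID);
    assert(blocknum != SKETCH_INVALID);
    src->buffered = blocknum;
}
```

A block that was pushed back is returned by the next get without consulting the stand-in callback, so the overall sequence the consumer sees is unchanged.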
                                219                 :                : 
                                220                 :                : /*
                                221                 :                :  * Start as much of the current pending read as we can.  If we have to split it
                                222                 :                :  * because of the per-backend buffer limit, or the buffer manager decides to
                                223                 :                :  * split it, then the pending read is adjusted to hold the remaining portion.
                                224                 :                :  *
                                225                 :                :  * We can always start a read of at least size one if we have no progress yet.
                                226                 :                :  * Otherwise it's possible that we can't start a read at all because of a lack
                                227                 :                :  * of buffers, and then false is returned.  Buffer shortages also reduce the
                                228                 :                :  * distance to a level that prevents look-ahead until buffers are released.
                                229                 :                :  */
                                230                 :                : static bool
  365 tmunro@postgresql.or      231                 :CBC     1842097 : read_stream_start_pending_read(ReadStream *stream)
                                232                 :                : {
                                233                 :                :     bool        need_wait;
                                234                 :                :     int         requested_nblocks;
                                235                 :                :     int         nblocks;
                                236                 :                :     int         flags;
                                237                 :                :     int         forwarded;
                                238                 :                :     int16       io_index;
                                239                 :                :     int16       overflow;
                                240                 :                :     int16       buffer_index;
                                241                 :                :     int         buffer_limit;
                                242                 :                : 
                                243                 :                :     /* This should only be called with a pending read. */
  711                           244         [ -  + ]:        1842097 :     Assert(stream->pending_read_nblocks > 0);
  367                           245         [ -  + ]:        1842097 :     Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
                                246                 :                : 
                                247                 :                :     /* We had better not exceed the per-stream buffer limit with this read. */
  711                           248         [ -  + ]:        1842097 :     Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
                                249                 :                :            stream->max_pinned_buffers);
                                250                 :                : 
                                251                 :                : #ifdef USE_ASSERT_CHECKING
                                252                 :                :     /* We had better not be overwriting an existing pinned buffer. */
                                253         [ +  + ]:        1842097 :     if (stream->pinned_buffers > 0)
                                254         [ -  + ]:          13927 :         Assert(stream->next_buffer_index != stream->oldest_buffer_index);
                                255                 :                :     else
                                256         [ -  + ]:        1828170 :         Assert(stream->next_buffer_index == stream->oldest_buffer_index);
                                257                 :                : 
                                258                 :                :     /*
                                259                 :                :      * Pinned buffers forwarded by a preceding StartReadBuffers() call that
                                260                 :                :      * had to split the operation should match the leading blocks of this
                                261                 :                :      * following StartReadBuffers() call.
                                262                 :                :      */
  218                           263         [ -  + ]:        1842097 :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
                                264         [ +  + ]:        1843768 :     for (int i = 0; i < stream->forwarded_buffers; ++i)
                                265         [ -  + ]:           1671 :         Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
                                266                 :                :                stream->pending_read_blocknum + i);
                                267                 :                : 
                                268                 :                :     /*
                                269                 :                :      * Check that we've cleared the queue/overflow entries corresponding to
                                270                 :                :      * the rest of the blocks covered by this read, unless it's the first go
                                271                 :                :      * around and we haven't even initialized them yet.
                                272                 :                :      */
                                273         [ +  + ]:        4188669 :     for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
                                274   [ +  +  -  + ]:        2346572 :         Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
                                275                 :                :                stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
                                276                 :                : #endif
                                277                 :                : 
                                278                 :                :     /* Do we need to issue read-ahead advice? */
  350 andres@anarazel.de        279                 :        1842097 :     flags = stream->read_buffers_flags;
  365 tmunro@postgresql.or      280         [ +  + ]:        1842097 :     if (stream->advice_enabled)
                                281                 :                :     {
                                282         [ +  + ]:           1640 :         if (stream->pending_read_blocknum == stream->seq_blocknum)
                                283                 :                :         {
                                284                 :                :             /*
                                285                 :                :              * Sequential:  Issue advice until the preadv() calls have caught
                                286                 :                :              * up with the first advice issued for this sequential region, and
                                287                 :                :              * then stay out of the way of the kernel's own read-ahead.
                                288                 :                :              */
                                289         [ +  + ]:             23 :             if (stream->seq_until_processed != InvalidBlockNumber)
  350 andres@anarazel.de        290                 :              1 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
                                291                 :                :         }
                                292                 :                :         else
                                293                 :                :         {
                                294                 :                :             /*
                                295                 :                :              * Random jump:  Note the starting location of a new potential
                                296                 :                :              * sequential region and start issuing advice.  Skip it this time
                                297                 :                :              * if the preadv() follows immediately, eg first block in stream.
                                298                 :                :              */
  365 tmunro@postgresql.or      299                 :           1617 :             stream->seq_until_processed = stream->pending_read_blocknum;
                                300         [ +  + ]:           1617 :             if (stream->pinned_buffers > 0)
  350 andres@anarazel.de        301                 :             25 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
                                302                 :                :         }
                                303                 :                :     }
                                304                 :                : 
                                305                 :                :     /*
                                306                 :                :      * How many more buffers is this backend allowed?
                                307                 :                :      *
                                308                 :                :      * Forwarded buffers are already pinned and map to the leading blocks of
                                309                 :                :      * the pending read (the remaining portion of an earlier short read that
                                310                 :                :      * we're about to continue).  They are not counted in pinned_buffers, but
                                311                 :                :      * they are counted as pins already held by this backend according to the
                                312                 :                :      * buffer manager, so they must be added to the limit it grants us.
                                313                 :                :      */
  366 tmunro@postgresql.or      314         [ +  + ]:        1842097 :     if (stream->temporary)
                                315         [ +  - ]:          12096 :         buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
                                316                 :                :     else
                                317         [ +  - ]:        1830001 :         buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
  359                           318         [ -  + ]:        1842097 :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
                                319                 :                : 
                                320                 :        1842097 :     buffer_limit += stream->forwarded_buffers;
  342 andres@anarazel.de        321                 :        1842097 :     buffer_limit = Min(buffer_limit, PG_INT16_MAX);
                                322                 :                : 
  366 tmunro@postgresql.or      323   [ +  +  +  - ]:        1842097 :     if (buffer_limit == 0 && stream->pinned_buffers == 0)
                                324                 :         671676 :         buffer_limit = 1;       /* guarantee progress */
                                325                 :                : 
                                326                 :                :     /* Does the per-backend limit affect this read? */
                                327                 :        1842097 :     nblocks = stream->pending_read_nblocks;
                                328         [ +  + ]:        1842097 :     if (buffer_limit < nblocks)
                                329                 :                :     {
                                330                 :                :         int16       new_distance;
                                331                 :                : 
                                332                 :                :         /* Shrink distance: no more look-ahead until buffers are released. */
                                333                 :           2533 :         new_distance = stream->pinned_buffers + buffer_limit;
                                334         [ +  + ]:           2533 :         if (stream->distance > new_distance)
                                335                 :           1743 :             stream->distance = new_distance;
                                336                 :                : 
                                337                 :                :         /* Unless we have nothing to give the consumer, stop here. */
                                338         [ +  + ]:           2533 :         if (stream->pinned_buffers > 0)
                                339                 :           1255 :             return false;
                                340                 :                : 
                                341                 :                :         /* A short read is required to make progress. */
                                342                 :           1278 :         nblocks = buffer_limit;
                                343                 :                :     }
                                344                 :                : 
                                345                 :                :     /*
                                346                 :                :      * We say how many blocks we want to read, but it may be smaller on return
                                347                 :                :      * if the buffer manager decides to shorten the read.  Initialize buffers
                                348                 :                :      * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
                                349                 :                :      * and keep the original nblocks number so we can check for forwarded
                                350                 :                :      * buffers as output, below.
                                351                 :                :      */
  711                           352                 :        1840842 :     buffer_index = stream->next_buffer_index;
                                353                 :        1840842 :     io_index = stream->next_io_index;
  359                           354         [ +  + ]:        3108527 :     while (stream->initialized_buffers < buffer_index + nblocks)
                                355                 :        1267685 :         stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
                                356                 :        1840842 :     requested_nblocks = nblocks;
  711                           357                 :        1840842 :     need_wait = StartReadBuffers(&stream->ios[io_index].op,
                                358                 :        1840842 :                                  &stream->buffers[buffer_index],
                                359                 :                :                                  stream->pending_read_blocknum,
                                360                 :                :                                  &nblocks,
                                361                 :                :                                  flags);
                                362                 :        1840836 :     stream->pinned_buffers += nblocks;
                                363                 :                : 
                                364                 :                :     /* Remember whether we need to wait before returning this buffer. */
                                365         [ +  + ]:        1840836 :     if (!need_wait)
                                366                 :                :     {
                                367                 :                :         /* Look-ahead distance decays, no I/O necessary. */
                                368         [ +  + ]:        1228268 :         if (stream->distance > 1)
                                369                 :          18653 :             stream->distance--;
                                370                 :                :     }
                                371                 :                :     else
                                372                 :                :     {
                                373                 :                :         /*
                                374                 :                :          * Remember to call WaitReadBuffers() before returning head buffer.
                                375                 :                :          * Look-ahead distance will be adjusted after waiting.
                                376                 :                :          */
                                377                 :         612568 :         stream->ios[io_index].buffer_index = buffer_index;
                                378         [ +  + ]:         612568 :         if (++stream->next_io_index == stream->max_ios)
                                379                 :          25309 :             stream->next_io_index = 0;
                                380         [ -  + ]:         612568 :         Assert(stream->ios_in_progress < stream->max_ios);
                                381                 :         612568 :         stream->ios_in_progress++;
                                382                 :         612568 :         stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
                                383                 :                :     }
                                384                 :                : 
                                385                 :                :     /*
                                386                 :                :      * How many pins were acquired but forwarded to the next call?  These need
                                387                 :                :      * to be passed to the next StartReadBuffers() call by leaving them
                                388                 :                :      * exactly where they are in the queue, or released if the stream ends
                                389                 :                :      * early.  We need the number for accounting purposes, since they are not
                                390                 :                :      * counted in stream->pinned_buffers but we already hold them.
                                391                 :                :      */
  359                           392                 :        1840836 :     forwarded = 0;
                                393         [ +  + ]:        1842508 :     while (nblocks + forwarded < requested_nblocks &&
                                394         [ +  + ]:          56464 :            stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
                                395                 :           1672 :         forwarded++;
                                396                 :        1840836 :     stream->forwarded_buffers = forwarded;
                                397                 :                : 
                                398                 :                :     /*
                                399                 :                :      * We gave a contiguous range of buffer space to StartReadBuffers(), but
                                400                 :                :      * we want it to wrap around at queue_size.  Copy overflowing buffers to
                                401                 :                :      * the front of the array where they'll be consumed, but also leave a copy
                                402                 :                :      * in the overflow zone which the I/O operation has a pointer to (it needs
                                403                 :                :      * a contiguous array).  Both copies will be cleared when the buffers are
                                404                 :                :      * handed to the consumer.
                                405                 :                :      */
                                406                 :        1840836 :     overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
  711                           407         [ +  + ]:        1840836 :     if (overflow > 0)
                                408                 :                :     {
  359                           409         [ -  + ]:            288 :         Assert(overflow < stream->queue_size);    /* can't overlap */
                                410                 :            288 :         memcpy(&stream->buffers[0],
                                411                 :            288 :                &stream->buffers[stream->queue_size],
                                412                 :                :                sizeof(stream->buffers[0]) * overflow);
                                413                 :                :     }
                                414                 :                : 
                                415                 :                :     /* Compute location of start of next read, without using % operator. */
  711                           416                 :        1840836 :     buffer_index += nblocks;
                                417         [ +  + ]:        1840836 :     if (buffer_index >= stream->queue_size)
                                418                 :         312629 :         buffer_index -= stream->queue_size;
                                419   [ +  -  -  + ]:        1840836 :     Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
                                420                 :        1840836 :     stream->next_buffer_index = buffer_index;
                                421                 :                : 
                                422                 :                :     /* Adjust the pending read to cover the remaining portion, if any. */
                                423                 :        1840836 :     stream->pending_read_blocknum += nblocks;
                                424                 :        1840836 :     stream->pending_read_nblocks -= nblocks;
                                425                 :                : 
  366                           426                 :        1840836 :     return true;
                                427                 :                : }
                                428                 :                : 
                                429                 :                : static void
  365                           430                 :        3568949 : read_stream_look_ahead(ReadStream *stream)
                                431                 :                : {
                                432                 :                :     /*
                                433                 :                :      * Allow amortizing the cost of submitting IO over multiple IOs. This
                                434                 :                :      * requires that we don't do any operations that could lead to a deadlock
                                435                 :                :      * with staged-but-unsubmitted IO. The callback needs to opt-in to being
                                436                 :                :      * careful.
                                437                 :                :      */
  350 andres@anarazel.de        438         [ +  + ]:        3568949 :     if (stream->batch_mode)
                                439                 :        3477951 :         pgaio_enter_batchmode();
                                440                 :                : 
  711 tmunro@postgresql.or      441         [ +  - ]:        5588170 :     while (stream->ios_in_progress < stream->max_ios &&
                                442         [ +  + ]:        5588170 :            stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
                                443                 :                :     {
                                444                 :                :         BlockNumber blocknum;
                                445                 :                :         int16       buffer_index;
                                446                 :                :         void       *per_buffer_data;
                                447                 :                : 
  367                           448         [ +  + ]:        3473974 :         if (stream->pending_read_nblocks == stream->io_combine_limit)
                                449                 :                :         {
  365                           450                 :           4285 :             read_stream_start_pending_read(stream);
  711                           451                 :           4285 :             continue;
                                452                 :                :         }
                                453                 :                : 
                                454                 :                :         /*
                                455                 :                :          * See which block the callback wants next in the stream.  We need to
                                456                 :                :          * compute the index of the Nth block of the pending read including
                                457                 :                :          * wrap-around, but we don't want to use the expensive % operator.
                                458                 :                :          */
                                459                 :        3469689 :         buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
                                460         [ +  + ]:        3469689 :         if (buffer_index >= stream->queue_size)
                                461                 :           1698 :             buffer_index -= stream->queue_size;
                                462   [ +  -  -  + ]:        3469689 :         Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
                                463                 :        3469689 :         per_buffer_data = get_per_buffer_data(stream, buffer_index);
                                464                 :        3469689 :         blocknum = read_stream_get_block(stream, per_buffer_data);
                                465         [ +  + ]:        3469689 :         if (blocknum == InvalidBlockNumber)
                                466                 :                :         {
                                467                 :                :             /* End of stream. */
                                468                 :        1454753 :             stream->distance = 0;
                                469                 :        1454753 :             break;
                                470                 :                :         }
                                471                 :                : 
                                472                 :                :         /* Can we merge it with the pending read? */
                                473         [ +  + ]:        2014936 :         if (stream->pending_read_nblocks > 0 &&
                                474         [ +  + ]:         230577 :             stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
                                475                 :                :         {
                                476                 :         230542 :             stream->pending_read_nblocks++;
                                477                 :         230542 :             continue;
                                478                 :                :         }
                                479                 :                : 
                                480                 :                :         /* We have to start the pending read before we can build another. */
  707                           481         [ +  + ]:        1784432 :         while (stream->pending_read_nblocks > 0)
                                482                 :                :         {
  365                           483         [ +  - ]:             38 :             if (!read_stream_start_pending_read(stream) ||
  366                           484         [ -  + ]:             38 :                 stream->ios_in_progress == stream->max_ios)
                                485                 :                :             {
                                486                 :                :                 /* We've hit the buffer or I/O limit.  Rewind and stop here. */
  711 tmunro@postgresql.or      487                 :UBC           0 :                 read_stream_unget_block(stream, blocknum);
  350 andres@anarazel.de        488         [ #  # ]:              0 :                 if (stream->batch_mode)
                                489                 :              0 :                     pgaio_exit_batchmode();
  711 tmunro@postgresql.or      490                 :              0 :                 return;
                                491                 :                :             }
                                492                 :                :         }
                                493                 :                : 
                                494                 :                :         /* This is the start of a new pending read. */
  711 tmunro@postgresql.or      495                 :CBC     1784394 :         stream->pending_read_blocknum = blocknum;
                                496                 :        1784394 :         stream->pending_read_nblocks = 1;
                                497                 :                :     }
                                498                 :                : 
                                499                 :                :     /*
                                500                 :                :      * We don't start the pending read just because we've hit the distance
                                501                 :                :      * limit, preferring to give it another chance to grow to full
                                502                 :                :      * io_combine_limit size once more buffers have been consumed.  However,
                                503                 :                :      * if we've already reached io_combine_limit, or we've reached the
                                504                 :                :      * distance limit and there isn't anything pinned yet, or the callback has
                                505                 :                :      * signaled end-of-stream, we start the read immediately.  Note that the
                                506                 :                :      * pending read can exceed the distance goal, if the latter was reduced
                                507                 :                :      * after hitting the per-backend buffer limit.
                                508                 :                :      */
                                509         [ +  + ]:        3568949 :     if (stream->pending_read_nblocks > 0 &&
  367                           510         [ +  + ]:        1879328 :         (stream->pending_read_nblocks == stream->io_combine_limit ||
  366                           511         [ +  + ]:        1870796 :          (stream->pending_read_nblocks >= stream->distance &&
  711                           512         [ +  + ]:        1829242 :           stream->pinned_buffers == 0) ||
                                513         [ +  + ]:          47791 :          stream->distance == 0) &&
                                514         [ +  - ]:        1837774 :         stream->ios_in_progress < stream->max_ios)
  365                           515                 :        1837774 :         read_stream_start_pending_read(stream);
                                516                 :                : 
                                517                 :                :     /*
                                518                 :                :      * There should always be something pinned when we leave this function,
                                519                 :                :      * whether started by this call or not, unless we've hit the end of the
                                520                 :                :      * stream.  In the worst case we can always make progress one buffer at a
                                521                 :                :      * time.
                                522                 :                :      */
  366                           523   [ +  +  -  + ]:        3568943 :     Assert(stream->pinned_buffers > 0 || stream->distance == 0);
                                524                 :                : 
  350 andres@anarazel.de        525         [ +  + ]:        3568943 :     if (stream->batch_mode)
                                526                 :        3477945 :         pgaio_exit_batchmode();
                                527                 :                : }
                                528                 :                : 
                                529                 :                : /*
                                530                 :                :  * Create a new read stream object that can be used to perform the equivalent
                                531                 :                :  * of a series of ReadBuffer() calls for one fork of one relation.
                                532                 :                :  * Internally, it generates larger vectored reads where possible by looking
                                533                 :                :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
                                534                 :                :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
                                535                 :                :  * write extra data for each block into the space provided to it.  It will
                                536                 :                :  * also receive callback_private_data for its own purposes.
                                537                 :                :  */
                                538                 :                : static ReadStream *
  603 noah@leadboat.com         539                 :         779068 : read_stream_begin_impl(int flags,
                                540                 :                :                        BufferAccessStrategy strategy,
                                541                 :                :                        Relation rel,
                                542                 :                :                        SMgrRelation smgr,
                                543                 :                :                        char persistence,
                                544                 :                :                        ForkNumber forknum,
                                545                 :                :                        ReadStreamBlockNumberCB callback,
                                546                 :                :                        void *callback_private_data,
                                547                 :                :                        size_t per_buffer_data_size)
                                548                 :                : {
                                549                 :                :     ReadStream *stream;
                                550                 :                :     size_t      size;
                                551                 :                :     int16       queue_size;
                                552                 :                :     int16       queue_overflow;
                                553                 :                :     int         max_ios;
                                554                 :                :     int         strategy_pin_limit;
                                555                 :                :     uint32      max_pinned_buffers;
                                556                 :                :     uint32      max_possible_buffer_limit;
                                557                 :                :     Oid         tablespace_id;
                                558                 :                : 
                                559                 :                :     /*
                                560                 :                :      * Decide how many I/Os we will allow to run at the same time.  That
                                561                 :                :      * currently means advice to the kernel to tell it that we will soon read.
                                562                 :                :      * This number also affects how far we look ahead for opportunities to
                                563                 :                :      * start more I/Os.
                                564                 :                :      */
  711 tmunro@postgresql.or      565                 :         779068 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
                                566   [ +  +  +  + ]:         779068 :     if (!OidIsValid(MyDatabaseId) ||
  603 noah@leadboat.com         567   [ +  +  +  + ]:        1159646 :         (rel && IsCatalogRelation(rel)) ||
  711 tmunro@postgresql.or      568                 :         452982 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
                                569                 :                :     {
                                570                 :                :         /*
                                571                 :                :          * Avoid circularity while trying to look up tablespace settings or
                                572                 :                :          * before spccache.c is ready.
                                573                 :                :          */
                                574                 :         385533 :         max_ios = effective_io_concurrency;
                                575                 :                :     }
                                576         [ +  + ]:         393535 :     else if (flags & READ_STREAM_MAINTENANCE)
                                577                 :           6793 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
                                578                 :                :     else
                                579                 :         386742 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
                                580                 :                : 
                                581                 :                :     /* Cap to INT16_MAX to avoid overflowing below */
                                582                 :         779068 :     max_ios = Min(max_ios, PG_INT16_MAX);
                                583                 :                : 
                                584                 :                :     /*
                                585                 :                :      * If starting a multi-block I/O near the end of the queue, we might
                                586                 :                :      * temporarily need extra space for overflowing buffers before they are
                                587                 :                :      * moved to regular circular position.  This is the maximum extra space we
                                588                 :                :      * could need.
                                589                 :                :      */
  367                           590                 :         779068 :     queue_overflow = io_combine_limit - 1;
                                591                 :                : 
                                592                 :                :     /*
                                593                 :                :      * Choose the maximum number of buffers we're prepared to pin.  We try to
                                594                 :                :      * pin fewer if we can, though.  We add one so that we can make progress
                                595                 :                :      * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
                                596                 :                :      * this also allows an extra full I/O's worth of buffers: after an I/O
                                597                 :                :      * finishes we don't want to have to wait for its buffers to be consumed
                                598                 :                :      * before starting a new one.
                                599                 :                :      *
                                600                 :                :      * Be careful not to allow int16 to overflow.  That is possible with the
                                601                 :                :      * current GUC range limits, so this is an artificial limit of ~32k
                                602                 :                :      * buffers and we'd need to adjust the types to exceed that.  We also have
                                603                 :                :      * to allow for the spare entry and the overflow space.
                                604                 :                :      */
  381                           605                 :         779068 :     max_pinned_buffers = (max_ios + 1) * io_combine_limit;
  711                           606                 :         779068 :     max_pinned_buffers = Min(max_pinned_buffers,
                                607                 :                :                              PG_INT16_MAX - queue_overflow - 1);
                                608                 :                : 
                                609                 :                :     /* Give the strategy a chance to limit the number of buffers we pin. */
  708                           610                 :         779068 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
                                611                 :         779068 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
                                612                 :                : 
                                613                 :                :     /*
                                614                 :                :      * Also limit our queue to the maximum number of pins we could ever be
                                615                 :                :      * allowed to acquire according to the buffer manager.  We may not really
                                616                 :                :      * be able to use them all due to other pins held by this backend, but
                                617                 :                :      * we'll check that later in read_stream_start_pending_read().
                                618                 :                :      */
  711                           619         [ +  + ]:         779068 :     if (SmgrIsTemp(smgr))
  366                           620                 :           6970 :         max_possible_buffer_limit = GetLocalPinLimit();
                                621                 :                :     else
                                622                 :         772098 :         max_possible_buffer_limit = GetPinLimit();
                                623                 :         779068 :     max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
                                624                 :                : 
                                625                 :                :     /*
                                626                 :                :      * The limit might be zero on a system configured with too few buffers for
                                627                 :                :      * the number of connections.  We need at least one to make progress.
                                628                 :                :      */
                                629         [ +  + ]:         779068 :     max_pinned_buffers = Max(1, max_pinned_buffers);
                                630                 :                : 
                                631                 :                :     /*
                                632                 :                :      * We need one extra entry for buffers and per-buffer data, because users
                                633                 :                :      * of per-buffer data have access to the object until the next call to
                                634                 :                :      * read_stream_next_buffer(), so we need a gap between the head and tail
                                635                 :                :      * of the queue so that we don't clobber it.
                                636                 :                :      */
  711                           637                 :         779068 :     queue_size = max_pinned_buffers + 1;
                                638                 :                : 
                                639                 :                :     /*
                                640                 :                :      * Allocate the object, the buffers, the ios and per_buffer_data space in
                                641                 :                :      * one big chunk.  Though we have queue_size buffers, we want to be able
                                642                 :                :      * to assume that all the buffers for a single read are contiguous (i.e.
                                643                 :                :      * don't wrap around halfway through), so we allow temporary overflows of
                                644                 :                :      * up to the maximum possible overflow size.
                                645                 :                :      */
                                646                 :         779068 :     size = offsetof(ReadStream, buffers);
  367                           647                 :         779068 :     size += sizeof(Buffer) * (queue_size + queue_overflow);
  711                           648         [ +  + ]:         779068 :     size += sizeof(InProgressIO) * Max(1, max_ios);
                                649                 :         779068 :     size += per_buffer_data_size * queue_size;
                                650                 :         779068 :     size += MAXIMUM_ALIGNOF * 2;
                                651                 :         779068 :     stream = (ReadStream *) palloc(size);
                                652                 :         779068 :     memset(stream, 0, offsetof(ReadStream, buffers));
                                653                 :         779068 :     stream->ios = (InProgressIO *)
  367                           654                 :         779068 :         MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
  711                           655         [ +  + ]:         779068 :     if (per_buffer_data_size > 0)
                                656                 :          25155 :         stream->per_buffer_data = (void *)
                                657         [ +  + ]:          25155 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
                                658                 :                : 
  350 andres@anarazel.de        659                 :         779068 :     stream->sync_mode = io_method == IOMETHOD_SYNC;
                                660                 :         779068 :     stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
                                661                 :                : 
                                662                 :                : #ifdef USE_PREFETCH
                                663                 :                : 
                                664                 :                :     /*
                                665                 :                :      * Read-ahead advice simulating asynchronous I/O with synchronous calls.
                                666                 :                :      * Issue advice only if AIO is not used, direct I/O isn't enabled, the
                                667                 :                :      * caller hasn't promised sequential access (overriding our detection
                                668                 :                :      * heuristics), and max_ios hasn't been set to zero.
                                669                 :                :      */
                                670         [ +  + ]:         779068 :     if (stream->sync_mode &&
                                671         [ +  - ]:           2886 :         (io_direct_flags & IO_DIRECT_DATA) == 0 &&
  711 tmunro@postgresql.or      672   [ +  +  +  - ]:           2886 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
                                673                 :                :         max_ios > 0)
                                674                 :            692 :         stream->advice_enabled = true;
                                675                 :                : #endif
                                676                 :                : 
                                677                 :                :     /*
                                678                 :                :      * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
                                679                 :                :      * we still need to allocate space to combine and run one I/O.  Bump it up
                                680                 :                :      * to one, and remember to ask for synchronous I/O only.
                                681                 :                :      */
                                682         [ +  + ]:         779068 :     if (max_ios == 0)
                                683                 :                :     {
                                684                 :              7 :         max_ios = 1;
  350 andres@anarazel.de        685                 :              7 :         stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
                                686                 :                :     }
                                687                 :                : 
                                688                 :                :     /*
                                689                 :                :      * Capture stable values for these two GUC-derived numbers for the
                                690                 :                :      * lifetime of this stream, so we don't have to worry about the GUCs
                                691                 :                :      * changing underneath us beyond this point.
                                692                 :                :      */
  711 tmunro@postgresql.or      693                 :         779068 :     stream->max_ios = max_ios;
  367                           694                 :         779068 :     stream->io_combine_limit = io_combine_limit;
                                695                 :                : 
  711                           696                 :         779068 :     stream->per_buffer_data_size = per_buffer_data_size;
                                697                 :         779068 :     stream->max_pinned_buffers = max_pinned_buffers;
                                698                 :         779068 :     stream->queue_size = queue_size;
                                699                 :         779068 :     stream->callback = callback;
                                700                 :         779068 :     stream->callback_private_data = callback_private_data;
  561                           701                 :         779068 :     stream->buffered_blocknum = InvalidBlockNumber;
  365                           702                 :         779068 :     stream->seq_blocknum = InvalidBlockNumber;
                                703                 :         779068 :     stream->seq_until_processed = InvalidBlockNumber;
  366                           704                 :         779068 :     stream->temporary = SmgrIsTemp(smgr);
                                705                 :                : 
                                706                 :                :     /*
                                707                 :                :      * Skip the initial ramp-up phase if the caller says we're going to be
                                708                 :                :      * reading the whole relation.  This way we start out assuming we'll be
                                709                 :                :      * doing full io_combine_limit sized reads.
                                710                 :                :      */
  711                           711         [ +  + ]:         779068 :     if (flags & READ_STREAM_FULL)
  367                           712                 :          70037 :         stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
                                713                 :                :     else
  711                           714                 :         709031 :         stream->distance = 1;
   12 melanieplageman@gmai      715                 :GNC      779068 :     stream->resume_distance = stream->distance;
                                716                 :                : 
                                717                 :                :     /*
                                718                 :                :      * Since we always access the same relation, we can initialize parts of
                                719                 :                :      * the ReadBuffersOperation objects and leave them that way, to avoid
                                720                 :                :      * wasting CPU cycles writing to them for each read.
                                721                 :                :      */
  711 tmunro@postgresql.or      722         [ +  + ]:CBC    13265607 :     for (int i = 0; i < max_ios; ++i)
                                723                 :                :     {
                                724                 :       12486539 :         stream->ios[i].op.rel = rel;
  603 noah@leadboat.com         725                 :       12486539 :         stream->ios[i].op.smgr = smgr;
                                726                 :       12486539 :         stream->ios[i].op.persistence = persistence;
  711 tmunro@postgresql.or      727                 :       12486539 :         stream->ios[i].op.forknum = forknum;
                                728                 :       12486539 :         stream->ios[i].op.strategy = strategy;
                                729                 :                :     }
                                730                 :                : 
                                731                 :         779068 :     return stream;
                                732                 :                : }
                                733                 :                : 
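Note on the sizing arithmetic in read_stream_begin_impl() above: the chain of caps that produces max_pinned_buffers, queue_overflow and queue_size can be followed in isolation. The sketch below is a standalone illustration, not code from read_stream.c. The names sketch_stream_sizes, SketchSizes, SKETCH_MIN and SKETCH_MAX are hypothetical, the two pin limits are passed in as plain numbers instead of being obtained from GetAccessStrategyPinLimit() and GetPinLimit()/GetLocalPinLimit(), and the use of uint32_t for the strategy limit is an assumption.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for the Min()/Max() macros used in the listing. */
#define SKETCH_MIN(a, b) ((a) < (b) ? (a) : (b))
#define SKETCH_MAX(a, b) ((a) > (b) ? (a) : (b))

/* Hypothetical result type; not part of read_stream.c. */
typedef struct SketchSizes
{
    uint32_t    max_pinned_buffers;
    uint32_t    queue_size;
    uint32_t    queue_overflow;
} SketchSizes;

/*
 * Mirror the order of the caps applied in the listing.  The two limit
 * arguments are assumed inputs standing in for the strategy and buffer
 * manager pin limits.
 */
static SketchSizes
sketch_stream_sizes(int max_ios, int io_combine_limit,
                    uint32_t strategy_pin_limit, uint32_t buffer_pin_limit)
{
    SketchSizes s;
    uint32_t    pins;

    assert(max_ios >= 0 && io_combine_limit >= 1);

    /* Cap max_ios so the multiplication below stays small. */
    max_ios = SKETCH_MIN(max_ios, INT16_MAX);

    /* Extra slots for a multi-block read that runs past the queue end. */
    s.queue_overflow = (uint32_t) (io_combine_limit - 1);

    /* One extra I/O's worth of buffers, then the int16-derived ceiling. */
    pins = (uint32_t) (max_ios + 1) * (uint32_t) io_combine_limit;
    pins = SKETCH_MIN(pins, (uint32_t) INT16_MAX - s.queue_overflow - 1);

    /* Strategy limit, then buffer manager limit, then a floor of one. */
    pins = SKETCH_MIN(pins, strategy_pin_limit);
    pins = SKETCH_MIN(pins, buffer_pin_limit);
    pins = SKETCH_MAX(1u, pins);

    s.max_pinned_buffers = pins;
    s.queue_size = pins + 1;    /* spare entry between head and tail */
    return s;
}
```

With illustrative inputs max_ios = 16 and io_combine_limit = 16 and no tighter limit in play, the sketch gives (16 + 1) * 16 = 272 pinned buffers, a queue of 273 entries and 15 overflow slots. A buffer manager limit of zero still yields one pinned buffer, matching the Max(1, ...) floor in the listing.
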
                                734                 :                : /*
                                735                 :                :  * Create a new read stream for reading a relation.
                                736                 :                :  * See read_stream_begin_impl() for the detailed explanation.
                                737                 :                :  */
                                738                 :                : ReadStream *
  603 noah@leadboat.com         739                 :         715322 : read_stream_begin_relation(int flags,
                                740                 :                :                            BufferAccessStrategy strategy,
                                741                 :                :                            Relation rel,
                                742                 :                :                            ForkNumber forknum,
                                743                 :                :                            ReadStreamBlockNumberCB callback,
                                744                 :                :                            void *callback_private_data,
                                745                 :                :                            size_t per_buffer_data_size)
                                746                 :                : {
                                747                 :         715322 :     return read_stream_begin_impl(flags,
                                748                 :                :                                   strategy,
                                749                 :                :                                   rel,
                                750                 :                :                                   RelationGetSmgr(rel),
                                751                 :         715322 :                                   rel->rd_rel->relpersistence,
                                752                 :                :                                   forknum,
                                753                 :                :                                   callback,
                                754                 :                :                                   callback_private_data,
                                755                 :                :                                   per_buffer_data_size);
                                756                 :                : }
                                757                 :                : 
                                758                 :                : /*
                                759                 :                :  * Create a new read stream for reading a SMgr relation.
                                760                 :                :  * See read_stream_begin_impl() for the detailed explanation.
                                761                 :                :  */
                                762                 :                : ReadStream *
                                763                 :          63746 : read_stream_begin_smgr_relation(int flags,
                                764                 :                :                                 BufferAccessStrategy strategy,
                                765                 :                :                                 SMgrRelation smgr,
                                766                 :                :                                 char smgr_persistence,
                                767                 :                :                                 ForkNumber forknum,
                                768                 :                :                                 ReadStreamBlockNumberCB callback,
                                769                 :                :                                 void *callback_private_data,
                                770                 :                :                                 size_t per_buffer_data_size)
                                771                 :                : {
                                772                 :          63746 :     return read_stream_begin_impl(flags,
                                773                 :                :                                   strategy,
                                774                 :                :                                   NULL,
                                775                 :                :                                   smgr,
                                776                 :                :                                   smgr_persistence,
                                777                 :                :                                   forknum,
                                778                 :                :                                   callback,
                                779                 :                :                                   callback_private_data,
                                780                 :                :                                   per_buffer_data_size);
                                781                 :                : }
                                782                 :                : 
                                783                 :                : /*
                                784                 :                :  * Pull one pinned buffer out of a stream.  Each call returns successive
                                785                 :                :  * blocks in the order specified by the callback.  If per_buffer_data_size was
                                786                 :                :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
                                787                 :                :  * per-buffer data that the callback had a chance to populate, which remains
                                788                 :                :  * valid until the next call to read_stream_next_buffer().  When the stream
                                789                 :                :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
                                790                 :                :  * the stream early at any time by calling read_stream_end().
                                791                 :                :  */
                                792                 :                : Buffer
  711 tmunro@postgresql.or      793                 :        6773670 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                                794                 :                : {
                                795                 :                :     Buffer      buffer;
                                796                 :                :     int16       oldest_buffer_index;
                                797                 :                : 
                                798                 :                : #ifndef READ_STREAM_DISABLE_FAST_PATH
                                799                 :                : 
                                800                 :                :     /*
                                801                 :                :      * A fast path for all-cached scans.  This is the same as the usual
                                802                 :                :      * algorithm, but it is specialized for no I/O and no per-buffer data, so
                                803                 :                :      * we can skip the queue management code, stay in the same buffer slot and
                                804                 :                :      * use singular StartReadBuffer().
                                805                 :                :      */
                                806         [ +  + ]:        6773670 :     if (likely(stream->fast_path))
                                807                 :                :     {
                                808                 :                :         BlockNumber next_blocknum;
                                809                 :                : 
                                810                 :                :         /* Fast path assumptions. */
                                811         [ -  + ]:        2028048 :         Assert(stream->ios_in_progress == 0);
  359                           812         [ -  + ]:        2028048 :         Assert(stream->forwarded_buffers == 0);
  711                           813         [ -  + ]:        2028048 :         Assert(stream->pinned_buffers == 1);
                                814         [ -  + ]:        2028048 :         Assert(stream->distance == 1);
  708                           815         [ -  + ]:        2028048 :         Assert(stream->pending_read_nblocks == 0);
  711                           816         [ -  + ]:        2028048 :         Assert(stream->per_buffer_data_size == 0);
  359                           817         [ -  + ]:        2028048 :         Assert(stream->initialized_buffers > stream->oldest_buffer_index);
                                818                 :                : 
                                819                 :                :         /* We're going to return the buffer we pinned last time. */
  711                           820                 :        2028048 :         oldest_buffer_index = stream->oldest_buffer_index;
                                821         [ -  + ]:        2028048 :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
                                822                 :                :                stream->next_buffer_index);
                                823                 :        2028048 :         buffer = stream->buffers[oldest_buffer_index];
                                824         [ -  + ]:        2028048 :         Assert(buffer != InvalidBuffer);
                                825                 :                : 
                                826                 :                :         /* Choose the next block to pin. */
  561                           827                 :        2028048 :         next_blocknum = read_stream_get_block(stream, NULL);
                                828                 :                : 
  708                           829         [ +  + ]:        2028048 :         if (likely(next_blocknum != InvalidBlockNumber))
                                830                 :                :         {
  350 andres@anarazel.de        831                 :        1941764 :             int         flags = stream->read_buffers_flags;
                                832                 :                : 
                                833         [ +  + ]:        1941764 :             if (stream->advice_enabled)
                                834                 :            545 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
                                835                 :                : 
                                836                 :                :             /*
                                837                 :                :              * Pin a buffer for the next call.  Same buffer entry, and
                                838                 :                :              * arbitrary I/O entry (they're all free).  We don't have to
                                839                 :                :              * adjust pinned_buffers because we're transferring one to caller
                                840                 :                :              * but pinning one more.
                                841                 :                :              *
                                842                 :                :              * In the fast path we don't need to check the pin limit.  We're
                                843                 :                :              * always allowed at least one pin so that progress can be made,
                                844                 :                :              * and that's all we need here.  Although two pins are momentarily
                                845                 :                :              * held at the same time, the model used here is that the stream
                                846                 :                :              * holds only one, and the other now belongs to the caller.
                                847                 :                :              */
  708 tmunro@postgresql.or      848         [ +  + ]:        1941764 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
                                849                 :                :                                         &stream->buffers[oldest_buffer_index],
                                850                 :                :                                         next_blocknum,
                                851                 :                :                                         flags)))
                                852                 :                :             {
                                853                 :                :                 /* Fast return. */
                                854                 :        1926313 :                 return buffer;
                                855                 :                :             }
                                856                 :                : 
                                857                 :                :             /* Next call must wait for I/O for the newly pinned buffer. */
  711                           858                 :          15451 :             stream->oldest_io_index = 0;
                                859                 :          15451 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
                                860                 :          15451 :             stream->ios_in_progress = 1;
                                861                 :          15451 :             stream->ios[0].buffer_index = oldest_buffer_index;
                                862                 :          15451 :             stream->seq_blocknum = next_blocknum + 1;
                                863                 :                :         }
                                864                 :                :         else
                                865                 :                :         {
                                866                 :                :             /* No more blocks, end of stream. */
  708                           867                 :          86284 :             stream->distance = 0;
                                868                 :          86284 :             stream->oldest_buffer_index = stream->next_buffer_index;
                                869                 :          86284 :             stream->pinned_buffers = 0;
  359                           870                 :          86284 :             stream->buffers[oldest_buffer_index] = InvalidBuffer;
                                871                 :                :         }
                                872                 :                : 
  708                           873                 :         101735 :         stream->fast_path = false;
  711                           874                 :         101735 :         return buffer;
                                875                 :                :     }
                                876                 :                : #endif
                                877                 :                : 
                                878         [ +  + ]:        4745622 :     if (unlikely(stream->pinned_buffers == 0))
                                879                 :                :     {
                                880         [ -  + ]:        3698507 :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
                                881                 :                : 
                                882                 :                :         /* End of stream reached?  */
                                883         [ +  + ]:        3698507 :         if (stream->distance == 0)
                                884                 :        2057175 :             return InvalidBuffer;
                                885                 :                : 
                                886                 :                :         /*
                                887                 :                :          * The usual order of operations is that we look ahead at the bottom
                                888                 :                :          * of this function after potentially finishing an I/O and making
                                889                 :                :          * space for more, but if we're just starting up we'll need to crank
                                890                 :                :          * the handle to get started.
                                891                 :                :          */
  365                           892                 :        1641332 :         read_stream_look_ahead(stream);
                                893                 :                : 
                                894                 :                :         /* End of stream reached? */
  711                           895         [ +  + ]:        1641332 :         if (stream->pinned_buffers == 0)
                                896                 :                :         {
                                897         [ -  + ]:         760798 :             Assert(stream->distance == 0);
                                898                 :         760798 :             return InvalidBuffer;
                                899                 :                :         }
                                900                 :                :     }
                                901                 :                : 
                                902                 :                :     /* Grab the oldest pinned buffer and associated per-buffer data. */
                                903         [ -  + ]:        1927649 :     Assert(stream->pinned_buffers > 0);
                                904                 :        1927649 :     oldest_buffer_index = stream->oldest_buffer_index;
                                905   [ +  -  -  + ]:        1927649 :     Assert(oldest_buffer_index >= 0 &&
                                906                 :                :            oldest_buffer_index < stream->queue_size);
                                907                 :        1927649 :     buffer = stream->buffers[oldest_buffer_index];
                                908         [ +  + ]:        1927649 :     if (per_buffer_data)
                                909                 :         295361 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
                                910                 :                : 
                                911         [ -  + ]:        1927649 :     Assert(BufferIsValid(buffer));
                                912                 :                : 
                                913                 :                :     /* Do we have to wait for an associated I/O first? */
                                914         [ +  + ]:        1927649 :     if (stream->ios_in_progress > 0 &&
                                915         [ +  + ]:         718016 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
                                916                 :                :     {
                                917                 :         627719 :         int16       io_index = stream->oldest_io_index;
                                918                 :                :         int32       distance;   /* wider temporary value, clamped below */
                                919                 :                : 
                                920                 :                :         /* Sanity check that we still agree on the buffers. */
                                921         [ -  + ]:         627719 :         Assert(stream->ios[io_index].op.buffers ==
                                922                 :                :                &stream->buffers[oldest_buffer_index]);
                                923                 :                : 
                                924                 :         627719 :         WaitReadBuffers(&stream->ios[io_index].op);
                                925                 :                : 
                                926         [ -  + ]:         627687 :         Assert(stream->ios_in_progress > 0);
                                927                 :         627687 :         stream->ios_in_progress--;
                                928         [ +  + ]:         627687 :         if (++stream->oldest_io_index == stream->max_ios)
                                929                 :          25309 :             stream->oldest_io_index = 0;
                                930                 :                : 
                                931                 :                :         /* Look-ahead distance ramps up rapidly after we do I/O. */
  364                           932                 :         627687 :         distance = stream->distance * 2;
                                933                 :         627687 :         distance = Min(distance, stream->max_pinned_buffers);
                                934                 :         627687 :         stream->distance = distance;
                                935                 :                : 
                                936                 :                :         /*
                                937                 :                :          * If we've reached the first block of a sequential region we're
                                938                 :                :          * issuing advice for, cancel that until the next jump.  The kernel
                                939                 :                :          * will see the sequential preadv() pattern starting here.
                                940                 :                :          */
  365                           941         [ +  + ]:         627687 :         if (stream->advice_enabled &&
                                942         [ +  + ]:            274 :             stream->ios[io_index].op.blocknum == stream->seq_until_processed)
                                943                 :            252 :             stream->seq_until_processed = InvalidBlockNumber;
                                944                 :                :     }
                                945                 :                : 
                                946                 :                :     /*
                                947                 :                :      * We must zap this queue entry, or else it would appear as a forwarded
                                948                 :                :      * buffer.  If it's potentially in the overflow zone (ie from a
                                949                 :                :      * multi-block I/O that wrapped around the queue), also zap the copy.
                                950                 :                :      */
  711                           951                 :        1927617 :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
  359                           952         [ +  + ]:        1927617 :     if (oldest_buffer_index < stream->io_combine_limit - 1)
                                953                 :        1584864 :         stream->buffers[stream->queue_size + oldest_buffer_index] =
                                954                 :                :             InvalidBuffer;
                                955                 :                : 
                                956                 :                : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
                                957                 :                : 
                                958                 :                :     /*
                                959                 :                :      * The caller will get access to the per-buffer data, until the next call.
                                960                 :                :      * We wipe the one before, which is never occupied because queue_size
                                961                 :                :      * allowed one extra element.  This will hopefully trip up client code
                                962                 :                :      * that is holding a dangling pointer to it.
                                963                 :                :      */
  711                           964         [ +  + ]:        1927617 :     if (stream->per_buffer_data)
                                965                 :                :     {
                                966                 :                :         void       *per_buffer_data;
                                967                 :                : 
  393                           968         [ +  + ]:         590750 :         per_buffer_data = get_per_buffer_data(stream,
                                969                 :                :                                               oldest_buffer_index == 0 ?
                                970                 :          74249 :                                               stream->queue_size - 1 :
                                971                 :         221126 :                                               oldest_buffer_index - 1);
                                972                 :                : 
                                973                 :                : #if defined(CLOBBER_FREED_MEMORY)
                                974                 :                :         /* This also tells Valgrind the memory is "noaccess". */
                                975                 :         295375 :         wipe_mem(per_buffer_data, stream->per_buffer_data_size);
                                976                 :                : #elif defined(USE_VALGRIND)
                                977                 :                :         /* Tell it ourselves. */
                                978                 :                :         VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
                                979                 :                :                                    stream->per_buffer_data_size);
                                980                 :                : #endif
                                981                 :                :     }
                                982                 :                : #endif
                                983                 :                : 
                                984                 :                :     /* Pin transferred to caller. */
  711                           985         [ -  + ]:        1927617 :     Assert(stream->pinned_buffers > 0);
                                986                 :        1927617 :     stream->pinned_buffers--;
                                987                 :                : 
                                988                 :                :     /* Advance oldest buffer, with wrap-around. */
                                989                 :        1927617 :     stream->oldest_buffer_index++;
                                990         [ +  + ]:        1927617 :     if (stream->oldest_buffer_index == stream->queue_size)
                                991                 :         304780 :         stream->oldest_buffer_index = 0;
                                992                 :                : 
                                993                 :                :     /* Prepare for the next call. */
  365                           994                 :        1927617 :     read_stream_look_ahead(stream);
                                995                 :                : 
                                996                 :                : #ifndef READ_STREAM_DISABLE_FAST_PATH
                                997                 :                :     /* See if we can take the fast path for all-cached scans next time. */
  711                           998         [ +  + ]:        1927611 :     if (stream->ios_in_progress == 0 &&
  359                           999         [ +  + ]:        1300627 :         stream->forwarded_buffers == 0 &&
  711                          1000         [ +  + ]:        1300225 :         stream->pinned_buffers == 1 &&
                               1001         [ +  + ]:         445773 :         stream->distance == 1 &&
  708                          1002         [ +  + ]:         372360 :         stream->pending_read_nblocks == 0 &&
  711                          1003         [ +  + ]:         370912 :         stream->per_buffer_data_size == 0)
                               1004                 :                :     {
                               1005                 :                :         /*
                               1006                 :                :          * The fast path spins on one buffer entry repeatedly instead of
                               1007                 :                :          * rotating through the whole queue and clearing the entries behind
                               1008                 :                :          * it.  If the buffer it starts with happened to be forwarded between
                               1009                 :                :          * StartReadBuffers() calls and also wrapped around the circular queue
                               1010                 :                :          * partway through, then a copy also exists in the overflow zone, and
                               1011                 :                :          * it won't clear it out as the regular path would.  Do that now, so
                               1012                 :                :          * it doesn't need code for that.
                               1013                 :                :          */
  218                          1014         [ +  + ]:         195510 :         if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
                               1015                 :         194010 :             stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
                               1016                 :                :                 InvalidBuffer;
                               1017                 :                : 
  711                          1018                 :         195510 :         stream->fast_path = true;
                               1019                 :                :     }
                               1020                 :                : #endif
                               1021                 :                : 
                               1022                 :        1927611 :     return buffer;
                               1023                 :                : }
                               1024                 :                : 
                               1025                 :                : /*
                               1026                 :                :  * Transitional support for code that would like to perform or skip reads
                               1027                 :                :  * itself, without using the stream.  Returns, and consumes, the next block
                               1028                 :                :  * number that would be read by the stream's look-ahead algorithm, or
                               1029                 :                :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
                               1030                 :                :  * strategy that would be used to read it.
                               1031                 :                :  */
                               1032                 :                : BlockNumber
  543 tmunro@postgresql.or     1033                 :UBC           0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
                               1034                 :                : {
                               1035                 :              0 :     *strategy = stream->ios[0].op.strategy;
                               1036                 :              0 :     return read_stream_get_block(stream, NULL);
                               1037                 :                : }
                               1038                 :                : 
                               1039                 :                : /*
                               1040                 :                :  * Temporarily stop consuming block numbers from the block number callback.
                               1041                 :                :  * If called inside the block number callback, its return value should be
                               1042                 :                :  * returned by the callback.
                               1043                 :                :  */
                               1044                 :                : BlockNumber
   12 melanieplageman@gmai     1045                 :UNC           0 : read_stream_pause(ReadStream *stream)
                               1046                 :                : {
                               1047                 :              0 :     stream->resume_distance = stream->distance;
                               1048                 :              0 :     stream->distance = 0;
                               1049                 :              0 :     return InvalidBlockNumber;
                               1050                 :                : }
                               1051                 :                : 
                               1052                 :                : /*
                               1053                 :                :  * Resume looking ahead after the block number callback reported
                               1054                 :                :  * end-of-stream. This is useful for streams of self-referential blocks, after
                               1055                 :                :  * a buffer needed to be consumed and examined to find more block numbers.
                               1056                 :                :  */
                               1057                 :                : void
                               1058                 :              0 : read_stream_resume(ReadStream *stream)
                               1059                 :                : {
                               1060                 :              0 :     stream->distance = stream->resume_distance;
                               1061                 :              0 : }
                               1062                 :                : 
                               1063                 :                : /*
                               1064                 :                :  * Reset a read stream by releasing any queued up buffers, allowing the stream
                               1065                 :                :  * to be used again for different blocks.  This can be used to clear an
                               1066                 :                :  * end-of-stream condition and start again, or to throw away blocks that were
                               1067                 :                :  * speculatively read and read some different blocks instead.
                               1068                 :                :  */
                               1069                 :                : void
  711 tmunro@postgresql.or     1070                 :CBC     1641333 : read_stream_reset(ReadStream *stream)
                               1071                 :                : {
                               1072                 :                :     int16       index;
                               1073                 :                :     Buffer      buffer;
                               1074                 :                : 
                               1075                 :                :     /* Stop looking ahead. */
                               1076                 :        1641333 :     stream->distance = 0;
                               1077                 :                : 
                               1078                 :                :     /* Forget buffered block number and fast path state. */
  561                          1079                 :        1641333 :     stream->buffered_blocknum = InvalidBlockNumber;
  708                          1080                 :        1641333 :     stream->fast_path = false;
                               1081                 :                : 
                               1082                 :                :     /* Unpin anything that wasn't consumed. */
  711                          1083         [ +  + ]:        1763803 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
                               1084                 :         122470 :         ReleaseBuffer(buffer);
                               1085                 :                : 
                               1086                 :                :     /* Unpin any unused forwarded buffers. */
  359                          1087                 :        1641333 :     index = stream->next_buffer_index;
                               1088         [ +  + ]:        1641333 :     while (index < stream->initialized_buffers &&
                               1089         [ -  + ]:         214664 :            (buffer = stream->buffers[index]) != InvalidBuffer)
                               1090                 :                :     {
  359 tmunro@postgresql.or     1091         [ #  # ]:UBC           0 :         Assert(stream->forwarded_buffers > 0);
                               1092                 :              0 :         stream->forwarded_buffers--;
                               1093                 :              0 :         ReleaseBuffer(buffer);
                               1094                 :                : 
                               1095                 :              0 :         stream->buffers[index] = InvalidBuffer;
                               1096         [ #  # ]:              0 :         if (index < stream->io_combine_limit - 1)
                               1097                 :              0 :             stream->buffers[stream->queue_size + index] = InvalidBuffer;
                               1098                 :                : 
                               1099         [ #  # ]:              0 :         if (++index == stream->queue_size)
                               1100                 :              0 :             index = 0;
                               1101                 :                :     }
                               1102                 :                : 
  359 tmunro@postgresql.or     1103         [ -  + ]:CBC     1641333 :     Assert(stream->forwarded_buffers == 0);
  711                          1104         [ -  + ]:        1641333 :     Assert(stream->pinned_buffers == 0);
                               1105         [ -  + ]:        1641333 :     Assert(stream->ios_in_progress == 0);
                               1106                 :                : 
                               1107                 :                :     /* Start off assuming data is cached. */
                               1108                 :        1641333 :     stream->distance = 1;
   12 melanieplageman@gmai     1109                 :GNC     1641333 :     stream->resume_distance = stream->distance;
  711 tmunro@postgresql.or     1110                 :CBC     1641333 : }
                               1111                 :                : 
                               1112                 :                : /*
                               1113                 :                :  * Release and free a read stream.
                               1114                 :                :  */
                               1115                 :                : void
                               1116                 :         776648 : read_stream_end(ReadStream *stream)
                               1117                 :                : {
                               1118                 :         776648 :     read_stream_reset(stream);
                               1119                 :         776648 :     pfree(stream);
                               1120                 :         776648 : }

Generated by: LCOV version 2.4-beta