LCOV - differential code coverage report
Current view: top level - src/backend/storage/aio - read_stream.c
Coverage Total Hit UNC UBC GNC CBC DUB DCB
Current: bed3ffbf9d952be6c7d739d068cdce44c046dfb7 vs 574581b50ac9c63dd9e4abebb731a3b67e5b50f6 Lines: 90.6 % 404 366 17 21 103 263 28
Current Date: 2026-05-05 10:23:31 +0900 Functions: 81.0 % 21 17 2 2 11 6
Baseline: lcov-20260505-025707-baseline Branches: 78.6 % 308 242 11 55 67 175 5 29
Baseline Date: 2026-05-05 10:27:06 +0900 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 88.3 % 111 98 13 98
(30,360] days: 75.0 % 16 12 4 5 7
(360..) days: 92.4 % 277 256 21 256
Function coverage date bins:
(7,30] days: 100.0 % 6 6 6
(30,360] days: 0.0 % 2 0 2
(360..) days: 84.6 % 13 11 2 5 6
Branch coverage date bins:
(7,30] days: 85.9 % 78 67 11 67
(30,360] days: 78.6 % 14 11 3 11
(360..) days: 75.9 % 216 164 52 164

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * read_stream.c
                                  4                 :                :  *    Mechanism for accessing buffered relation data with look-ahead
                                  5                 :                :  *
                                  6                 :                :  * Code that needs to access relation data typically pins blocks one at a
                                  7                 :                :  * time, often in a predictable order that might be sequential or data-driven.
                                  8                 :                :  * Calling the simple ReadBuffer() function for each block is inefficient,
                                  9                 :                :  * because blocks that are not yet in the buffer pool require I/O operations
                                 10                 :                :  * that are small and might stall waiting for storage.  This mechanism looks
                                 11                 :                :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
                                 12                 :                :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
                                 13                 :                :  * distance.
                                 14                 :                :  *
                                 15                 :                :  * A user-provided callback generates a stream of block numbers that is used
                                 16                 :                :  * to form reads of up to io_combine_limit, by attempting to merge them with a
                                 17                 :                :  * pending read.  When that isn't possible, the existing pending read is sent
                                 18                 :                :  * to StartReadBuffers() so that a new one can begin to form.
                                 19                 :                :  *
                                 20                 :                :  * The algorithm for controlling the look-ahead distance is based on recent
                                 21                 :                :  * cache hit / miss history, as well as whether we need to wait for I/O completion
                                 22                 :                :  * after a miss.  When no I/O is necessary, there is no benefit in looking
                                 23                 :                :  * ahead more than one block.  This is the default initial assumption.  When
                                 24                 :                :  * blocks needing I/O are streamed, the combine distance is increased to
                                 25                 :                :  * benefit from I/O combining and the read-ahead distance is increased
                                 26                 :                :  * whenever we need to wait for I/O, to try to benefit from increased I/O
                                 27                 :                :  * concurrency. Both are reduced gradually when cached blocks are streamed.
                                 28                 :                :  *
                                 29                 :                :  * The main data structure is a circular queue of buffers of size
                                 30                 :                :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
                                 31                 :                :  * returned by read_stream_next_buffer().  Each buffer also has an optional
                                 32                 :                :  * variable-sized object that is passed from the callback to the consumer of
                                 33                 :                :  * buffers.
                                 34                 :                :  *
                                 35                 :                :  * Parallel to the queue of buffers, there is a circular queue of in-progress
                                 36                 :                :  * I/Os that have been started with StartReadBuffers(), and for which
                                 37                 :                :  * WaitReadBuffers() must be called before returning the buffer.
                                 38                 :                :  *
                                 39                 :                :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
                                 40                 :                :  * successive calls, then these data structures might appear as follows:
                                 41                 :                :  *
                                 42                 :                :  *                          buffers buf/data       ios
                                 43                 :                :  *
                                 44                 :                :  *                          +----+  +-----+       +--------+
                                 45                 :                :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
                                 46                 :                :  *                          +----+  +-----+  |    +--------+
                                 47                 :                :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
                                 48                 :                :  *                          +----+  +-----+  | |  +--------+
                                 49                 :                :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
                                 50                 :                :  *                          +----+  +-----+    |  +--------+
                                 51                 :                :  *                          | 43 |  |  ?  |    |  |        |
                                 52                 :                :  *                          +----+  +-----+    |  +--------+
                                 53                 :                :  *                          | 44 |  |  ?  |    |  |        |
                                 54                 :                :  *                          +----+  +-----+    |  +--------+
                                 55                 :                :  *                          | 60 |  |  ?  |<---+
                                 56                 :                :  *                          +----+  +-----+
                                 57                 :                :  *     next_buffer_index -> |    |  |     |
                                 58                 :                :  *                          +----+  +-----+
                                 59                 :                :  *
                                 60                 :                :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
                                 61                 :                :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
                                 62                 :                :  * the range 42..44 requires an I/O wait before its buffers are returned, as
                                 63                 :                :  * does block 60.
                                 64                 :                :  *
                                 65                 :                :  *
                                 66                 :                :  * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
                                 67                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                 68                 :                :  *
                                 69                 :                :  * IDENTIFICATION
                                 70                 :                :  *    src/backend/storage/aio/read_stream.c
                                 71                 :                :  *
                                 72                 :                :  *-------------------------------------------------------------------------
                                 73                 :                :  */
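The header comment above describes how the callback's block numbers are merged into a pending read of at most io_combine_limit blocks, and how the pending read is handed to StartReadBuffers() once the next block cannot be appended. A minimal standalone sketch of just that merging rule follows. `PendingRead` and `form_reads()` are hypothetical names, and the sketch records each flushed range in an array instead of starting real I/O; it ignores buffer pinning, look-ahead distances and split reads.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t BlockNumber;

/* Hypothetical stand-in for a read that would go to StartReadBuffers(). */
typedef struct PendingRead
{
    BlockNumber blocknum;       /* first block of the range */
    int         nblocks;        /* number of consecutive blocks */
} PendingRead;

/*
 * Merge a sequence of block numbers into reads of at most combine_limit
 * consecutive blocks.  Whenever the next block cannot extend the pending
 * read, the pending read is "started" (stored in out[]) and a new one
 * begins.  Returns the number of reads written to out.
 */
static size_t
form_reads(const BlockNumber *blocks, size_t nblocks, int combine_limit,
           PendingRead *out)
{
    PendingRead pending = {0, 0};
    size_t      nreads = 0;

    assert(combine_limit > 0);

    for (size_t i = 0; i < nblocks; i++)
    {
        /* Can this block be appended to the pending read? */
        if (pending.nblocks > 0 &&
            pending.nblocks < combine_limit &&
            blocks[i] == pending.blocknum + (BlockNumber) pending.nblocks)
        {
            pending.nblocks++;
            continue;
        }

        /* No: flush the existing pending read, if any, and start anew. */
        if (pending.nblocks > 0)
            out[nreads++] = pending;
        pending.blocknum = blocks[i];
        pending.nblocks = 1;
    }

    /* End of stream: flush whatever is still pending. */
    if (pending.nblocks > 0)
        out[nreads++] = pending;

    return nreads;
}
```

Fed the header's example sequence 10, 42, 43, 44, 60 with a limit of 16, this sketch produces three reads (10, 42..44 and 60). Per the comment, block 10 turned out to be a cache hit, which is why only the other two appear in the diagram's `ios` column.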
                                 74                 :                : #include "postgres.h"
                                 75                 :                : 
                                 76                 :                : #include "miscadmin.h"
                                 77                 :                : #include "executor/instrument_node.h"
                                 78                 :                : #include "storage/aio.h"
                                 79                 :                : #include "storage/fd.h"
                                 80                 :                : #include "storage/smgr.h"
                                 81                 :                : #include "storage/read_stream.h"
                                 82                 :                : #include "utils/memdebug.h"
                                 83                 :                : #include "utils/rel.h"
                                 84                 :                : #include "utils/spccache.h"
                                 85                 :                : 
                                 86                 :                : typedef struct InProgressIO
                                 87                 :                : {
                                 88                 :                :     int16       buffer_index;
                                 89                 :                :     ReadBuffersOperation op;
                                 90                 :                : } InProgressIO;
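Each InProgressIO pairs an in-flight ReadBuffersOperation with the buffer queue slot it will fill, and these entries form the second circular queue from the header comment, bounded by oldest_io_index and next_io_index. The sketch below models only that wrap-around bookkeeping. `MAX_IOS`, `SketchIO`, `IOQueue`, `io_queue_push()` and `io_queue_pop()` are hypothetical, and a plain block count stands in for the ReadBuffersOperation.

```c
#include <assert.h>
#include <stdint.h>

#define MAX_IOS 4               /* hypothetical capacity, playing the role of max_ios */

typedef struct SketchIO
{
    int16_t     buffer_index;   /* first buffer queue slot this I/O fills */
    int16_t     nblocks;        /* stand-in for the ReadBuffersOperation */
} SketchIO;

typedef struct IOQueue
{
    SketchIO    ios[MAX_IOS];
    int16_t     oldest_io_index;    /* next I/O to wait for */
    int16_t     next_io_index;      /* slot for the next I/O started */
    int16_t     ios_in_progress;
} IOQueue;

/* Record a newly started I/O at next_io_index, wrapping around. */
static void
io_queue_push(IOQueue *q, int16_t buffer_index, int16_t nblocks)
{
    assert(q->ios_in_progress < MAX_IOS);
    q->ios[q->next_io_index].buffer_index = buffer_index;
    q->ios[q->next_io_index].nblocks = nblocks;
    if (++q->next_io_index == MAX_IOS)
        q->next_io_index = 0;
    q->ios_in_progress++;
}

/* Remove and return the oldest I/O, i.e. the next one to wait for. */
static SketchIO
io_queue_pop(IOQueue *q)
{
    SketchIO    io;

    assert(q->ios_in_progress > 0);
    io = q->ios[q->oldest_io_index];
    if (++q->oldest_io_index == MAX_IOS)
        q->oldest_io_index = 0;
    q->ios_in_progress--;
    return io;
}
```

Pushing the diagram's two I/Os (42..44 starting at buffer slot 2, 60 at slot 5) leaves oldest_io_index at 0 and next_io_index at 2, as drawn. Popping then yields the 42..44 entry first, consistent with buffers being returned to the consumer in queue order.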
                                 91                 :                : 
                                 92                 :                : /*
                                 93                 :                :  * State for managing a stream of reads.
                                 94                 :                :  */
                                 95                 :                : struct ReadStream
                                 96                 :                : {
                                 97                 :                :     int16       max_ios;
                                 98                 :                :     int16       io_combine_limit;
                                 99                 :                :     int16       ios_in_progress;
                                100                 :                :     int16       queue_size;
                                101                 :                :     int16       max_pinned_buffers;
                                102                 :                :     int16       forwarded_buffers;
                                103                 :                :     int16       pinned_buffers;
                                104                 :                : 
                                105                 :                :     /*
                                106                 :                :      * Limit of how far, in blocks, to look-ahead for IO combining and for
                                107                 :                :      * read-ahead.
                                108                 :                :      *
                                109                 :                :      * The limits for read-ahead and combining are handled separately to allow
                                110                 :                :      * for IO combining even in cases where the I/O subsystem can keep up at a
                                111                 :                :      * low read-ahead distance, as doing larger IOs is more efficient.
                                112                 :                :      *
                                113                 :                :      * Set to 0 when the end of the stream is reached.
                                114                 :                :      */
                                115                 :                :     int16       combine_distance;
                                116                 :                :     int16       readahead_distance;
                                117                 :                :     uint16      distance_decay_holdoff;
                                118                 :                :     int16       initialized_buffers;
                                119                 :                :     int16       resume_readahead_distance;
                                120                 :                :     int16       resume_combine_distance;
                                121                 :                :     int         read_buffers_flags;
                                122                 :                :     bool        sync_mode;      /* using io_method=sync */
                                123                 :                :     bool        batch_mode;     /* READ_STREAM_USE_BATCHING */
                                124                 :                :     bool        advice_enabled;
                                125                 :                :     bool        temporary;
                                126                 :                : 
                                127                 :                :     /* scan stats counters */
                                128                 :                :     IOStats    *stats;
                                129                 :                : 
                                130                 :                :     /*
                                131                 :                :      * One-block buffer to support 'ungetting' a block number, to resolve flow
                                132                 :                :      * control problems when I/Os are split.
                                133                 :                :      */
                                134                 :                :     BlockNumber buffered_blocknum;
                                135                 :                : 
                                136                 :                :     /*
                                137                 :                :      * The callback that will tell us which block numbers to read, and an
                                138                 :                :      * opaque pointer that will be passed to it for its own purposes.
                                139                 :                :      */
                                140                 :                :     ReadStreamBlockNumberCB callback;
                                141                 :                :     void       *callback_private_data;
                                142                 :                : 
                                143                 :                :     /* Next expected block, for detecting sequential access. */
                                144                 :                :     BlockNumber seq_blocknum;
                                145                 :                :     BlockNumber seq_until_processed;
                                146                 :                : 
                                147                 :                :     /* The read operation we are currently preparing. */
                                148                 :                :     BlockNumber pending_read_blocknum;
                                149                 :                :     int16       pending_read_nblocks;
                                150                 :                : 
                                151                 :                :     /* Space for buffers and optional per-buffer private data. */
                                152                 :                :     size_t      per_buffer_data_size;
                                153                 :                :     void       *per_buffer_data;
                                154                 :                : 
                                155                 :                :     /* Read operations that have been started but not waited for yet. */
                                156                 :                :     InProgressIO *ios;
                                157                 :                :     int16       oldest_io_index;
                                158                 :                :     int16       next_io_index;
                                159                 :                : 
                                160                 :                :     bool        fast_path;
                                161                 :                : 
                                162                 :                :     /* Circular queue of buffers. */
                                163                 :                :     int16       oldest_buffer_index;    /* Next pinned buffer to return */
                                164                 :                :     int16       next_buffer_index;  /* Index of next buffer to pin */
                                165                 :                :     Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
                                166                 :                : };
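The combine_distance and readahead_distance fields hold the two look-ahead limits described in the header comment. Combining grows when blocks need I/O, read-ahead grows when the stream has to wait for I/O, and both shrink gradually while cached blocks are streamed. The exact growth and decay rules, including how distance_decay_holdoff is used, are outside this excerpt. The following is therefore only a sketch under an assumed policy (doubling on growth, decrementing by one on a hit). `DistanceState` and `update_distances()` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct DistanceState
{
    int16_t     combine_distance;   /* look-ahead used for I/O combining */
    int16_t     readahead_distance; /* look-ahead used for I/O concurrency */
    int16_t     io_combine_limit;   /* cap for combine_distance */
    int16_t     max_pinned_buffers; /* cap for readahead_distance */
} DistanceState;

static int16_t
min16(int v, int16_t cap)
{
    return (int16_t) (v < cap ? v : cap);
}

/*
 * ASSUMED policy, for illustration only:
 *  - a block that needs I/O doubles combine_distance (up to io_combine_limit);
 *  - having to wait for that I/O also doubles readahead_distance (up to
 *    max_pinned_buffers);
 *  - a cache hit decays both distances by one, never below 1.
 */
static void
update_distances(DistanceState *s, bool needed_io, bool had_to_wait)
{
    /* A distance of 0 means end of stream; this sketch does not model it. */
    assert(s->combine_distance >= 1 && s->readahead_distance >= 1);

    if (needed_io)
    {
        s->combine_distance = min16(s->combine_distance * 2,
                                    s->io_combine_limit);
        if (had_to_wait)
            s->readahead_distance = min16(s->readahead_distance * 2,
                                          s->max_pinned_buffers);
    }
    else
    {
        if (s->combine_distance > 1)
            s->combine_distance--;
        if (s->readahead_distance > 1)
            s->readahead_distance--;
    }
}
```

Starting from the default initial distance of one block, a run of misses that each require a wait drives both distances toward their caps, and a run of hits walks them back down. That matches the shape the header comment describes, though the real tuning may differ.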
                                167                 :                : 
                                168                 :                : /*
                                169                 :                :  * Return a pointer to the per-buffer data by index.
                                170                 :                :  */
                                171                 :                : static inline void *
  762 tmunro@postgresql.or      172                 :CBC     3903586 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
                                173                 :                : {
                                174                 :        7807172 :     return (char *) stream->per_buffer_data +
                                175                 :        3903586 :         stream->per_buffer_data_size * buffer_index;
                                176                 :                : }
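get_per_buffer_data() treats per_buffer_data as one flat allocation with a fixed-size slot per buffer queue entry, so slot i begins i * per_buffer_data_size bytes past the start. The same addressing in isolation, with a hypothetical `MyPerBuffer` payload standing in for whatever a callback chooses to store and a hypothetical helper name `slot_ptr()`:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-buffer payload that a callback might attach to a block. */
typedef struct MyPerBuffer
{
    uint32_t    blkno;
    int         flags;
} MyPerBuffer;

/* Same arithmetic as get_per_buffer_data(): base + slot_size * index. */
static void *
slot_ptr(void *base, size_t slot_size, int16_t index)
{
    assert(index >= 0);
    return (char *) base + slot_size * (size_t) index;
}
```

Because every slot has the same size, the byte offset computed this way lands exactly on array element i when the slots are laid out as a C array of the payload type.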
                                177                 :                : 
                                178                 :                : /*
                                179                 :                :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
                                180                 :                :  * blocks [current_blocknum, last_exclusive).
                                181                 :                :  */
                                182                 :                : BlockNumber
  609 noah@leadboat.com         183                 :         429088 : block_range_read_stream_cb(ReadStream *stream,
                                184                 :                :                            void *callback_private_data,
                                185                 :                :                            void *per_buffer_data)
                                186                 :                : {
                                187                 :         429088 :     BlockRangeReadStreamPrivate *p = callback_private_data;
                                188                 :                : 
                                189         [ +  + ]:         429088 :     if (p->current_blocknum < p->last_exclusive)
                                190                 :         352345 :         return p->current_blocknum++;
                                191                 :                : 
                                192                 :          76743 :     return InvalidBlockNumber;
                                193                 :                : }
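block_range_read_stream_cb() is a ready-made callback. The caller initializes a BlockRangeReadStreamPrivate with current_blocknum and last_exclusive, each call hands out the next block of the half-open range, and InvalidBlockNumber signals the end of the stream. The sketch below exercises that contract outside PostgreSQL. BlockNumber and InvalidBlockNumber are redeclared locally to mirror PostgreSQL's definitions, `SketchRange` and `range_next_block()` are hypothetical, and the ReadStream and per-buffer arguments are dropped.

```c
#include <stdint.h>

typedef uint32_t BlockNumber;
#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

/* Local stand-in mirroring the two fields the callback uses. */
typedef struct SketchRange
{
    BlockNumber current_blocknum;
    BlockNumber last_exclusive;
} SketchRange;

/* Hand out blocks [current_blocknum, last_exclusive), then end the stream. */
static BlockNumber
range_next_block(void *callback_private_data)
{
    SketchRange *p = callback_private_data;

    if (p->current_blocknum < p->last_exclusive)
        return p->current_blocknum++;
    return InvalidBlockNumber;
}
```

Once the range is exhausted the callback keeps returning InvalidBlockNumber, so calling it again after the end is harmless.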
                                194                 :                : 
                                195                 :                : /*
                                196                 :                :  * Update stream stats with current pinned buffer depth.
                                197                 :                :  *
                                198                 :                :  * Called once per buffer returned to the consumer in read_stream_next_buffer().
                                199                 :                :  * Records the number of pinned buffers at that moment, so we can compute the
                                200                 :                :  * average look-ahead depth.
                                201                 :                :  */
                                202                 :                : static inline void
   28 tomas.vondra@postgre      203                 :GNC     4395792 : read_stream_count_prefetch(ReadStream *stream)
                                204                 :                : {
                                205                 :        4395792 :     IOStats    *stats = stream->stats;
                                206                 :                : 
                                207         [ +  + ]:        4395792 :     if (stats == NULL)
                                208                 :        4395784 :         return;
                                209                 :                : 
                                210                 :              8 :     stats->prefetch_count++;
                                211                 :              8 :     stats->distance_sum += stream->pinned_buffers;
                                212         [ +  - ]:              8 :     if (stream->pinned_buffers > stats->distance_max)
                                213                 :              8 :         stats->distance_max = stream->pinned_buffers;
                                214                 :                : }
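read_stream_count_prefetch() accumulates prefetch_count, distance_sum and distance_max so that the average look-ahead depth can be computed afterwards as distance_sum / prefetch_count. A sketch of that calculation follows. `SketchDepthStats` mirrors only those three fields with assumed integer types (the real IOStats is declared elsewhere and is not shown here), and `count_prefetch()` / `average_depth()` are hypothetical.

```c
#include <stdint.h>

typedef struct SketchDepthStats
{
    int64_t     prefetch_count;     /* buffers returned to the consumer */
    int64_t     distance_sum;       /* sum of pinned_buffers at each return */
    int         distance_max;       /* deepest look-ahead observed */
} SketchDepthStats;

/* Mirror of read_stream_count_prefetch() without the NULL-stats exit. */
static void
count_prefetch(SketchDepthStats *s, int pinned_buffers)
{
    s->prefetch_count++;
    s->distance_sum += pinned_buffers;
    if (pinned_buffers > s->distance_max)
        s->distance_max = pinned_buffers;
}

/* Average number of pinned buffers per returned buffer (0 if none). */
static double
average_depth(const SketchDepthStats *s)
{
    return s->prefetch_count > 0 ?
        (double) s->distance_sum / (double) s->prefetch_count : 0.0;
}
```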
                                215                 :                : 
                                216                 :                : /*
                                217                 :                :  * Update stream stats about size of I/O requests.
                                218                 :                :  *
                                219                 :                :  * We count the number of I/O requests, size of requests (counted in blocks)
                                220                 :                :  * and number of in-progress I/Os.
                                221                 :                :  */
                                222                 :                : static inline void
                                223                 :         697822 : read_stream_count_io(ReadStream *stream, int nblocks, int in_progress)
                                224                 :                : {
                                225                 :         697822 :     IOStats    *stats = stream->stats;
                                226                 :                : 
                                227         [ +  - ]:         697822 :     if (stats == NULL)
                                228                 :         697822 :         return;
                                229                 :                : 
   28 tomas.vondra@postgre      230                 :UNC           0 :     stats->io_count++;
                                231                 :              0 :     stats->io_nblocks += nblocks;
                                232                 :              0 :     stats->io_in_progress += in_progress;
                                233                 :                : }
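read_stream_count_io() increments io_count once per I/O request and adds the request size in blocks and the caller-supplied in-progress count to io_nblocks and io_in_progress. Dividing those sums by io_count gives the average request size and the average I/O concurrency seen when requests were issued. A sketch with a local mirror of the three fields (types assumed) and hypothetical helper names:

```c
#include <stdint.h>

typedef struct SketchIOStats
{
    int64_t     io_count;       /* I/O requests issued */
    int64_t     io_nblocks;     /* total blocks across those requests */
    int64_t     io_in_progress; /* sum of in-progress counts at issue time */
} SketchIOStats;

/* Mirror of read_stream_count_io() without the NULL-stats exit. */
static void
count_io(SketchIOStats *s, int nblocks, int in_progress)
{
    s->io_count++;
    s->io_nblocks += nblocks;
    s->io_in_progress += in_progress;
}

/* Average request size in blocks (0 if no requests). */
static double
average_io_size(const SketchIOStats *s)
{
    return s->io_count > 0 ?
        (double) s->io_nblocks / (double) s->io_count : 0.0;
}

/* Average in-progress count per request (0 if no requests). */
static double
average_ios_in_flight(const SketchIOStats *s)
{
    return s->io_count > 0 ?
        (double) s->io_in_progress / (double) s->io_count : 0.0;
}
```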
                                234                 :                : 
                                235                 :                : /*
                                236                 :                :  * Update stream stats about waits for I/O when consuming buffers.
                                237                 :                :  *
                                238                 :                :  * We count the number of I/O waits while pulling buffers out of a stream.
                                239                 :                :  */
                                240                 :                : static inline void
   28 tomas.vondra@postgre      241                 :GNC      317670 : read_stream_count_wait(ReadStream *stream)
                                242                 :                : {
                                243                 :         317670 :     IOStats    *stats = stream->stats;
                                244                 :                : 
                                245         [ +  - ]:         317670 :     if (stats == NULL)
                                246                 :         317670 :         return;
                                247                 :                : 
   28 tomas.vondra@postgre      248                 :UNC           0 :     stats->wait_count++;
                                249                 :                : }
                                250                 :                : 
                                251                 :                : /*
                                252                 :                :  * Enable collection of stats into the provided IOStats.
                                253                 :                :  */
                                254                 :                : void
   28 tomas.vondra@postgre      255                 :GNC           8 : read_stream_enable_stats(ReadStream *stream, IOStats *stats)
                                256                 :                : {
                                257                 :              8 :     stream->stats = stats;
                                258         [ +  - ]:              8 :     if (stream->stats)
                                259                 :              8 :         stream->stats->distance_capacity = stream->max_pinned_buffers;
                                260                 :              8 : }
                                261                 :                : 
                                262                 :                : /*
                                263                 :                :  * Ask the callback which block it would like us to read next, with a one-block
                                264                 :                :  * buffer in front to allow read_stream_unget_block() to work.
                                265                 :                :  */
                                266                 :                : static inline BlockNumber
  762 tmunro@postgresql.or      267                 :CBC     5820862 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
                                268                 :                : {
                                269                 :                :     BlockNumber blocknum;
                                270                 :                : 
  612                           271                 :        5820862 :     blocknum = stream->buffered_blocknum;
                                272         [ -  + ]:        5820862 :     if (blocknum != InvalidBlockNumber)
  612 tmunro@postgresql.or      273                 :UBC           0 :         stream->buffered_blocknum = InvalidBlockNumber;
                                274                 :                :     else
                                275                 :                :     {
                                276                 :                :         /*
                                277                 :                :          * Tell Valgrind that the per-buffer data is undefined.  That replaces
                                278                 :                :          * the "noaccess" state that was set when the consumer moved past this
                                279                 :                :          * entry last time around the queue, and should also catch callbacks
                                280                 :                :          * that fail to initialize data that the buffer consumer later
                                281                 :                :          * accesses.  On the first go around, it is undefined already.
                                282                 :                :          */
                                283                 :                :         VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
                                284                 :                :                                     stream->per_buffer_data_size);
  612 tmunro@postgresql.or      285                 :CBC     5820862 :         blocknum = stream->callback(stream,
                                286                 :                :                                     stream->callback_private_data,
                                287                 :                :                                     per_buffer_data);
                                288                 :                :     }
                                289                 :                : 
                                290                 :        5820862 :     return blocknum;
                                291                 :                : }
                                292                 :                : 
                                293                 :                : /*
                                294                 :                :  * In order to deal with buffer shortages and I/O limits after short reads, we
                                295                 :                :  * sometimes need to defer handling of a block we've already consumed from the
                                296                 :                :  * registered callback until later.
                                297                 :                :  */
                                298                 :                : static inline void
  762 tmunro@postgresql.or      299                 :UBC           0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
                                300                 :                : {
                                301                 :                :     /* We shouldn't ever unget more than one block. */
  612                           302         [ #  # ]:              0 :     Assert(stream->buffered_blocknum == InvalidBlockNumber);
                                303         [ #  # ]:              0 :     Assert(blocknum != InvalidBlockNumber);
                                304                 :              0 :     stream->buffered_blocknum = blocknum;
  762                           305                 :              0 : }
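A single-slot push-back buffer in the style of read_stream_unget_block() can be sketched as follows. The sketch assumes, as the assertions above imply, that the get side drains the buffered block before consulting the producer again. A simple counter stands in for the registered callback, and `BlockSource`, `source_get()` and `source_unget()` are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>

/* Local stand-ins mirroring PostgreSQL's BlockNumber definitions. */
typedef uint32_t BlockNumber;
#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

typedef struct BlockSource
{
    BlockNumber next;           /* next block the producer would yield */
    BlockNumber end;            /* one past the last block */
    BlockNumber buffered;       /* pushed-back block, or InvalidBlockNumber */
} BlockSource;

static BlockNumber
source_get(BlockSource *src)
{
    /* A pushed-back block takes priority over the producer. */
    if (src->buffered != InvalidBlockNumber)
    {
        BlockNumber blocknum = src->buffered;

        src->buffered = InvalidBlockNumber;
        return blocknum;
    }
    if (src->next >= src->end)
        return InvalidBlockNumber;  /* end of stream */
    return src->next++;
}

static void
source_unget(BlockSource *src, BlockNumber blocknum)
{
    /* Only one slot: never unget twice without an intervening get. */
    assert(src->buffered == InvalidBlockNumber);
    assert(blocknum != InvalidBlockNumber);
    src->buffered = blocknum;
}
```

A single slot is enough because, per the comment above, a block is only deferred when it was consumed from the callback but could not be handled yet, so at most one such block exists at a time.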
                                306                 :                : 
                                307                 :                : /*
                                308                 :                :  * Start as much of the current pending read as we can.  If we have to split it
                                309                 :                :  * because of the per-backend buffer limit, or the buffer manager decides to
                                310                 :                :  * split it, then the pending read is adjusted to hold the remaining portion.
                                311                 :                :  *
                                312                 :                :  * We can always start a read of at least size one if we have no progress yet.
                                313                 :                :  * Otherwise it's possible that we can't start a read at all because of a lack
                                314                 :                :  * of buffers, and then false is returned.  Buffer shortages also reduce the
                                315                 :                :  * distance to a level that prevents look-ahead until buffers are released.
                                316                 :                :  */
                                317                 :                : static bool
  416 tmunro@postgresql.or      318                 :CBC     1786329 : read_stream_start_pending_read(ReadStream *stream)
                                319                 :                : {
                                320                 :                :     bool        need_wait;
                                321                 :                :     int         requested_nblocks;
                                322                 :                :     int         nblocks;
                                323                 :                :     int         flags;
                                324                 :                :     int         forwarded;
                                325                 :                :     int16       io_index;
                                326                 :                :     int16       overflow;
                                327                 :                :     int16       buffer_index;
                                328                 :                :     int         buffer_limit;
                                329                 :                : 
                                330                 :                :     /* This should only be called with a pending read. */
  762                           331         [ -  + ]:        1786329 :     Assert(stream->pending_read_nblocks > 0);
  418                           332         [ -  + ]:        1786329 :     Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
                                333                 :                : 
                                334                 :                :     /* We had better not exceed the per-stream buffer limit with this read. */
  762                           335         [ -  + ]:        1786329 :     Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
                                336                 :                :            stream->max_pinned_buffers);
                                337                 :                : 
                                338                 :                : #ifdef USE_ASSERT_CHECKING
                                339                 :                :     /* We had better not be overwriting an existing pinned buffer. */
                                340         [ +  + ]:        1786329 :     if (stream->pinned_buffers > 0)
                                341         [ -  + ]:           8417 :         Assert(stream->next_buffer_index != stream->oldest_buffer_index);
                                342                 :                :     else
                                343         [ -  + ]:        1777912 :         Assert(stream->next_buffer_index == stream->oldest_buffer_index);
                                344                 :                : 
                                345                 :                :     /*
                                346                 :                :      * Pinned buffers forwarded by a preceding StartReadBuffers() call that
                                347                 :                :      * had to split the operation should match the leading blocks of this
                                348                 :                :      * following StartReadBuffers() call.
                                349                 :                :      */
  269                           350         [ -  + ]:        1786329 :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
                                351         [ +  + ]:        1788506 :     for (int i = 0; i < stream->forwarded_buffers; ++i)
                                352         [ -  + ]:           2177 :         Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
                                353                 :                :                stream->pending_read_blocknum + i);
                                354                 :                : 
                                355                 :                :     /*
                                356                 :                :      * Check that we've cleared the queue/overflow entries corresponding to
                                357                 :                :      * the rest of the blocks covered by this read, unless it's the first go
                                358                 :                :      * around and we haven't even initialized them yet.
                                359                 :                :      */
                                360         [ +  + ]:        4185685 :     for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
                                361   [ +  +  -  + ]:        2399356 :         Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
                                362                 :                :                stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
                                363                 :                : #endif
                                364                 :                : 
                                365                 :                :     /* Do we need to issue read-ahead advice? */
  401 andres@anarazel.de        366                 :        1786329 :     flags = stream->read_buffers_flags;
  416 tmunro@postgresql.or      367         [ +  + ]:        1786329 :     if (stream->advice_enabled)
                                368                 :                :     {
                                369         [ +  + ]:           1729 :         if (stream->pending_read_blocknum == stream->seq_blocknum)
                                370                 :                :         {
                                371                 :                :             /*
                                372                 :                :              * Sequential:  Issue advice until the preadv() calls have caught
                                373                 :                :              * up with the first advice issued for this sequential region, and
                                374                 :                :              * then stay out of the way of the kernel's own read-ahead.
                                375                 :                :              */
                                376         [ +  + ]:             30 :             if (stream->seq_until_processed != InvalidBlockNumber)
  401 andres@anarazel.de        377                 :              1 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
                                378                 :                :         }
                                379                 :                :         else
                                380                 :                :         {
                                381                 :                :             /*
                                382                 :                :              * Random jump:  Note the starting location of a new potential
                                383                 :                :              * sequential region and start issuing advice.  Skip it this time
                                384                 :                :              * if the preadv() follows immediately, eg first block in stream.
                                385                 :                :              */
  416 tmunro@postgresql.or      386                 :           1699 :             stream->seq_until_processed = stream->pending_read_blocknum;
                                387         [ +  + ]:           1699 :             if (stream->pinned_buffers > 0)
  401 andres@anarazel.de        388                 :             44 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
                                389                 :                :         }
                                390                 :                :     }
                                391                 :                : 
                                392                 :                :     /*
                                393                 :                :      * How many more buffers is this backend allowed?
                                394                 :                :      *
                                395                 :                :      * Forwarded buffers are already pinned and map to the leading blocks of
                                396                 :                :      * the pending read (the remaining portion of an earlier short read that
                                397                 :                :      * we're about to continue).  They are not counted in pinned_buffers, but
                                398                 :                :      * they are counted as pins already held by this backend according to the
                                399                 :                :      * buffer manager, so they must be added to the limit it grants us.
                                400                 :                :      */
  417 tmunro@postgresql.or      401         [ +  + ]:        1786329 :     if (stream->temporary)
                                402         [ +  - ]:          16425 :         buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
                                403                 :                :     else
                                404         [ +  - ]:        1769904 :         buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
  410                           405         [ -  + ]:        1786329 :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
                                406                 :                : 
                                407                 :        1786329 :     buffer_limit += stream->forwarded_buffers;
  393 andres@anarazel.de        408                 :        1786329 :     buffer_limit = Min(buffer_limit, PG_INT16_MAX);
                                409                 :                : 
  417 tmunro@postgresql.or      410   [ +  +  +  - ]:        1786329 :     if (buffer_limit == 0 && stream->pinned_buffers == 0)
                                411                 :         742397 :         buffer_limit = 1;       /* guarantee progress */
                                412                 :                : 
                                413                 :                :     /* Does the per-backend limit affect this read? */
                                414                 :        1786329 :     nblocks = stream->pending_read_nblocks;
                                415         [ +  + ]:        1786329 :     if (buffer_limit < nblocks)
                                416                 :                :     {
                                417                 :                :         int16       new_distance;
                                418                 :                : 
                                419                 :                :         /* Shrink distance: no more look-ahead until buffers are released. */
                                420                 :           1909 :         new_distance = stream->pinned_buffers + buffer_limit;
   30 andres@anarazel.de        421         [ +  + ]:GNC        1909 :         if (stream->readahead_distance > new_distance)
                                422                 :            481 :             stream->readahead_distance = new_distance;
                                423                 :                : 
                                424                 :                :         /* Unless we have nothing to give the consumer, stop here. */
  417 tmunro@postgresql.or      425         [ +  + ]:CBC        1909 :         if (stream->pinned_buffers > 0)
                                426                 :             74 :             return false;
                                427                 :                : 
                                428                 :                :         /* A short read is required to make progress. */
                                429                 :           1835 :         nblocks = buffer_limit;
                                430                 :                :     }
                                431                 :                : 
                                432                 :                :     /*
                                433                 :                :      * We say how many blocks we want to read, but it may be smaller on return
                                434                 :                :      * if the buffer manager decides to shorten the read.  Initialize buffers
                                435                 :                :      * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
                                436                 :                :      * and keep the original nblocks number so we can check for forwarded
                                437                 :                :      * buffers as output, below.
                                438                 :                :      */
  762                           439                 :        1786255 :     buffer_index = stream->next_buffer_index;
                                440                 :        1786255 :     io_index = stream->next_io_index;
  410                           441         [ +  + ]:        2931426 :     while (stream->initialized_buffers < buffer_index + nblocks)
                                442                 :        1145171 :         stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
                                443                 :        1786255 :     requested_nblocks = nblocks;
  762                           444                 :        1786255 :     need_wait = StartReadBuffers(&stream->ios[io_index].op,
                                445                 :        1786255 :                                  &stream->buffers[buffer_index],
                                446                 :                :                                  stream->pending_read_blocknum,
                                447                 :                :                                  &nblocks,
                                448                 :                :                                  flags);
                                449                 :        1786247 :     stream->pinned_buffers += nblocks;
                                450                 :                : 
                                451                 :                :     /* Remember whether we need to wait before returning this buffer. */
                                452         [ +  + ]:        1786247 :     if (!need_wait)
                                453                 :                :     {
                                454                 :                :         /*
                                455                 :                :          * If there currently is no IO in progress, and we have not needed to
                                456                 :                :          * issue IO recently, decay the look-ahead distance.  We detect if we
                                457                 :                :          * had to issue IO recently by having a decay holdoff that's set to
                                458                 :                :          * the max look-ahead distance whenever we need to do IO.  This is
                                459                 :                :          * important to ensure we eventually reach a high enough distance to
                                460                 :                :          * perform IO asynchronously when starting out with a small look-ahead
                                461                 :                :          * distance.
                                462                 :                :          */
   30 andres@anarazel.de        463         [ +  + ]:GNC     1105499 :         if (stream->ios_in_progress == 0)
                                464                 :                :         {
                                465         [ +  + ]:        1105011 :             if (stream->distance_decay_holdoff > 0)
   34                           466                 :          26910 :                 stream->distance_decay_holdoff--;
                                467                 :                :             else
                                468                 :                :             {
   30                           469         [ +  + ]:        1078101 :                 if (stream->readahead_distance > 1)
                                470                 :          18089 :                     stream->readahead_distance--;
                                471                 :                : 
                                472                 :                :                 /*
                                473                 :                :                  * For now we reduce the IO combine distance after
                                474                 :                :                  * sufficiently many buffer hits. There is no clear
                                475                 :                :                  * performance argument for doing so, but at the moment we
                                476                 :                :                  * need to do so to make the entrance into fast_path work
                                477                 :                :                  * correctly: We require combine_distance == 1 to enter
                                478                 :                :                  * fast-path, as without that condition we would wrongly
                                479                 :                :                  * re-enter fast-path when readahead_distance == 1 and
                                480                 :                :                  * pinned_buffers == 1, as we would not yet have prepared
                                481                 :                :                  * another IO in that situation.
                                482                 :                :                  */
                                483         [ +  + ]:        1078101 :                 if (stream->combine_distance > 1)
                                484                 :          18080 :                     stream->combine_distance--;
                                485                 :                :             }
                                486                 :                :         }
                                487                 :                :     }
                                488                 :                :     else
                                489                 :                :     {
                                490                 :                :         /*
                                491                 :                :          * Remember to call WaitReadBuffers() before returning head buffer.
                                492                 :                :          * Look-ahead distance will be adjusted after waiting.
                                493                 :                :          */
  762 tmunro@postgresql.or      494                 :CBC      680748 :         stream->ios[io_index].buffer_index = buffer_index;
                                495         [ +  + ]:         680748 :         if (++stream->next_io_index == stream->max_ios)
                                496                 :          28578 :             stream->next_io_index = 0;
                                497         [ -  + ]:         680748 :         Assert(stream->ios_in_progress < stream->max_ios);
                                498                 :         680748 :         stream->ios_in_progress++;
                                499                 :         680748 :         stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
                                500                 :                : 
                                501                 :                :         /* update I/O stats */
   28 tomas.vondra@postgre      502                 :GNC      680748 :         read_stream_count_io(stream, nblocks, stream->ios_in_progress);
                                503                 :                :     }
                                504                 :                : 
                                505                 :                :     /*
                                506                 :                :      * How many pins were acquired but forwarded to the next call?  These need
                                507                 :                :      * to be passed to the next StartReadBuffers() call by leaving them
                                508                 :                :      * exactly where they are in the queue, or released if the stream ends
                                509                 :                :      * early.  We need the number for accounting purposes, since they are not
                                510                 :                :      * counted in stream->pinned_buffers but we already hold them.
                                511                 :                :      */
  410 tmunro@postgresql.or      512                 :CBC     1786247 :     forwarded = 0;
                                513         [ +  + ]:        1788427 :     while (nblocks + forwarded < requested_nblocks &&
                                514         [ +  + ]:          67451 :            stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
                                515                 :           2180 :         forwarded++;
                                516                 :        1786247 :     stream->forwarded_buffers = forwarded;
                                517                 :                : 
                                518                 :                :     /*
                                519                 :                :      * We gave a contiguous range of buffer space to StartReadBuffers(), but
                                520                 :                :      * we want it to wrap around at queue_size.  Copy overflowing buffers to
                                521                 :                :      * the front of the array where they'll be consumed, but also leave a copy
                                522                 :                :      * in the overflow zone which the I/O operation has a pointer to (it needs
                                523                 :                :      * a contiguous array).  Both copies will be cleared when the buffers are
                                524                 :                :      * handed to the consumer.
                                525                 :                :      */
                                526                 :        1786247 :     overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
  762                           527         [ +  + ]:        1786247 :     if (overflow > 0)
                                528                 :                :     {
  410                           529         [ -  + ]:            439 :         Assert(overflow < stream->queue_size);    /* can't overlap */
                                530                 :            439 :         memcpy(&stream->buffers[0],
                                531                 :            439 :                &stream->buffers[stream->queue_size],
                                532                 :                :                sizeof(stream->buffers[0]) * overflow);
                                533                 :                :     }
                                534                 :                : 
                                535                 :                :     /* Compute location of start of next read, without using % operator. */
  762                           536                 :        1786247 :     buffer_index += nblocks;
                                537         [ +  + ]:        1786247 :     if (buffer_index >= stream->queue_size)
                                538                 :         345915 :         buffer_index -= stream->queue_size;
                                539   [ +  -  -  + ]:        1786247 :     Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
                                540                 :        1786247 :     stream->next_buffer_index = buffer_index;
                                541                 :                : 
                                542                 :                :     /* Adjust the pending read to cover the remaining portion, if any. */
                                543                 :        1786247 :     stream->pending_read_blocknum += nblocks;
                                544                 :        1786247 :     stream->pending_read_nblocks -= nblocks;
                                545                 :                : 
  417                           546                 :        1786247 :     return true;
                                547                 :                : }
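The pin-limit and look-ahead distance bookkeeping in read_stream_start_pending_read() can be isolated into a few small functions. This is a minimal sketch and not the real code, under the following assumptions:

- `DistanceState`, `plan_read_size()`, `note_io_issued()` and `note_read_without_io()` are hypothetical.
- The additional pin allowance is passed in as a plain integer instead of coming from GetAdditionalPinLimit() or GetAdditionalLocalPinLimit().
- A return of 0 stands in for `return false`.
- Resetting the holdoff to the maximum distance whenever IO is issued follows the comment above, not code shown in this excerpt.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical subset of the stream state used by this sketch. */
typedef struct DistanceState
{
    int         pinned_buffers;         /* pins held for queued buffers */
    int         forwarded_buffers;      /* pins carried over from a split read */
    int         pending_read_nblocks;   /* size of the read we want to start */
    int         readahead_distance;
    int         combine_distance;
    int         distance_decay_holdoff;
    int         ios_in_progress;
} DistanceState;

static int
min_int(int a, int b)
{
    return a < b ? a : b;
}

/*
 * Decide how many blocks of the pending read to start, given how many more
 * pins the backend may take.  Returns 0 if nothing should be started because
 * the consumer still has pinned buffers to work through.
 */
static int
plan_read_size(DistanceState *s, int additional_pins)
{
    int         limit = min_int(additional_pins, INT16_MAX);
    int         nblocks = s->pending_read_nblocks;

    assert(nblocks > 0);

    /* Forwarded pins are already counted as held, so add them back. */
    limit = min_int(limit + s->forwarded_buffers, INT16_MAX);
    if (limit == 0 && s->pinned_buffers == 0)
        limit = 1;              /* guarantee progress */

    if (limit < nblocks)
    {
        int         new_distance = s->pinned_buffers + limit;

        /* No more look-ahead until buffers are released. */
        if (s->readahead_distance > new_distance)
            s->readahead_distance = new_distance;
        if (s->pinned_buffers > 0)
            return 0;
        nblocks = limit;        /* short read */
    }
    return nblocks;
}

/* Called when IO had to be issued: postpone any distance decay. */
static void
note_io_issued(DistanceState *s, int max_distance)
{
    s->distance_decay_holdoff = max_distance;
}

/* Called when a read was satisfied without needing IO. */
static void
note_read_without_io(DistanceState *s)
{
    if (s->ios_in_progress != 0)
        return;
    if (s->distance_decay_holdoff > 0)
    {
        s->distance_decay_holdoff--;
        return;
    }
    if (s->readahead_distance > 1)
        s->readahead_distance--;
    if (s->combine_distance > 1)
        s->combine_distance--;
}
```

With no pins available and nothing pinned, the sketch still returns 1 (the "guarantee progress" case) and clamps the distance to 1. With 3 buffers pinned and only 2 more pins allowed, it returns 0 and clamps the distance to 5.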
                                548                 :                : 
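The overflow-zone technique near the end of read_stream_start_pending_read() works in three steps. The callee receives a contiguous span. Whatever ran past queue_size is then copied to the front of the array. Finally, the index advances by compare-and-subtract instead of `%`. This can be sketched on a plain int array. The sketch assumes that the overflow zone is sized to hold the largest single write, with `MAX_IO_SIZE` standing in for io_combine_limit. It also ignores forwarded buffers, and `Ring` and `ring_fill_contiguous()` are hypothetical names.

```c
#include <assert.h>
#include <string.h>

#define QUEUE_SIZE 8
#define MAX_IO_SIZE 4           /* hypothetical stand-in for io_combine_limit */

typedef struct Ring
{
    int         queue_size;
    int         next_index;
    int         slots[QUEUE_SIZE + MAX_IO_SIZE];    /* queue + overflow zone */
} Ring;

/*
 * Write n entries contiguously starting at next_index (as a callee that needs
 * a flat array would), then mirror any spill past queue_size to the front.
 */
static void
ring_fill_contiguous(Ring *r, int n, int first_value)
{
    int         overflow;

    assert(r->queue_size <= QUEUE_SIZE);
    assert(n > 0 && n <= MAX_IO_SIZE);

    for (int i = 0; i < n; i++)
        r->slots[r->next_index + i] = first_value + i;

    overflow = (r->next_index + n) - r->queue_size;
    if (overflow > 0)
    {
        assert(overflow < r->queue_size);   /* can't overlap */
        memcpy(&r->slots[0], &r->slots[r->queue_size],
               sizeof(r->slots[0]) * overflow);
    }

    /* Advance with a compare-and-subtract instead of %. */
    r->next_index += n;
    if (r->next_index >= r->queue_size)
        r->next_index -= r->queue_size;
    assert(r->next_index >= 0 && r->next_index < r->queue_size);
}
```

Suppose 4 entries are filled starting at index 6 of an 8-entry queue. Logical positions 6, 7, 0 and 1 then hold the four values, in that order. Slots 8 and 9 still hold the copies that the contiguous writer saw.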
                                549                 :                : /*
                                550                 :                :  * Should we continue to perform look ahead?  Looking ahead may allow us to
                                551                 :                :  * make the pending IO larger via IO combining or to issue more read-ahead.
                                552                 :                :  */
                                553                 :                : static inline bool
   30 andres@anarazel.de        554                 :GNC     5515606 : read_stream_should_look_ahead(ReadStream *stream)
                                555                 :                : {
                                556                 :                :     /* If the callback has signaled end-of-stream, we're done */
                                557         [ +  + ]:        5515606 :     if (stream->readahead_distance == 0)
                                558                 :         330305 :         return false;
                                559                 :                : 
                                560                 :                :     /* never start more IOs than our cap */
                                561         [ -  + ]:        5185301 :     if (stream->ios_in_progress >= stream->max_ios)
   30 andres@anarazel.de        562                 :UNC           0 :         return false;
                                563                 :                : 
                                564                 :                :     /*
                                565                 :                :      * Allow looking further ahead if we are in the process of building a
                                566                 :                :      * larger IO, the IO is not yet big enough, and we don't yet have IO in
                                567                 :                :      * flight.
                                568                 :                :      *
                                569                 :                :      * We do so to allow building larger reads when readahead_distance is
                                570                 :                :      * small (e.g. because the I/O subsystem is keeping up or
                                571                 :                :      * effective_io_concurrency is small). That's a useful goal because larger
                                572                 :                :      * reads are more CPU efficient than smaller reads, even if the system is
                                573                 :                :      * not IO bound.
                                574                 :                :      *
                                575                 :                :      * The reason we do *not* do so when we already have a read prepared (i.e.
                                576                 :                :      * why we check for pinned_buffers == 0) is once we are actually reading
                                577                 :                :      * ahead, we don't need it:
                                578                 :                :      *
                                579                 :                :      * - We won't issue unnecessarily small reads as
                                580                 :                :      * read_stream_should_issue_now() will return false until the IO is
                                581                 :                :      * suitably sized. The issuance of the pending read will be delayed until
                                582                 :                :      * enough buffers have been consumed.
                                583                 :                :      *
                                584                 :                :      * - If we are not reading ahead aggressively enough, future
                                585                 :                :      * WaitReadBuffers() calls will return true, leading to readahead_distance
                                586                 :                :      * being increased. After that more full-sized IOs can be issued.
                                587                 :                :      *
                                588                 :                :      * Furthermore, if we did not have the pinned_buffers == 0 condition, we
                                589                 :                :      * might end up issuing I/O more aggressively than we need.
                                590                 :                :      *
                                591                 :                :      * Note that a return of true here can lead to exceeding the read-ahead
                                592                 :                :      * limit, but we won't exceed the buffer pin limit (because pinned_buffers
                                593                 :                :      * == 0 and combine_distance is capped by max_pinned_buffers).
                                594                 :                :      */
   30 andres@anarazel.de        595         [ +  + ]:GNC     5185301 :     if (stream->pending_read_nblocks > 0 &&
                                596         [ +  + ]:        2288479 :         stream->pinned_buffers == 0 &&
                                597         [ +  + ]:        2164157 :         stream->pending_read_nblocks < stream->combine_distance)
                                598                 :         485782 :         return true;
                                599                 :                : 
                                600                 :                :     /*
                                601                 :                :      * Don't start more read-ahead if that'd put us over the distance limit
                                602                 :                :      * for doing read-ahead. As stream->readahead_distance is capped by
                                603                 :                :      * max_pinned_buffers, this prevents us from looking ahead so far that it
                                604                 :                :      * would put us over the pin limit.
                                605                 :                :      */
                                606         [ +  + ]:        4699519 :     if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
                                607                 :        1756687 :         return false;
                                608                 :                : 
                                609                 :        2942832 :     return true;
                                610                 :                : }
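The last two checks of read_stream_should_look_ahead() can be modelled on their own. The following is a minimal, self-contained sketch, not the PostgreSQL code. `LookAheadSketch` and `sketch_should_look_ahead()` are hypothetical names, the struct holds only the four fields these checks read, and the earlier checks of the real function, which lie above this excerpt, are omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the ReadStream fields these two checks read. */
typedef struct LookAheadSketch
{
    int16_t     pending_read_nblocks;   /* blocks in the not-yet-started read */
    int16_t     pinned_buffers;         /* pinned by the stream, not yet consumed */
    int16_t     combine_distance;       /* target size of a combined read */
    int16_t     readahead_distance;     /* look-ahead goal; 0 at end of stream */
} LookAheadSketch;

/* Models only the two final checks of read_stream_should_look_ahead(). */
static bool
sketch_should_look_ahead(const LookAheadSketch *s)
{
    assert(s->pending_read_nblocks >= 0 && s->pinned_buffers >= 0);

    /* Nothing prepared yet and the pending read is still small: keep growing it. */
    if (s->pending_read_nblocks > 0 &&
        s->pinned_buffers == 0 &&
        s->pending_read_nblocks < s->combine_distance)
        return true;

    /* Otherwise stop once pinned plus pending reaches the distance goal. */
    if (s->pinned_buffers + s->pending_read_nblocks >= s->readahead_distance)
        return false;

    return true;
}
```

With `pending_read_nblocks = 3`, `pinned_buffers = 0`, `combine_distance = 16` and `readahead_distance = 1`, the sketch returns true even though pinned plus pending already exceeds the distance. This is the "exceeding the read-ahead limit" case the comment describes; the pin limit still holds because nothing is pinned.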
                                611                 :                : 
                                612                 :                : /*
                                613                 :                :  * We don't start the pending read just because we've hit the distance limit,
                                614                 :                :  * preferring to give it another chance to grow to full io_combine_limit size
                                615                 :                :  * once more buffers have been consumed.  But this is not desirable in all
                                616                 :                :  * situations - see below.
                                617                 :                :  */
                                618                 :                : static inline bool
                                619                 :        6489684 : read_stream_should_issue_now(ReadStream *stream)
                                620                 :                : {
                                621                 :        6489684 :     int16       pending_read_nblocks = stream->pending_read_nblocks;
                                622                 :                : 
                                623                 :                :     /* there is no pending IO that could be issued */
                                624         [ +  + ]:        6489684 :     if (pending_read_nblocks == 0)
                                625                 :        4340285 :         return false;
                                626                 :                : 
                                627                 :                :     /* never start more IOs than our cap */
                                628         [ -  + ]:        2149399 :     if (stream->ios_in_progress >= stream->max_ios)
   30 andres@anarazel.de        629                 :UNC           0 :         return false;
                                630                 :                : 
                                631                 :                :     /*
                                632                 :                :      * If the callback has signaled end-of-stream, start the pending read
                                633                 :                :      * immediately. There is no further potential for IO combining.
                                634                 :                :      */
   30 andres@anarazel.de        635         [ +  + ]:GNC     2149399 :     if (stream->readahead_distance == 0)
                                636                 :         103811 :         return true;
                                637                 :                : 
                                638                 :                :     /*
                                639                 :                :      * If we've already reached combine_distance, there's no chance of growing
                                640                 :                :      * the read further.
                                641                 :                :      */
                                642         [ +  + ]:        2045588 :     if (pending_read_nblocks >= stream->combine_distance)
                                643                 :        1682459 :         return true;
                                644                 :                : 
                                645                 :                :     /*
                                646                 :                :      * If we currently have no reads in flight or prepared, issue the IO once
                                647                 :                :      * we are not looking ahead further. This ensures there's always at least
                                648                 :                :      * one IO prepared.
                                649                 :                :      */
                                650         [ +  + ]:         363129 :     if (stream->pinned_buffers == 0 &&
                                651         [ -  + ]:         242891 :         !read_stream_should_look_ahead(stream))
   30 andres@anarazel.de        652                 :UNC           0 :         return true;
                                653                 :                : 
   30 andres@anarazel.de        654                 :GNC      363129 :     return false;
                                655                 :                : }
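The order of checks in read_stream_should_issue_now() can be illustrated with a small standalone model. This is a sketch under stated assumptions, not the real implementation: `IssueSketch`, `sketch_should_look_ahead()` and `sketch_should_issue_now()` are hypothetical names, and the look-ahead helper is a reduced copy that omits the real function's earlier checks.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the ReadStream fields the issue decision reads. */
typedef struct IssueSketch
{
    int16_t     pending_read_nblocks;
    int16_t     pinned_buffers;
    int16_t     combine_distance;
    int16_t     readahead_distance;
    int16_t     ios_in_progress;
    int16_t     max_ios;
} IssueSketch;

/* Reduced model of the look-ahead test (earlier checks omitted). */
static bool
sketch_should_look_ahead(const IssueSketch *s)
{
    if (s->pending_read_nblocks > 0 && s->pinned_buffers == 0 &&
        s->pending_read_nblocks < s->combine_distance)
        return true;
    return s->pinned_buffers + s->pending_read_nblocks < s->readahead_distance;
}

/* Same order of checks as read_stream_should_issue_now(). */
static bool
sketch_should_issue_now(const IssueSketch *s)
{
    int16_t     n = s->pending_read_nblocks;

    assert(n >= 0);
    if (n == 0)
        return false;           /* no pending read to issue */
    if (s->ios_in_progress >= s->max_ios)
        return false;           /* already at the concurrency cap */
    if (s->readahead_distance == 0)
        return true;            /* end of stream: nothing more to combine */
    if (n >= s->combine_distance)
        return true;            /* read is already full-sized */
    if (s->pinned_buffers == 0 && !sketch_should_look_ahead(s))
        return true;            /* keep at least one IO prepared */
    return false;               /* let the read grow a little longer */
}
```

In this simplified model the `pinned_buffers == 0` branch can never fire: reaching it implies a non-empty pending read shorter than `combine_distance`, for which the reduced look-ahead test already returns true. The listing likewise records zero hits for that `return true`; in the real code only the earlier look-ahead checks, not modelled here, could make it reachable.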
                                656                 :                : 
                                657                 :                : static void
  416 tmunro@postgresql.or      658                 :CBC     3303961 : read_stream_look_ahead(ReadStream *stream)
                                659                 :                : {
                                660                 :                :     /*
                                661                 :                :      * Allow amortizing the cost of submitting IO over multiple IOs. This
                                662                 :                :      * requires that we don't do any operations that could lead to a deadlock
                                663                 :                :      * with staged-but-unsubmitted IO. The callback needs to opt-in to being
                                664                 :                :      * careful.
                                665                 :                :      */
  401 andres@anarazel.de        666         [ +  + ]:        3303961 :     if (stream->batch_mode)
                                667                 :        3194181 :         pgaio_enter_batchmode();
                                668                 :                : 
   30 andres@anarazel.de        669         [ +  + ]:GNC     5272715 :     while (read_stream_should_look_ahead(stream))
                                670                 :                :     {
                                671                 :                :         BlockNumber blocknum;
                                672                 :                :         int16       buffer_index;
                                673                 :                :         void       *per_buffer_data;
                                674                 :                : 
                                675         [ +  + ]:        3185723 :         if (read_stream_should_issue_now(stream))
                                676                 :                :         {
  416 tmunro@postgresql.or      677                 :CBC        1514 :             read_stream_start_pending_read(stream);
  762                           678                 :           1514 :             continue;
                                679                 :                :         }
                                680                 :                : 
                                681                 :                :         /*
                                682                 :                :          * See which block the callback wants next in the stream.  We need to
                                683                 :                :          * compute the index of the Nth block of the pending read including
                                684                 :                :          * wrap-around, but we don't want to use the expensive % operator.
                                685                 :                :          */
                                686                 :        3184209 :         buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
                                687         [ +  + ]:        3184209 :         if (buffer_index >= stream->queue_size)
                                688                 :           3482 :             buffer_index -= stream->queue_size;
                                689   [ +  -  -  + ]:        3184209 :         Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
                                690                 :        3184209 :         per_buffer_data = get_per_buffer_data(stream, buffer_index);
                                691                 :        3184209 :         blocknum = read_stream_get_block(stream, per_buffer_data);
                                692         [ +  + ]:        3184209 :         if (blocknum == InvalidBlockNumber)
                                693                 :                :         {
                                694                 :                :             /* End of stream. */
   30 andres@anarazel.de        695                 :GNC     1216969 :             stream->readahead_distance = 0;
                                696                 :        1216969 :             stream->combine_distance = 0;
  762 tmunro@postgresql.or      697                 :CBC     1216969 :             break;
                                698                 :                :         }
                                699                 :                : 
                                700                 :                :         /* Can we merge it with the pending read? */
                                701         [ +  + ]:        1967240 :         if (stream->pending_read_nblocks > 0 &&
                                702         [ +  + ]:         248552 :             stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
                                703                 :                :         {
                                704                 :         248493 :             stream->pending_read_nblocks++;
                                705                 :         248493 :             continue;
                                706                 :                :         }
                                707                 :                : 
                                708                 :                :         /* We have to start the pending read before we can build another. */
  758                           709         [ +  + ]:        1718806 :         while (stream->pending_read_nblocks > 0)
                                710                 :                :         {
  416                           711         [ +  - ]:             59 :             if (!read_stream_start_pending_read(stream) ||
  417                           712         [ -  + ]:             59 :                 stream->ios_in_progress == stream->max_ios)
                                713                 :                :             {
                                714                 :                :                 /* We've hit the buffer or I/O limit.  Rewind and stop here. */
  762 tmunro@postgresql.or      715                 :UBC           0 :                 read_stream_unget_block(stream, blocknum);
  401 andres@anarazel.de        716         [ #  # ]:              0 :                 if (stream->batch_mode)
                                717                 :              0 :                     pgaio_exit_batchmode();
  762 tmunro@postgresql.or      718                 :              0 :                 return;
                                719                 :                :             }
                                720                 :                :         }
                                721                 :                : 
                                722                 :                :         /* This is the start of a new pending read. */
  762 tmunro@postgresql.or      723                 :CBC     1718747 :         stream->pending_read_blocknum = blocknum;
                                724                 :        1718747 :         stream->pending_read_nblocks = 1;
                                725                 :                :     }
                                726                 :                : 
                                727                 :                :     /*
                                728                 :                :      * Check if the pending read should be issued now, or if we should give it
                                729                 :                :      * another chance to grow to the full size.
                                730                 :                :      *
                                731                 :                :      * Note that the pending read can exceed the distance goal, if the latter
                                732                 :                :      * was reduced after hitting the per-backend buffer limit.
                                733                 :                :      */
   30 andres@anarazel.de        734         [ +  + ]:GNC     3303961 :     if (read_stream_should_issue_now(stream))
  416 tmunro@postgresql.or      735                 :CBC     1784756 :         read_stream_start_pending_read(stream);
                                736                 :                : 
                                737                 :                :     /*
                                738                 :                :      * There should always be something pinned when we leave this function,
                                739                 :                :      * whether started by this call or not, unless we've hit the end of the
                                740                 :                :      * stream.  In the worst case we can always make progress one buffer at a
                                741                 :                :      * time.
                                742                 :                :      */
   30 andres@anarazel.de        743   [ +  +  -  + ]:GNC     3303953 :     Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
                                744                 :                : 
  401 andres@anarazel.de        745         [ +  + ]:CBC     3303953 :     if (stream->batch_mode)
                                746                 :        3194173 :         pgaio_exit_batchmode();
                                747                 :                : }
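Two mechanics of the loop body in read_stream_look_ahead() can be shown in isolation: locating the Nth slot after `next_buffer_index` in the circular queue with a conditional subtraction instead of `%`, and extending the pending read while each new block number is the successor of the last. The following is a minimal sketch; `sketch_queue_slot()`, `sketch_combine_blocks()`, `SketchRun` and `SketchBlockNumber` are hypothetical names, and the distance, pin, and IO checks of the real loop are deliberately left out.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t SketchBlockNumber;     /* stands in for BlockNumber */

/*
 * Slot `offset` positions after next_index in a ring of queue_size
 * entries.  A single conditional subtraction suffices because both
 * inputs are below queue_size, so the sum is below 2 * queue_size.
 */
static int
sketch_queue_slot(int next_index, int offset, int queue_size)
{
    int         idx = next_index + offset;

    assert(next_index >= 0 && next_index < queue_size);
    assert(offset >= 0 && offset < queue_size);
    if (idx >= queue_size)
        idx -= queue_size;
    assert(idx >= 0 && idx < queue_size);
    return idx;
}

typedef struct SketchRun
{
    SketchBlockNumber start;    /* first block of the combined read */
    int         nblocks;        /* number of consecutive blocks */
} SketchRun;

/*
 * Group block numbers into runs, merging each block into the current run
 * when it immediately follows it.  Returns the number of runs produced.
 */
static int
sketch_combine_blocks(const SketchBlockNumber *blocks, int n,
                      SketchRun *runs, int max_runs)
{
    int         nruns = 0;

    for (int i = 0; i < n; i++)
    {
        if (nruns > 0 &&
            runs[nruns - 1].start + runs[nruns - 1].nblocks == blocks[i])
        {
            runs[nruns - 1].nblocks++;  /* merge with the pending run */
            continue;
        }
        assert(nruns < max_runs);
        runs[nruns].start = blocks[i];  /* start a new pending run */
        runs[nruns].nblocks = 1;
        nruns++;
    }
    return nruns;
}
```

For example, the block sequence 10, 11, 12, 20, 21, 5 yields three runs: (10, 3), (20, 2) and (5, 1). In a 10-entry queue, the slot 4 positions after index 8 wraps to index 2.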
                                748                 :                : 
                                749                 :                : /*
                                750                 :                :  * Create a new read stream object that can be used to perform the equivalent
                                751                 :                :  * of a series of ReadBuffer() calls for one fork of one relation.
                                752                 :                :  * Internally, it generates larger vectored reads where possible by looking
                                753                 :                :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
                                754                 :                :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
                                755                 :                :  * write extra data for each block into the space provided to it.  It will
                                756                 :                :  * also receive callback_private_data for its own purposes.
                                757                 :                :  */
                                758                 :                : static ReadStream *
  654 noah@leadboat.com         759                 :         574103 : read_stream_begin_impl(int flags,
                                760                 :                :                        BufferAccessStrategy strategy,
                                761                 :                :                        Relation rel,
                                762                 :                :                        SMgrRelation smgr,
                                763                 :                :                        char persistence,
                                764                 :                :                        ForkNumber forknum,
                                765                 :                :                        ReadStreamBlockNumberCB callback,
                                766                 :                :                        void *callback_private_data,
                                767                 :                :                        size_t per_buffer_data_size)
                                768                 :                : {
                                769                 :                :     ReadStream *stream;
                                770                 :                :     size_t      size;
                                771                 :                :     int16       queue_size;
                                772                 :                :     int16       queue_overflow;
                                773                 :                :     int         max_ios;
                                774                 :                :     int         strategy_pin_limit;
                                775                 :                :     uint32      max_pinned_buffers;
                                776                 :                :     uint32      max_possible_buffer_limit;
                                777                 :                :     Oid         tablespace_id;
                                778                 :                : 
                                779                 :                :     /*
                                780                 :                :      * Decide how many I/Os we will allow to run at the same time.  This
                                781                 :                :      * number also affects how far we look ahead for opportunities to start
                                782                 :                :      * more I/Os.
                                783                 :                :      */
  762 tmunro@postgresql.or      784                 :         574103 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
                                785   [ +  +  +  + ]:         574103 :     if (!OidIsValid(MyDatabaseId) ||
  654 noah@leadboat.com         786   [ +  +  +  + ]:         681538 :         (rel && IsCatalogRelation(rel)) ||
  762 tmunro@postgresql.or      787                 :         187149 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
                                788                 :                :     {
                                789                 :                :         /*
                                790                 :                :          * Avoid circularity while trying to look up tablespace settings or
                                791                 :                :          * before spccache.c is ready.
                                792                 :                :          */
                                793                 :         452428 :         max_ios = effective_io_concurrency;
                                794                 :                :     }
                                795         [ +  + ]:         121675 :     else if (flags & READ_STREAM_MAINTENANCE)
                                796                 :           8674 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
                                797                 :                :     else
                                798                 :         113001 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
                                799                 :                : 
                                800                 :                :     /* Cap to INT16_MAX to avoid overflowing below */
                                801                 :         574103 :     max_ios = Min(max_ios, PG_INT16_MAX);
                                802                 :                : 
                                803                 :                :     /*
                                804                 :                :      * If starting a multi-block I/O near the end of the queue, we might
                                805                 :                :      * temporarily need extra space for overflowing buffers before they are
                                806                 :                :      * moved to regular circular position.  This is the maximum extra space we
                                807                 :                :      * could need.
                                808                 :                :      */
  418                           809                 :         574103 :     queue_overflow = io_combine_limit - 1;
                                810                 :                : 
                                811                 :                :     /*
                                812                 :                :      * Choose the maximum number of buffers we're prepared to pin.  We try to
                                813                 :                :      * pin fewer if we can, though.  We add one so that we can make progress
                                814                 :                :      * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
                                815                 :                :      * this also allows an extra full I/O's worth of buffers: after an I/O
                                816                 :                :      * finishes we don't want to have to wait for its buffers to be consumed
                                817                 :                :      * before starting a new one.
                                818                 :                :      *
                                819                 :                :      * Be careful not to allow int16 to overflow.  That is possible with the
                                820                 :                :      * current GUC range limits, so this is an artificial limit of ~32k
                                821                 :                :      * buffers and we'd need to adjust the types to exceed that.  We also have
                                822                 :                :      * to allow for the spare entry and the overflow space.
                                823                 :                :      */
  432                           824                 :         574103 :     max_pinned_buffers = (max_ios + 1) * io_combine_limit;
  762                           825                 :         574103 :     max_pinned_buffers = Min(max_pinned_buffers,
                                826                 :                :                              PG_INT16_MAX - queue_overflow - 1);
                                827                 :                : 
                                828                 :                :     /* Give the strategy a chance to limit the number of buffers we pin. */
  759                           829                 :         574103 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
                                830                 :         574103 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
                                831                 :                : 
                                832                 :                :     /*
                                833                 :                :      * Also limit our queue to the maximum number of pins we could ever be
                                834                 :                :      * allowed to acquire according to the buffer manager.  We may not really
                                835                 :                :      * be able to use them all due to other pins held by this backend, but
                                836                 :                :      * we'll check that later in read_stream_start_pending_read().
                                837                 :                :      */
  762                           838         [ +  + ]:         574103 :     if (SmgrIsTemp(smgr))
  417                           839                 :           9288 :         max_possible_buffer_limit = GetLocalPinLimit();
                                840                 :                :     else
                                841                 :         564815 :         max_possible_buffer_limit = GetPinLimit();
                                842                 :         574103 :     max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
                                843                 :                : 
                                844                 :                :     /*
                                845                 :                :      * The limit might be zero on a system configured with too few buffers for
                                846                 :                :      * the number of connections.  We need at least one to make progress.
                                847                 :                :      */
                                848         [ +  + ]:         574103 :     max_pinned_buffers = Max(1, max_pinned_buffers);
                                849                 :                : 
                                850                 :                :     /*
                                851                 :                :      * We need one extra entry for buffers and per-buffer data, because users
                                852                 :                :      * of per-buffer data have access to the object until the next call to
                                853                 :                :      * read_stream_next_buffer(), so we need a gap between the head and tail
                                854                 :                :      * of the queue so that we don't clobber it.
                                855                 :                :      */
  762                           856                 :         574103 :     queue_size = max_pinned_buffers + 1;
                                857                 :                : 
                                858                 :                :     /*
                                859                 :                :      * Allocate the object, the buffers, the ios and per_buffer_data space in
                                860                 :                :      * one big chunk.  Though we have queue_size buffers, we want to be able
                                861                 :                :      * to assume that all the buffers for a single read are contiguous (i.e.
                                862                 :                :      * don't wrap around halfway through), so we allow temporary overflows of
                                863                 :                :      * up to the maximum possible overflow size.
                                864                 :                :      */
                                865                 :         574103 :     size = offsetof(ReadStream, buffers);
  418                           866                 :         574103 :     size += sizeof(Buffer) * (queue_size + queue_overflow);
  762                           867         [ +  + ]:         574103 :     size += sizeof(InProgressIO) * Max(1, max_ios);
                                868                 :         574103 :     size += per_buffer_data_size * queue_size;
                                869                 :         574103 :     size += MAXIMUM_ALIGNOF * 2;
                                870                 :         574103 :     stream = (ReadStream *) palloc(size);
                                871                 :         574103 :     memset(stream, 0, offsetof(ReadStream, buffers));
                                872                 :         574103 :     stream->ios = (InProgressIO *)
  418                           873                 :         574103 :         MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
  762                           874         [ +  + ]:         574103 :     if (per_buffer_data_size > 0)
                                875                 :          30102 :         stream->per_buffer_data = (void *)
                                876         [ +  + ]:          30102 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
                                877                 :                : 
  401 andres@anarazel.de        878                 :         574103 :     stream->sync_mode = io_method == IOMETHOD_SYNC;
                                879                 :         574103 :     stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
                                880                 :                : 
                                881                 :                : #ifdef USE_PREFETCH
                                882                 :                : 
                                883                 :                :     /*
                                884                 :                :      * Read-ahead advice simulating asynchronous I/O with synchronous calls.
                                885                 :                :      * Issue advice only if AIO is not used, direct I/O isn't enabled, the
                                886                 :                :      * caller hasn't promised sequential access (overriding our detection
                                887                 :                :      * heuristics), and max_ios hasn't been set to zero.
                                888                 :                :      */
                                889         [ +  + ]:         574103 :     if (stream->sync_mode &&
                                890         [ +  - ]:           3161 :         (io_direct_flags & IO_DIRECT_DATA) == 0 &&
  762 tmunro@postgresql.or      891   [ +  +  +  - ]:           3161 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
                                892                 :                :         max_ios > 0)
                                893                 :            738 :         stream->advice_enabled = true;
                                894                 :                : #endif
                                895                 :                : 
                                896                 :                :     /*
                                897                 :                :      * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
                                898                 :                :      * we still need to allocate space to combine and run one I/O.  Bump it up
                                899                 :                :      * to one, and remember to ask for synchronous I/O only.
                                900                 :                :      */
                                901         [ +  + ]:         574103 :     if (max_ios == 0)
                                902                 :                :     {
                                903                 :              7 :         max_ios = 1;
  401 andres@anarazel.de        904                 :              7 :         stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
                                905                 :                :     }
                                906                 :                : 
                                907                 :                :     /*
                                908                 :                :      * Capture stable values for these two GUC-derived numbers for the
                                909                 :                :      * lifetime of this stream, so we don't have to worry about the GUCs
                                910                 :                :      * changing underneath us beyond this point.
                                911                 :                :      */
  762 tmunro@postgresql.or      912                 :         574103 :     stream->max_ios = max_ios;
  418                           913                 :         574103 :     stream->io_combine_limit = io_combine_limit;
                                914                 :                : 
  762                           915                 :         574103 :     stream->per_buffer_data_size = per_buffer_data_size;
                                916                 :         574103 :     stream->max_pinned_buffers = max_pinned_buffers;
                                917                 :         574103 :     stream->queue_size = queue_size;
                                918                 :         574103 :     stream->callback = callback;
                                919                 :         574103 :     stream->callback_private_data = callback_private_data;
  612                           920                 :         574103 :     stream->buffered_blocknum = InvalidBlockNumber;
  416                           921                 :         574103 :     stream->seq_blocknum = InvalidBlockNumber;
                                922                 :         574103 :     stream->seq_until_processed = InvalidBlockNumber;
  417                           923                 :         574103 :     stream->temporary = SmgrIsTemp(smgr);
   34 andres@anarazel.de        924                 :GNC      574103 :     stream->distance_decay_holdoff = 0;
                                925                 :                : 
                                926                 :                :     /*
                                927                 :                :      * Skip the initial ramp-up phase if the caller says we're going to be
                                928                 :                :      * reading the whole relation.  This way we start out assuming we'll be
                                929                 :                :      * doing full io_combine_limit sized reads.
                                930                 :                :      */
  762 tmunro@postgresql.or      931         [ +  + ]:CBC      574103 :     if (flags & READ_STREAM_FULL)
                                932                 :                :     {
   30 andres@anarazel.de        933                 :GNC       76966 :         stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
                                934                 :          76966 :         stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit);
                                935                 :                :     }
                                936                 :                :     else
                                937                 :                :     {
                                938                 :         497137 :         stream->readahead_distance = 1;
                                939                 :         497137 :         stream->combine_distance = 1;
                                940                 :                :     }
                                941                 :         574103 :     stream->resume_readahead_distance = stream->readahead_distance;
                                942                 :         574103 :     stream->resume_combine_distance = stream->combine_distance;
                                943                 :                : 
                                944                 :                :     /*
                                945                 :                :      * Since we always access the same relation, we can initialize parts of
                                946                 :                :      * the ReadBuffersOperation objects and leave them that way, to avoid
                                947                 :                :      * wasting CPU cycles writing to them for each read.
                                948                 :                :      */
  762 tmunro@postgresql.or      949         [ +  + ]:CBC     9788410 :     for (int i = 0; i < max_ios; ++i)
                                950                 :                :     {
                                951                 :        9214307 :         stream->ios[i].op.rel = rel;
  654 noah@leadboat.com         952                 :        9214307 :         stream->ios[i].op.smgr = smgr;
                                953                 :        9214307 :         stream->ios[i].op.persistence = persistence;
  762 tmunro@postgresql.or      954                 :        9214307 :         stream->ios[i].op.forknum = forknum;
                                955                 :        9214307 :         stream->ios[i].op.strategy = strategy;
                                956                 :                :     }
                                957                 :                : 
                                958                 :         574103 :     return stream;
                                959                 :                : }
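The single-chunk allocation in read_stream_begin_impl() above can be modelled outside the backend. The following is a minimal standalone sketch, not PostgreSQL code. SketchStream, SketchBuffer, SketchIO, sketch_stream_alloc() and SKETCH_MAXALIGNOF are hypothetical stand-ins for ReadStream, Buffer, InProgressIO, the palloc() sequence and MAXIMUM_ALIGNOF. The alignment is assumed to be 8, and malloc() replaces palloc(). The sketch mirrors the size arithmetic:

- queue_size = max_pinned_buffers + 1 slots;
- queue_size + queue_overflow Buffer entries, so that the buffers of a single read never wrap;
- at least one I/O slot, even when max_ios is 0;
- 2 * alignment bytes of slack, because each of the two alignment bumps can waste up to SKETCH_MAXALIGNOF - 1 bytes.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Assumed alignment; PostgreSQL determines MAXIMUM_ALIGNOF at configure time. */
#define SKETCH_MAXALIGNOF 8
#define SKETCH_MAXALIGN(p) \
    ((void *) (((uintptr_t) (p) + (SKETCH_MAXALIGNOF - 1)) & \
               ~(uintptr_t) (SKETCH_MAXALIGNOF - 1)))

typedef int SketchBuffer;                           /* stand-in for Buffer */
typedef struct { int16_t buffer_index; } SketchIO;  /* stand-in for InProgressIO */

typedef struct SketchStream
{
    int16_t      queue_size;
    SketchIO    *ios;               /* points into the same chunk */
    void        *per_buffer_data;   /* points into the same chunk, or NULL */
    SketchBuffer buffers[];         /* flexible array member */
} SketchStream;

/* Hypothetical helper: one allocation holding header, buffers, ios and per-buffer data. */
static SketchStream *
sketch_stream_alloc(int max_pinned_buffers, int queue_overflow, int max_ios,
                    size_t per_buffer_data_size, size_t *size_out)
{
    int          queue_size = max_pinned_buffers + 1;
    int          nios = max_ios < 1 ? 1 : max_ios;   /* Max(1, max_ios) */
    size_t       size;
    SketchStream *stream;

    assert(max_pinned_buffers > 0 && queue_overflow >= 0);

    size = offsetof(SketchStream, buffers);
    size += sizeof(SketchBuffer) * (queue_size + queue_overflow);
    size += sizeof(SketchIO) * nios;
    size += per_buffer_data_size * queue_size;
    size += SKETCH_MAXALIGNOF * 2;  /* slack for the two alignment bumps below */

    stream = malloc(size);
    if (stream == NULL)
        return NULL;
    memset(stream, 0, offsetof(SketchStream, buffers));  /* zero the header only */
    stream->queue_size = (int16_t) queue_size;
    stream->ios = SKETCH_MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
    if (per_buffer_data_size > 0)
        stream->per_buffer_data = SKETCH_MAXALIGN(&stream->ios[nios]);
    *size_out = size;
    return stream;
}
```

For a call such as sketch_stream_alloc(16, 4, 3, 12, &size), the ios pointer should land on an 8-byte boundary after the 21 buffer slots. The 17 per-buffer data entries should still end within size bytes. A single free() releases everything, which is the point of carving all the arrays out of one chunk.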
                                960                 :                : 
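The configuration decisions made near the end of read_stream_begin_impl() can be condensed into a pure function. These decisions are whether to issue read-ahead advice, the max_ios == 0 fallback, and the starting distances. This is a hedged sketch with several simplifications:

- The SK_STREAM_* flag values, SketchSetup and sketch_setup() are hypothetical.
- The sync_mode and direct_io_data booleans stand in for io_method == IOMETHOD_SYNC and io_direct_flags & IO_DIRECT_DATA.
- The USE_PREFETCH compile-time guard is ignored.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical flag bits; the real READ_STREAM_* values are defined in read_stream.h. */
#define SK_STREAM_SEQUENTIAL 0x01
#define SK_STREAM_FULL       0x02

typedef struct SketchSetup
{
    bool advice_enabled;    /* models stream->advice_enabled */
    bool synchronous_only;  /* models READ_BUFFERS_SYNCHRONOUSLY in read_buffers_flags */
    int  max_ios;
    int  readahead_distance;
    int  combine_distance;
} SketchSetup;

static int
sketch_min(int a, int b)
{
    return a < b ? a : b;
}

static SketchSetup
sketch_setup(int flags, bool sync_mode, bool direct_io_data,
             int max_ios, int max_pinned_buffers, int io_combine_limit)
{
    SketchSetup s = {0};

    /* Advice only when I/O is synchronous, buffered, not declared sequential, and max_ios > 0. */
    s.advice_enabled = sync_mode && !direct_io_data &&
        (flags & SK_STREAM_SEQUENTIAL) == 0 && max_ios > 0;

    /* max_ios == 0 still needs room for one combined I/O, run synchronously. */
    if (max_ios == 0)
    {
        max_ios = 1;
        s.synchronous_only = true;
    }
    s.max_ios = max_ios;

    /* A full-relation scan skips ramp-up, capped by the pin limit. */
    if (flags & SK_STREAM_FULL)
        s.readahead_distance = s.combine_distance =
            sketch_min(max_pinned_buffers, io_combine_limit);
    else
        s.readahead_distance = s.combine_distance = 1;
    return s;
}
```

The advice predicate is evaluated before max_ios is bumped to 1. This ordering matters: with max_ios == 0 the sketch yields synchronous reads and no advice. That matches the source comment stating that a zero max_ios disables both AIO and advice-based pseudo AIO.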
                                961                 :                : /*
                                962                 :                :  * Create a new read stream for reading a relation.
                                963                 :                :  * See read_stream_begin_impl() for the detailed explanation.
                                964                 :                :  */
                                965                 :                : ReadStream *
  654 noah@leadboat.com         966                 :         504141 : read_stream_begin_relation(int flags,
                                967                 :                :                            BufferAccessStrategy strategy,
                                968                 :                :                            Relation rel,
                                969                 :                :                            ForkNumber forknum,
                                970                 :                :                            ReadStreamBlockNumberCB callback,
                                971                 :                :                            void *callback_private_data,
                                972                 :                :                            size_t per_buffer_data_size)
                                973                 :                : {
                                974                 :         504141 :     return read_stream_begin_impl(flags,
                                975                 :                :                                   strategy,
                                976                 :                :                                   rel,
                                977                 :                :                                   RelationGetSmgr(rel),
                                978                 :         504141 :                                   rel->rd_rel->relpersistence,
                                979                 :                :                                   forknum,
                                980                 :                :                                   callback,
                                981                 :                :                                   callback_private_data,
                                982                 :                :                                   per_buffer_data_size);
                                983                 :                : }
                                984                 :                : 
                                985                 :                : /*
                                986                 :                :  * Create a new read stream for reading a SMgr relation.
                                987                 :                :  * See read_stream_begin_impl() for the detailed explanation.
                                988                 :                :  */
                                989                 :                : ReadStream *
                                990                 :          69962 : read_stream_begin_smgr_relation(int flags,
                                991                 :                :                                 BufferAccessStrategy strategy,
                                992                 :                :                                 SMgrRelation smgr,
                                993                 :                :                                 char smgr_persistence,
                                994                 :                :                                 ForkNumber forknum,
                                995                 :                :                                 ReadStreamBlockNumberCB callback,
                                996                 :                :                                 void *callback_private_data,
                                997                 :                :                                 size_t per_buffer_data_size)
                                998                 :                : {
                                999                 :          69962 :     return read_stream_begin_impl(flags,
                               1000                 :                :                                   strategy,
                               1001                 :                :                                   NULL,
                               1002                 :                :                                   smgr,
                               1003                 :                :                                   smgr_persistence,
                               1004                 :                :                                   forknum,
                               1005                 :                :                                   callback,
                               1006                 :                :                                   callback_private_data,
                               1007                 :                :                                   per_buffer_data_size);
                               1008                 :                : }
                               1009                 :                : 
                               1010                 :                : /*
                               1011                 :                :  * Pull one pinned buffer out of a stream.  Each call returns successive
                               1012                 :                :  * blocks in the order specified by the callback.  If per_buffer_data_size was
                               1013                 :                :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
                               1014                 :                :  * per-buffer data that the callback had a chance to populate, which remains
                               1015                 :                :  * valid until the next call to read_stream_next_buffer().  When the stream
                               1016                 :                :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
                               1017                 :                :  * the stream early at any time by calling read_stream_end().
                               1018                 :                :  */
                               1019                 :                : Buffer
  762 tmunro@postgresql.or     1020                 :        7187525 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                               1021                 :                : {
                               1022                 :                :     Buffer      buffer;
                               1023                 :                :     int16       oldest_buffer_index;
                               1024                 :                : 
                               1025                 :                : #ifndef READ_STREAM_DISABLE_FAST_PATH
                               1026                 :                : 
                               1027                 :                :     /*
                               1028                 :                :      * A fast path for all-cached scans.  This is the same as the usual
                               1029                 :                :      * algorithm, but it is specialized for no I/O and no per-buffer data, so
                               1030                 :                :      * we can skip the queue management code, stay in the same buffer slot and
                               1031                 :                :      * use singular StartReadBuffer().
                               1032                 :                :      */
                               1033         [ +  + ]:        7187525 :     if (likely(stream->fast_path))
                               1034                 :                :     {
                               1035                 :                :         BlockNumber next_blocknum;
                               1036                 :                : 
                               1037                 :                :         /* Fast path assumptions. */
                               1038         [ -  + ]:        2636653 :         Assert(stream->ios_in_progress == 0);
  410                          1039         [ -  + ]:        2636653 :         Assert(stream->forwarded_buffers == 0);
  762                          1040         [ -  + ]:        2636653 :         Assert(stream->pinned_buffers == 1);
   30 andres@anarazel.de       1041         [ -  + ]:GNC     2636653 :         Assert(stream->readahead_distance == 1);
                               1042         [ -  + ]:        2636653 :         Assert(stream->combine_distance == 1);
  759 tmunro@postgresql.or     1043         [ -  + ]:CBC     2636653 :         Assert(stream->pending_read_nblocks == 0);
  762                          1044         [ -  + ]:        2636653 :         Assert(stream->per_buffer_data_size == 0);
  410                          1045         [ -  + ]:        2636653 :         Assert(stream->initialized_buffers > stream->oldest_buffer_index);
                               1046                 :                : 
                               1047                 :                :         /* We're going to return the buffer we pinned last time. */
  762                          1048                 :        2636653 :         oldest_buffer_index = stream->oldest_buffer_index;
                               1049         [ -  + ]:        2636653 :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
                               1050                 :                :                stream->next_buffer_index);
                               1051                 :        2636653 :         buffer = stream->buffers[oldest_buffer_index];
                               1052         [ -  + ]:        2636653 :         Assert(buffer != InvalidBuffer);
                               1053                 :                : 
                               1054                 :                :         /* Choose the next block to pin. */
  612                          1055                 :        2636653 :         next_blocknum = read_stream_get_block(stream, NULL);
                               1056                 :                : 
  759                          1057         [ +  + ]:        2636653 :         if (likely(next_blocknum != InvalidBlockNumber))
                               1058                 :                :         {
  401 andres@anarazel.de       1059                 :        2533239 :             int         flags = stream->read_buffers_flags;
                               1060                 :                : 
                               1061         [ +  + ]:        2533239 :             if (stream->advice_enabled)
                               1062                 :            571 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
                               1063                 :                : 
                               1064                 :                :             /*
                               1065                 :                :              * While in fast-path, execute any IO that we might encounter
                               1066                 :                :              * synchronously. Because we are, right now, only looking one
                               1067                 :                :              * block ahead, dispatching any occasional IO to workers would
                               1068                 :                :              * have the overhead of dispatching to workers, without any
                               1069                 :                :              * realistic chance of the IO completing before we need it. We
                               1070                 :                :              * will switch to non-synchronous IO after this.
                               1071                 :                :              *
                               1072                 :                :              * Arguably we should do so only for worker, as there's far less
                               1073                 :                :              * dispatch overhead with io_uring. However, tests so far have not
                               1074                 :                :              * shown a clear downside and additional io_method awareness here
                               1075                 :                :              * seems not great from an abstraction POV.
                               1076                 :                :              */
   34 andres@anarazel.de       1077                 :GNC     2533239 :             flags |= READ_BUFFERS_SYNCHRONOUSLY;
                               1078                 :                : 
                               1079                 :                :             /*
                               1080                 :                :              * Pin a buffer for the next call.  Same buffer entry, and
                               1081                 :                :              * arbitrary I/O entry (they're all free).  We don't have to
                               1082                 :                :              * adjust pinned_buffers because we're transferring one to caller
                               1083                 :                :              * but pinning one more.
                               1084                 :                :              *
                               1085                 :                :              * In the fast path we don't need to check the pin limit.  We're
                               1086                 :                :              * always allowed at least one pin so that progress can be made,
                               1087                 :                :              * and that's all we need here.  Although two pins are momentarily
                               1088                 :                :              * held at the same time, the model used here is that the stream
                               1089                 :                :              * holds only one, and the other now belongs to the caller.
                               1090                 :                :              */
  759 tmunro@postgresql.or     1091         [ +  + ]:CBC     2533239 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
                               1092                 :                :                                         &stream->buffers[oldest_buffer_index],
                               1093                 :                :                                         next_blocknum,
                               1094                 :                :                                         flags)))
                               1095                 :                :             {
                               1096                 :                :                 /* Fast return. */
   28 tomas.vondra@postgre     1097                 :GNC     2516165 :                 read_stream_count_prefetch(stream);
  759 tmunro@postgresql.or     1098                 :CBC     2516165 :                 return buffer;
                               1099                 :                :             }
                               1100                 :                : 
                               1101                 :                :             /* Next call must wait for I/O for the newly pinned buffer. */
  762                          1102                 :          17074 :             stream->oldest_io_index = 0;
                               1103                 :          17074 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
                               1104                 :          17074 :             stream->ios_in_progress = 1;
                               1105                 :          17074 :             stream->ios[0].buffer_index = oldest_buffer_index;
                               1106                 :          17074 :             stream->seq_blocknum = next_blocknum + 1;
                               1107                 :                : 
                               1108                 :                :             /*
                               1109                 :                :              * XXX: It might be worth triggering additional read-ahead here,
                               1110                 :                :              * to avoid having to effectively do another synchronous IO for
                               1111                 :                :              * the next block (if it were also a miss).
                               1112                 :                :              */
                               1113                 :                : 
                               1114                 :                :             /* update I/O stats */
   28 tomas.vondra@postgre     1115                 :GNC       17074 :             read_stream_count_io(stream, 1, stream->ios_in_progress);
                               1116                 :                : 
                               1117                 :                :             /* update prefetch distance */
                               1118                 :          17074 :             read_stream_count_prefetch(stream);
                               1119                 :                :         }
                               1120                 :                :         else
                               1121                 :                :         {
                               1122                 :                :             /* No more blocks, end of stream. */
   30 andres@anarazel.de       1123                 :         103414 :             stream->readahead_distance = 0;
                               1124                 :         103414 :             stream->combine_distance = 0;
  759 tmunro@postgresql.or     1125                 :CBC      103414 :             stream->oldest_buffer_index = stream->next_buffer_index;
                               1126                 :         103414 :             stream->pinned_buffers = 0;
  410                          1127                 :         103414 :             stream->buffers[oldest_buffer_index] = InvalidBuffer;
                               1128                 :                :         }
                               1129                 :                : 
  759                          1130                 :         120488 :         stream->fast_path = false;
  762                          1131                 :         120488 :         return buffer;
                               1132                 :                :     }
                               1133                 :                : #endif
                               1134                 :                : 
                               1135         [ +  + ]:        4550872 :     if (unlikely(stream->pinned_buffers == 0))
                               1136                 :                :     {
                               1137         [ -  + ]:        3345370 :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
                               1138                 :                : 
                               1139                 :                :         /* End of stream reached?  */
   30 andres@anarazel.de       1140         [ +  + ]:GNC     3345370 :         if (stream->readahead_distance == 0)
  762 tmunro@postgresql.or     1141                 :CBC     1903962 :             return InvalidBuffer;
                               1142                 :                : 
                               1143                 :                :         /*
                               1144                 :                :          * The usual order of operations is that we look ahead at the bottom
                               1145                 :                :          * of this function after potentially finishing an I/O and making
                               1146                 :                :          * space for more, but if we're just starting up we'll need to crank
                               1147                 :                :          * the handle to get started.
                               1148                 :                :          */
  416                          1149                 :        1441408 :         read_stream_look_ahead(stream);
                               1150                 :                : 
                               1151                 :                :         /* End of stream reached? */
  762                          1152         [ +  + ]:        1441408 :         if (stream->pinned_buffers == 0)
                               1153                 :                :         {
   30 andres@anarazel.de       1154         [ -  + ]:GNC      784326 :             Assert(stream->readahead_distance == 0);
  762 tmunro@postgresql.or     1155                 :CBC      784326 :             return InvalidBuffer;
                               1156                 :                :         }
                               1157                 :                :     }
                               1158                 :                : 
                               1159                 :                :     /* Grab the oldest pinned buffer and associated per-buffer data. */
                               1160         [ -  + ]:        1862584 :     Assert(stream->pinned_buffers > 0);
                               1161                 :        1862584 :     oldest_buffer_index = stream->oldest_buffer_index;
                               1162   [ +  -  -  + ]:        1862584 :     Assert(oldest_buffer_index >= 0 &&
                               1163                 :                :            oldest_buffer_index < stream->queue_size);
                               1164                 :        1862584 :     buffer = stream->buffers[oldest_buffer_index];
                               1165         [ +  + ]:        1862584 :     if (per_buffer_data)
                               1166                 :         359681 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
                               1167                 :                : 
                               1168         [ -  + ]:        1862584 :     Assert(BufferIsValid(buffer));
                               1169                 :                : 
                               1170                 :                :     /* Do we have to wait for an associated I/O first? */
                               1171         [ +  + ]:        1862584 :     if (stream->ios_in_progress > 0 &&
                               1172         [ +  + ]:         745072 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
                               1173                 :                :     {
                               1174                 :         697425 :         int16       io_index = stream->oldest_io_index;
                               1175                 :                :         bool        needed_wait;
                               1176                 :                : 
                               1177                 :                :         /* Sanity check that we still agree on the buffers. */
                               1178         [ -  + ]:         697425 :         Assert(stream->ios[io_index].op.buffers ==
                               1179                 :                :                &stream->buffers[oldest_buffer_index]);
                               1180                 :                : 
   30 andres@anarazel.de       1181                 :GNC      697425 :         needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
                               1182                 :                : 
  762 tmunro@postgresql.or     1183         [ -  + ]:CBC      697394 :         Assert(stream->ios_in_progress > 0);
                               1184                 :         697394 :         stream->ios_in_progress--;
                               1185         [ +  + ]:         697394 :         if (++stream->oldest_io_index == stream->max_ios)
                               1186                 :          28578 :             stream->oldest_io_index = 0;
                               1187                 :                : 
                               1188                 :                :         /*
                               1189                 :                :          * If the IO was executed synchronously, we will never see
                               1190                 :                :          * WaitReadBuffers() block. Treat it as if it did block. This is
                               1191                 :                :          * particularly crucial when effective_io_concurrency=0 is used, as
                               1192                 :                :          * all IO will be synchronous.  Without treating synchronous IO as
                               1193                 :                :          * having waited, we'd never allow the distance to get large enough to
                               1194                 :                :          * allow for IO combining, resulting in bad performance.
                               1195                 :                :          */
   30 andres@anarazel.de       1196         [ +  + ]:GNC      697394 :         if (stream->ios[io_index].op.flags & READ_BUFFERS_SYNCHRONOUSLY)
                               1197                 :          17541 :             needed_wait = true;
                               1198                 :                : 
                               1199                 :                :         /* Count it as a wait if we need to wait for IO */
   28 tomas.vondra@postgre     1200         [ +  + ]:         697394 :         if (needed_wait)
                               1201                 :         317670 :             read_stream_count_wait(stream);
                               1202                 :                : 
                               1203                 :                :         /*
                               1204                 :                :          * Have the read-ahead distance ramp up rapidly after we needed to
                               1205                 :                :          * wait for IO. We only increase the read-ahead-distance when we
                               1206                 :                :          * needed to wait, to avoid increasing the distance further than
                               1207                 :                :          * necessary, as looking ahead too far can be costly, both due to the
                               1208                 :                :          * cost of unnecessarily pinning many buffers and due to doing IOs
                               1209                 :                :          * that may never be consumed if the stream is ended/reset before
                               1210                 :                :          * completion.
                               1211                 :                :          *
                               1212                 :                :          * If we did not need to wait, the current distance was evidently
                               1213                 :                :          * sufficient.
                               1214                 :                :          *
                               1215                 :                :          * NB: Must not increase the distance if we already reached the end of
                               1216                 :                :          * the stream, as stream->readahead_distance == 0 is used to keep
                               1217                 :                :          * track of having reached the end.
                               1218                 :                :          */
   30 andres@anarazel.de       1219   [ +  +  +  + ]:         697394 :         if (stream->readahead_distance > 0 && needed_wait)
                               1220                 :                :         {
                               1221                 :                :             /* wider temporary value, due to overflow risk */
                               1222                 :                :             int32       readahead_distance;
                               1223                 :                : 
                               1224                 :         296615 :             readahead_distance = stream->readahead_distance * 2;
                               1225                 :         296615 :             readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
                               1226                 :         296615 :             stream->readahead_distance = readahead_distance;
                               1227                 :                :         }
                               1228                 :                : 
                               1229                 :                :         /*
                               1230                 :                :          * As we needed IO, prevent distances from being reduced within our
                               1231                 :                :          * maximum look-ahead window. This avoids collapsing distances too
                               1232                 :                :          * quickly in workloads where most of the required blocks are cached,
                               1233                 :                :          * but where the remaining IOs are a large enough factor to cause
                               1234                 :                :          * a substantial slowdown if executed synchronously.
                               1235                 :                :          *
                               1236                 :                :          * There are valid arguments for preventing decay for max_ios or for
                               1237                 :                :          * max_pinned_buffers.  But the argument for max_pinned_buffers seems
                               1238                 :                :          * clearer - if we can't see any misses within the maximum look-ahead
                               1239                 :                :          * distance, we can't do any useful read-ahead.
                               1240                 :                :          */
   34                          1241                 :         697394 :         stream->distance_decay_holdoff = stream->max_pinned_buffers;
                               1242                 :                : 
                               1243                 :                :         /*
                               1244                 :                :          * Whether we needed to wait or not, allow for more IO combining if we
                               1245                 :                :          * needed to do IO. The reason to do so independent of needing to wait
                               1246                 :                :          * is that when the data is resident in the kernel page cache, IO
                               1247                 :                :          * combining reduces the syscall / dispatch overhead, making it
                               1248                 :                :          * worthwhile regardless of needing to wait.
                               1249                 :                :          *
                               1250                 :                :          * It is also important with io_uring as it will never signal the need
                               1251                 :                :          * to wait for reads if all the data is in the page cache. There are
                               1252                 :                :          * heuristics to deal with that in method_io_uring.c, but they only
                               1253                 :                :          * work when the IO gets large enough.
                               1254                 :                :          */
   30                          1255         [ +  + ]:         697394 :         if (stream->combine_distance > 0 &&
                               1256         [ +  + ]:         647438 :             stream->combine_distance < stream->io_combine_limit)
                               1257                 :                :         {
                               1258                 :                :             /* wider temporary value, due to overflow risk */
                               1259                 :                :             int32       combine_distance;
                               1260                 :                : 
                               1261                 :         638958 :             combine_distance = stream->combine_distance * 2;
                               1262                 :         638958 :             combine_distance = Min(combine_distance, stream->io_combine_limit);
                               1263                 :         638958 :             combine_distance = Min(combine_distance, stream->max_pinned_buffers);
                               1264                 :         638958 :             stream->combine_distance = combine_distance;
                               1265                 :                :         }
                               1266                 :                : 
                               1267                 :                :         /*
                               1268                 :                :          * If we've reached the first block of a sequential region we're
                               1269                 :                :          * issuing advice for, cancel that until the next jump.  The kernel
                               1270                 :                :          * will see the sequential preadv() pattern starting here.
                               1271                 :                :          */
  416 tmunro@postgresql.or     1272         [ +  + ]:CBC      697394 :         if (stream->advice_enabled &&
                               1273         [ +  + ]:            319 :             stream->ios[io_index].op.blocknum == stream->seq_until_processed)
                               1274                 :            275 :             stream->seq_until_processed = InvalidBlockNumber;
                               1275                 :                :     }
                               1276                 :                : 
                               1277                 :                :     /*
                               1278                 :                :      * We must zap this queue entry, or else it would appear as a forwarded
                               1279                 :                :      * buffer.  If it's potentially in the overflow zone (ie from a
                               1280                 :                :      * multi-block I/O that wrapped around the queue), also zap the copy.
                               1281                 :                :      */
  762                          1282                 :        1862553 :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
  410                          1283         [ +  + ]:        1862553 :     if (oldest_buffer_index < stream->io_combine_limit - 1)
                               1284                 :        1472550 :         stream->buffers[stream->queue_size + oldest_buffer_index] =
                               1285                 :                :             InvalidBuffer;
                               1286                 :                : 
                               1287                 :                : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
                               1288                 :                : 
                               1289                 :                :     /*
                               1290                 :                :      * The caller will get access to the per-buffer data, until the next call.
                               1291                 :                :      * We wipe the one before, which is never occupied because queue_size
                               1292                 :                :      * allowed one extra element.  This will hopefully trip up client code
                               1293                 :                :      * that is holding a dangling pointer to it.
                               1294                 :                :      */
  762                          1295         [ +  + ]:        1862553 :     if (stream->per_buffer_data)
                               1296                 :                :     {
                               1297                 :                :         void       *per_buffer_data;
                               1298                 :                : 
  444                          1299         [ +  + ]:         719392 :         per_buffer_data = get_per_buffer_data(stream,
                               1300                 :                :                                               oldest_buffer_index == 0 ?
                               1301                 :          78744 :                                               stream->queue_size - 1 :
                               1302                 :         280952 :                                               oldest_buffer_index - 1);
                               1303                 :                : 
                               1304                 :                : #if defined(CLOBBER_FREED_MEMORY)
                               1305                 :                :         /* This also tells Valgrind the memory is "noaccess". */
                               1306                 :         359696 :         wipe_mem(per_buffer_data, stream->per_buffer_data_size);
                               1307                 :                : #elif defined(USE_VALGRIND)
                               1308                 :                :         /* Tell it ourselves. */
                               1309                 :                :         VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
                               1310                 :                :                                    stream->per_buffer_data_size);
                               1311                 :                : #endif
                               1312                 :                :     }
                               1313                 :                : #endif
                               1314                 :                : 
   28 tomas.vondra@postgre     1315                 :GNC     1862553 :     read_stream_count_prefetch(stream);
                               1316                 :                : 
                               1317                 :                :     /* Pin transferred to caller. */
  762 tmunro@postgresql.or     1318         [ -  + ]:CBC     1862553 :     Assert(stream->pinned_buffers > 0);
                               1319                 :        1862553 :     stream->pinned_buffers--;
                               1320                 :                : 
                               1321                 :                :     /* Advance oldest buffer, with wrap-around. */
                               1322                 :        1862553 :     stream->oldest_buffer_index++;
                               1323         [ +  + ]:        1862553 :     if (stream->oldest_buffer_index == stream->queue_size)
                               1324                 :         338453 :         stream->oldest_buffer_index = 0;
                               1325                 :                : 
                               1326                 :                :     /* Prepare for the next call. */
  416                          1327                 :        1862553 :     read_stream_look_ahead(stream);
                               1328                 :                : 
                               1329                 :                : #ifndef READ_STREAM_DISABLE_FAST_PATH
                               1330                 :                :     /* See if we can take the fast path for all-cached scans next time. */
  762                          1331         [ +  + ]:        1862545 :     if (stream->ios_in_progress == 0 &&
  410                          1332         [ +  + ]:        1215613 :         stream->forwarded_buffers == 0 &&
  762                          1333         [ +  + ]:        1212402 :         stream->pinned_buffers == 1 &&
   30 andres@anarazel.de       1334         [ +  + ]:GNC      556963 :         stream->readahead_distance == 1 &&
                               1335         [ +  + ]:         471756 :         stream->combine_distance == 1 &&
  759 tmunro@postgresql.or     1336         [ +  + ]:CBC      468396 :         stream->pending_read_nblocks == 0 &&
  762                          1337         [ +  + ]:         467202 :         stream->per_buffer_data_size == 0)
                               1338                 :                :     {
                               1339                 :                :         /*
                               1340                 :                :          * The fast path spins on one buffer entry repeatedly instead of
                               1341                 :                :          * rotating through the whole queue and clearing the entries behind
                               1342                 :                :          * it.  If the buffer it starts with happened to be forwarded between
                               1343                 :                :          * StartReadBuffers() calls and also wrapped around the circular queue
                               1344                 :                :          * partway through, then a copy also exists in the overflow zone, and
                               1345                 :                :          * it won't clear it out as the regular path would.  Do that now, so
                               1346                 :                :          * it doesn't need code for that.
                               1347                 :                :          */
  269                          1348         [ +  + ]:         233704 :         if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
                               1349                 :         231904 :             stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
                               1350                 :                :                 InvalidBuffer;
                               1351                 :                : 
  762                          1352                 :         233704 :         stream->fast_path = true;
                               1353                 :                :     }
                               1354                 :                : #endif
                               1355                 :                : 
                               1356                 :        1862545 :     return buffer;
                               1357                 :                : }
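After an I/O completes, read_stream_next_buffer() above doubles the read-ahead distance, but only when the I/O needed a wait. It also doubles the combine distance whenever any I/O was done. Each doubling uses a wider int32 temporary before clamping, so an int16 distance cannot overflow. The following is a minimal sketch, not PostgreSQL code. It assumes int16_t distances, and the struct SketchStream and the function after_io_completed() are hypothetical names chosen for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical subset of the stream state touched by the ramp-up logic. */
typedef struct SketchStream
{
    int16_t readahead_distance;  /* 0 means end of stream was reached */
    int16_t combine_distance;
    int16_t max_pinned_buffers;
    int16_t io_combine_limit;
} SketchStream;

/* Hypothetical: grow distances after one I/O has completed. */
static void
after_io_completed(SketchStream *s, bool needed_wait)
{
    assert(s->max_pinned_buffers > 0);

    /* Grow read-ahead only if we waited, and never from 0 (end of stream). */
    if (s->readahead_distance > 0 && needed_wait)
    {
        int32_t d = (int32_t) s->readahead_distance * 2;  /* may exceed INT16_MAX */

        if (d > s->max_pinned_buffers)
            d = s->max_pinned_buffers;
        s->readahead_distance = (int16_t) d;
    }

    /* Grow combining whether or not we waited, up to both limits. */
    if (s->combine_distance > 0 && s->combine_distance < s->io_combine_limit)
    {
        int32_t c = (int32_t) s->combine_distance * 2;

        if (c > s->io_combine_limit)
            c = s->io_combine_limit;
        if (c > s->max_pinned_buffers)
            c = s->max_pinned_buffers;
        s->combine_distance = (int16_t) c;
    }
}
```

Starting from the values read_stream_reset() sets (both distances 1), repeated waits take the read-ahead distance through 2, 4, 8, ... until max_pinned_buffers caps it. The combine distance stops growing once it reaches io_combine_limit.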
                               1358                 :                : 
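When read_stream_next_buffer() consumes the oldest queue entry, it clears the slot and, for low indexes, a second slot at queue_size + index. It then advances the oldest index with wrap-around. The indexing suggests an array of queue_size + io_combine_limit - 1 slots, where the tail of a multi-block I/O that runs past the end of the ring lands in an overflow zone mirrored at the start. The following is a minimal sketch of that reading, not the real data structure. Plain ints stand in for Buffer (0 plays InvalidBuffer), and the sizes and the names SketchQueue and sketch_consume() are hypothetical.

```c
#include <assert.h>

#define SKETCH_INVALID_BUFFER 0   /* stand-in for InvalidBuffer */
#define SKETCH_QUEUE_SIZE 8       /* hypothetical queue_size */
#define SKETCH_IO_COMBINE_LIMIT 4 /* hypothetical io_combine_limit */

typedef struct SketchQueue
{
    /* ring of SKETCH_QUEUE_SIZE slots plus an overflow zone */
    int buffers[SKETCH_QUEUE_SIZE + SKETCH_IO_COMBINE_LIMIT - 1];
    int oldest;                   /* like oldest_buffer_index */
} SketchQueue;

/* Hand out the oldest entry, clear it and any overflow copy, advance. */
static int
sketch_consume(SketchQueue *q)
{
    int index = q->oldest;
    int buffer = q->buffers[index];

    assert(buffer != SKETCH_INVALID_BUFFER);

    /* Otherwise the slot would later look like a forwarded buffer. */
    q->buffers[index] = SKETCH_INVALID_BUFFER;
    if (index < SKETCH_IO_COMBINE_LIMIT - 1)
        q->buffers[SKETCH_QUEUE_SIZE + index] = SKETCH_INVALID_BUFFER;

    /* Advance with wrap-around. */
    if (++q->oldest == SKETCH_QUEUE_SIZE)
        q->oldest = 0;
    return buffer;
}
```

For example, a three-block I/O that starts at index 6 of an 8-slot ring fills slots 6, 7 and overflow slot 8, with slot 8 mirrored at slot 0. Consuming the third block must clear both copies.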
                               1359                 :                : /*
                               1360                 :                :  * Transitional support for code that would like to perform or skip reads
                               1361                 :                :  * itself, without using the stream.  Returns, and consumes, the next block
                               1362                 :                :  * number that would be read by the stream's look-ahead algorithm, or
                               1363                 :                :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
                               1364                 :                :  * strategy that would be used to read it.
                               1365                 :                :  */
                               1366                 :                : BlockNumber
  594 tmunro@postgresql.or     1367                 :UBC           0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
                               1368                 :                : {
                               1369                 :              0 :     *strategy = stream->ios[0].op.strategy;
                               1370                 :              0 :     return read_stream_get_block(stream, NULL);
                               1371                 :                : }
                               1372                 :                : 
                               1373                 :                : /*
                               1374                 :                :  * Temporarily stop consuming block numbers from the block number callback.
                               1375                 :                :  * If called inside the block number callback, its return value should be
                               1376                 :                :  * returned by the callback.
                               1377                 :                :  */
                               1378                 :                : BlockNumber
   63 melanieplageman@gmai     1379                 :UNC           0 : read_stream_pause(ReadStream *stream)
                               1380                 :                : {
   30 andres@anarazel.de       1381                 :              0 :     stream->resume_readahead_distance = stream->readahead_distance;
                               1382                 :              0 :     stream->resume_combine_distance = stream->combine_distance;
                               1383                 :              0 :     stream->readahead_distance = 0;
                               1384                 :              0 :     stream->combine_distance = 0;
   63 melanieplageman@gmai     1385                 :              0 :     return InvalidBlockNumber;
                               1386                 :                : }
                               1387                 :                : 
                               1388                 :                : /*
                               1389                 :                :  * Resume looking ahead after the block number callback reported
                               1390                 :                :  * end-of-stream. This is useful for streams of self-referential blocks, after
                               1391                 :                :  * a buffer needed to be consumed and examined to find more block numbers.
                               1392                 :                :  */
                               1393                 :                : void
                               1394                 :              0 : read_stream_resume(ReadStream *stream)
                               1395                 :                : {
   30 andres@anarazel.de       1396                 :              0 :     stream->readahead_distance = stream->resume_readahead_distance;
                               1397                 :              0 :     stream->combine_distance = stream->resume_combine_distance;
   63 melanieplageman@gmai     1398                 :              0 : }
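read_stream_pause() saves both distances, zeroes them (the end-of-stream marker), and returns InvalidBlockNumber. Its comment says that a block number callback should return that value directly. read_stream_resume() restores the saved distances. The following is a minimal sketch of that save/restore pattern. It uses uint32_t for BlockNumber and UINT32_MAX for InvalidBlockNumber, and the callback sketch_next_block() with its fixed block list is a hypothetical example of a self-referential stream.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_INVALID_BLOCK UINT32_MAX  /* stand-in for InvalidBlockNumber */

typedef struct SketchDistances
{
    int16_t readahead_distance;
    int16_t combine_distance;
    int16_t resume_readahead_distance;
    int16_t resume_combine_distance;
} SketchDistances;

/* Save the distances, then report end-of-stream by zeroing them. */
static uint32_t
sketch_pause(SketchDistances *s)
{
    s->resume_readahead_distance = s->readahead_distance;
    s->resume_combine_distance = s->combine_distance;
    s->readahead_distance = 0;
    s->combine_distance = 0;
    return SKETCH_INVALID_BLOCK;
}

/* Restore the distances saved by sketch_pause(). */
static void
sketch_resume(SketchDistances *s)
{
    s->readahead_distance = s->resume_readahead_distance;
    s->combine_distance = s->resume_combine_distance;
}

/* Hypothetical callback: pauses when it has no more known blocks. */
static uint32_t
sketch_next_block(SketchDistances *s, const uint32_t *blocks, int n, int *pos)
{
    if (*pos < n)
        return blocks[(*pos)++];
    return sketch_pause(s);      /* return pause's result from the callback */
}
```

The caller would then consume and examine the buffers already queued, learn further block numbers from them, and call the resume function to continue looking ahead at the previous distances.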
                               1399                 :                : 
                               1400                 :                : /*
                               1401                 :                :  * Reset a read stream by releasing any queued up buffers, allowing the stream
                               1402                 :                :  * to be used again for different blocks.  This can be used to clear an
                               1403                 :                :  * end-of-stream condition and start again, or to throw away blocks that were
                               1404                 :                :  * speculatively read and read some different blocks instead.
                               1405                 :                :  */
                               1406                 :                : void
  762 tmunro@postgresql.or     1407                 :CBC     1442370 : read_stream_reset(ReadStream *stream)
                               1408                 :                : {
                               1409                 :                :     int16       index;
                               1410                 :                :     Buffer      buffer;
                               1411                 :                : 
                               1412                 :                :     /* Stop looking ahead. */
   30 andres@anarazel.de       1413                 :GNC     1442370 :     stream->readahead_distance = 0;
                               1414                 :        1442370 :     stream->combine_distance = 0;
                               1415                 :                : 
                               1416                 :                :     /* Forget buffered block number and fast path state. */
  612 tmunro@postgresql.or     1417                 :CBC     1442370 :     stream->buffered_blocknum = InvalidBlockNumber;
  759                          1418                 :        1442370 :     stream->fast_path = false;
                               1419                 :                : 
                               1420                 :                :     /* Unpin anything that wasn't consumed. */
  762                          1421         [ +  + ]:        1578084 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
                               1422                 :         135714 :         ReleaseBuffer(buffer);
                               1423                 :                : 
                               1424                 :                :     /* Unpin any unused forwarded buffers. */
  410                          1425                 :        1442370 :     index = stream->next_buffer_index;
                               1426         [ +  + ]:        1442370 :     while (index < stream->initialized_buffers &&
                               1427         [ -  + ]:         217842 :            (buffer = stream->buffers[index]) != InvalidBuffer)
                               1428                 :                :     {
  410 tmunro@postgresql.or     1429         [ #  # ]:UBC           0 :         Assert(stream->forwarded_buffers > 0);
                               1430                 :              0 :         stream->forwarded_buffers--;
                               1431                 :              0 :         ReleaseBuffer(buffer);
                               1432                 :                : 
                               1433                 :              0 :         stream->buffers[index] = InvalidBuffer;
                               1434         [ #  # ]:              0 :         if (index < stream->io_combine_limit - 1)
                               1435                 :              0 :             stream->buffers[stream->queue_size + index] = InvalidBuffer;
                               1436                 :                : 
                               1437         [ #  # ]:              0 :         if (++index == stream->queue_size)
                               1438                 :              0 :             index = 0;
                               1439                 :                :     }
                               1440                 :                : 
  410 tmunro@postgresql.or     1441         [ -  + ]:CBC     1442370 :     Assert(stream->forwarded_buffers == 0);
  762                          1442         [ -  + ]:        1442370 :     Assert(stream->pinned_buffers == 0);
                               1443         [ -  + ]:        1442370 :     Assert(stream->ios_in_progress == 0);
                               1444                 :                : 
                               1445                 :                :     /* Start off assuming data is cached. */
   30 andres@anarazel.de       1446                 :GNC     1442370 :     stream->readahead_distance = 1;
                               1447                 :        1442370 :     stream->combine_distance = 1;
                               1448                 :        1442370 :     stream->resume_readahead_distance = stream->readahead_distance;
                               1449                 :        1442370 :     stream->resume_combine_distance = stream->combine_distance;
   34                          1450                 :        1442370 :     stream->distance_decay_holdoff = 0;
  762 tmunro@postgresql.or     1451                 :CBC     1442370 : }
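After draining the queue, read_stream_reset() walks forward from next_buffer_index and releases any forwarded buffers that were never consumed. It applies the same overflow-copy clearing and wrap-around as the consume path. In this run that loop body was never executed (UBC). The following is a minimal sketch of the loop. Plain ints stand in for Buffer (0 plays InvalidBuffer), a counter replaces ReleaseBuffer(), and the sizes and the names SketchReset and sketch_release_forwarded() are hypothetical.

```c
#include <assert.h>

#define SKETCH_INVALID 0   /* stand-in for InvalidBuffer */
#define SKETCH_QSIZE 8     /* hypothetical queue_size */
#define SKETCH_LIMIT 4     /* hypothetical io_combine_limit */

typedef struct SketchReset
{
    int buffers[SKETCH_QSIZE + SKETCH_LIMIT - 1];
    int next_buffer_index;
    int initialized_buffers;
    int forwarded_buffers;
    int released;          /* counts simulated ReleaseBuffer() calls */
} SketchReset;

/* Release forwarded buffers starting at next_buffer_index. */
static void
sketch_release_forwarded(SketchReset *s)
{
    int index = s->next_buffer_index;

    while (index < s->initialized_buffers &&
           s->buffers[index] != SKETCH_INVALID)
    {
        assert(s->forwarded_buffers > 0);
        s->forwarded_buffers--;
        s->released++;                     /* ReleaseBuffer() in the real code */

        s->buffers[index] = SKETCH_INVALID;
        if (index < SKETCH_LIMIT - 1)      /* clear the overflow copy too */
            s->buffers[SKETCH_QSIZE + index] = SKETCH_INVALID;

        if (++index == SKETCH_QSIZE)       /* wrap around the ring */
            index = 0;
    }
    assert(s->forwarded_buffers == 0);
}
```

The loop stops at the first empty slot, so it relies on forwarded buffers occupying a contiguous run of the ring. The trailing assertion mirrors the real function's check that none remain.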
                               1452                 :                : 
                               1453                 :                : /*
                               1454                 :                :  * Release and free a read stream.
                               1455                 :                :  */
                               1456                 :                : void
                               1457                 :         570866 : read_stream_end(ReadStream *stream)
                               1458                 :                : {
                               1459                 :         570866 :     read_stream_reset(stream);
                               1460                 :         570866 :     pfree(stream);
                               1461                 :         570866 : }
        

Generated by: LCOV version 2.5.0-beta