LCOV - differential code coverage report
Current view: top level - src/backend/storage/aio - read_stream.c (source / functions) Coverage Total Hit UNC UBC GNC CBC DUB DCB
Current: 380a8b2ea024c33a35e7abc8628e7c4f52f9f9f9 vs db5ed03217b9c238703df8b4b286115d6e940488 Lines: 90.6 % 406 368 17 21 103 265 28
Current Date: 2026-05-29 21:51:00 -0400 Functions: 81.0 % 21 17 2 2 11 6
Baseline: lcov-20260530-034037-baseline Branches: 78.8 % 316 249 11 56 67 182 5 29
Baseline Date: 2026-05-29 14:39:03 -0700
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Line coverage date bins:
(7,30] days: 100.0 % 2 2 2
(30,360] days: 86.6 % 127 110 17 103 7
(360..) days: 92.4 % 277 256 21 256
Function coverage date bins:
(30,360] days: 75.0 % 8 6 2 6
(360..) days: 84.6 % 13 11 2 5 6
Branch coverage date bins:
(7,30] days: 87.5 % 8 7 1 7
(30,360] days: 84.8 % 92 78 11 3 67 11
(360..) days: 75.9 % 216 164 52 164

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * read_stream.c
                                  4                 :                :  *    Mechanism for accessing buffered relation data with look-ahead
                                  5                 :                :  *
                                  6                 :                :  * Code that needs to access relation data typically pins blocks one at a
                                  7                 :                :  * time, often in a predictable order that might be sequential or data-driven.
                                  8                 :                :  * Calling the simple ReadBuffer() function for each block is inefficient,
                                  9                 :                :  * because blocks that are not yet in the buffer pool require I/O operations
                                 10                 :                :  * that are small and might stall waiting for storage.  This mechanism looks
                                 11                 :                :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
                                 12                 :                :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
                                 13                 :                :  * distance.
                                 14                 :                :  *
                                 15                 :                :  * A user-provided callback generates a stream of block numbers that is used
                                 16                 :                :  * to form reads of up to io_combine_limit, by attempting to merge them with a
                                 17                 :                :  * pending read.  When that isn't possible, the existing pending read is sent
                                 18                 :                :  * to StartReadBuffers() so that a new one can begin to form.
                                 19                 :                :  *
                                 20                 :                :  * The algorithm for controlling the look-ahead distance is based on recent
                                  21                 :                :  * cache hit / miss history, as well as whether we need to wait for I/O completion
                                 22                 :                :  * after a miss.  When no I/O is necessary, there is no benefit in looking
                                 23                 :                :  * ahead more than one block.  This is the default initial assumption.  When
                                 24                 :                :  * blocks needing I/O are streamed, the combine distance is increased to
                                 25                 :                :  * benefit from I/O combining and the read-ahead distance is increased
                                 26                 :                :  * whenever we need to wait for I/O to try to benefit from increased I/O
                                 27                 :                :  * concurrency. Both are reduced gradually when cached blocks are streamed.
                                 28                 :                :  *
                                 29                 :                :  * The main data structure is a circular queue of buffers of size
                                 30                 :                :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
                                 31                 :                :  * returned by read_stream_next_buffer().  Each buffer also has an optional
                                 32                 :                :  * variable sized object that is passed from the callback to the consumer of
                                 33                 :                :  * buffers.
                                 34                 :                :  *
                                 35                 :                :  * Parallel to the queue of buffers, there is a circular queue of in-progress
                                 36                 :                :  * I/Os that have been started with StartReadBuffers(), and for which
                                 37                 :                :  * WaitReadBuffers() must be called before returning the buffer.
                                 38                 :                :  *
                                 39                 :                :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
                                 40                 :                :  * successive calls, then these data structures might appear as follows:
                                 41                 :                :  *
                                 42                 :                :  *                          buffers buf/data       ios
                                 43                 :                :  *
                                 44                 :                :  *                          +----+  +-----+       +--------+
                                 45                 :                :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
                                 46                 :                :  *                          +----+  +-----+  |    +--------+
                                 47                 :                :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
                                 48                 :                :  *                          +----+  +-----+  | |  +--------+
                                 49                 :                :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
                                 50                 :                :  *                          +----+  +-----+    |  +--------+
                                 51                 :                :  *                          | 43 |  |  ?  |    |  |        |
                                 52                 :                :  *                          +----+  +-----+    |  +--------+
                                 53                 :                :  *                          | 44 |  |  ?  |    |  |        |
                                 54                 :                :  *                          +----+  +-----+    |  +--------+
                                 55                 :                :  *                          | 60 |  |  ?  |<---+
                                 56                 :                :  *                          +----+  +-----+
                                 57                 :                :  *     next_buffer_index -> |    |  |     |
                                 58                 :                :  *                          +----+  +-----+
                                 59                 :                :  *
                                 60                 :                :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
                                 61                 :                :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
                                 62                 :                :  * the range 42..44 requires an I/O wait before its buffers are returned, as
                                 63                 :                :  * does block 60.
                                 64                 :                :  *
                                 65                 :                :  *
                                 66                 :                :  * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
                                 67                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                 68                 :                :  *
                                 69                 :                :  * IDENTIFICATION
                                 70                 :                :  *    src/backend/storage/aio/read_stream.c
                                 71                 :                :  *
                                 72                 :                :  *-------------------------------------------------------------------------
                                 73                 :                :  */
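
The header comment describes how callback-supplied block numbers are merged into a pending read of up to io_combine_limit blocks. The following is a minimal standalone sketch of that merging idea only, not the code in this file: `SketchRead` and `sketch_combine_blocks` are hypothetical names, and "starting" a read is modeled as appending to an output array rather than calling StartReadBuffers().

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for one combined read: blocks [start, start + nblocks). */
typedef struct SketchRead
{
    unsigned    start;
    int         nblocks;
} SketchRead;

/*
 * Merge a sequence of block numbers into combined reads.  A block joins the
 * pending read only if it is the next consecutive block and the pending read
 * is still below combine_limit; otherwise the pending read is emitted and a
 * new one begins.  Returns the number of reads written to out[], which must
 * have room for nblocks_in entries.
 */
size_t
sketch_combine_blocks(const unsigned *blocks, size_t nblocks_in,
                      int combine_limit, SketchRead *out)
{
    size_t      nreads = 0;
    SketchRead  pending = {0, 0};

    assert(combine_limit >= 1);

    for (size_t i = 0; i < nblocks_in; i++)
    {
        if (pending.nblocks > 0 &&
            pending.nblocks < combine_limit &&
            blocks[i] == pending.start + (unsigned) pending.nblocks)
        {
            pending.nblocks++;          /* extend the pending read */
            continue;
        }

        if (pending.nblocks > 0)
            out[nreads++] = pending;    /* emit the pending read */

        pending.start = blocks[i];      /* begin forming a new read */
        pending.nblocks = 1;
    }

    if (pending.nblocks > 0)
        out[nreads++] = pending;        /* flush the final pending read */

    return nreads;
}
```

Fed the block numbers 10, 42, 43, 44, 60 from the header comment with a combine limit of 16, this sketch produces the three reads 10..10, 42..44 and 60..60, matching the ios column of the diagram.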
                                 74                 :                : #include "postgres.h"
                                 75                 :                : 
                                 76                 :                : #include "miscadmin.h"
                                 77                 :                : #include "executor/instrument_node.h"
                                 78                 :                : #include "storage/aio.h"
                                 79                 :                : #include "storage/fd.h"
                                 80                 :                : #include "storage/smgr.h"
                                 81                 :                : #include "storage/read_stream.h"
                                 82                 :                : #include "utils/memdebug.h"
                                 83                 :                : #include "utils/rel.h"
                                 84                 :                : #include "utils/spccache.h"
                                 85                 :                : 
                                 86                 :                : typedef struct InProgressIO
                                 87                 :                : {
                                 88                 :                :     int16       buffer_index;
                                 89                 :                :     ReadBuffersOperation op;
                                 90                 :                : } InProgressIO;
                                 91                 :                : 
                                 92                 :                : /*
                                 93                 :                :  * State for managing a stream of reads.
                                 94                 :                :  */
                                 95                 :                : struct ReadStream
                                 96                 :                : {
                                 97                 :                :     int16       max_ios;
                                 98                 :                :     int16       io_combine_limit;
                                 99                 :                :     int16       ios_in_progress;
                                100                 :                :     int16       queue_size;
                                101                 :                :     int16       max_pinned_buffers;
                                102                 :                :     int16       forwarded_buffers;
                                103                 :                :     int16       pinned_buffers;
                                104                 :                : 
                                105                 :                :     /*
                                 106                 :                :      * Limit of how far, in blocks, to look ahead for IO combining and for
                                107                 :                :      * read-ahead.
                                108                 :                :      *
                                109                 :                :      * The limits for read-ahead and combining are handled separately to allow
                                110                 :                :      * for IO combining even in cases where the I/O subsystem can keep up at a
                                111                 :                :      * low read-ahead distance, as doing larger IOs is more efficient.
                                112                 :                :      *
                                113                 :                :      * Set to 0 when the end of the stream is reached.
                                114                 :                :      */
                                115                 :                :     int16       combine_distance;
                                116                 :                :     int16       readahead_distance;
                                117                 :                :     uint16      distance_decay_holdoff;
                                118                 :                :     int16       initialized_buffers;
                                119                 :                :     int16       resume_readahead_distance;
                                120                 :                :     int16       resume_combine_distance;
                                121                 :                :     int         read_buffers_flags;
                                122                 :                :     bool        sync_mode;      /* using io_method=sync */
                                123                 :                :     bool        batch_mode;     /* READ_STREAM_USE_BATCHING */
                                124                 :                :     bool        advice_enabled;
                                125                 :                :     bool        temporary;
                                126                 :                : 
                                127                 :                :     /* scan stats counters */
                                128                 :                :     IOStats    *stats;
                                129                 :                : 
                                130                 :                :     /*
                                131                 :                :      * One-block buffer to support 'ungetting' a block number, to resolve flow
                                132                 :                :      * control problems when I/Os are split.
                                133                 :                :      */
                                134                 :                :     BlockNumber buffered_blocknum;
                                135                 :                : 
                                136                 :                :     /*
                                137                 :                :      * The callback that will tell us which block numbers to read, and an
                                 138                 :                :      * opaque pointer that will be passed to it for its own purposes.
                                139                 :                :      */
                                140                 :                :     ReadStreamBlockNumberCB callback;
                                141                 :                :     void       *callback_private_data;
                                142                 :                : 
                                143                 :                :     /* Next expected block, for detecting sequential access. */
                                144                 :                :     BlockNumber seq_blocknum;
                                145                 :                :     BlockNumber seq_until_processed;
                                146                 :                : 
                                147                 :                :     /* The read operation we are currently preparing. */
                                148                 :                :     BlockNumber pending_read_blocknum;
                                149                 :                :     int16       pending_read_nblocks;
                                150                 :                : 
                                151                 :                :     /* Space for buffers and optional per-buffer private data. */
                                152                 :                :     size_t      per_buffer_data_size;
                                153                 :                :     void       *per_buffer_data;
                                154                 :                : 
                                155                 :                :     /* Read operations that have been started but not waited for yet. */
                                156                 :                :     InProgressIO *ios;
                                157                 :                :     int16       oldest_io_index;
                                158                 :                :     int16       next_io_index;
                                159                 :                : 
                                160                 :                :     bool        fast_path;
                                161                 :                : 
                                162                 :                :     /* Circular queue of buffers. */
                                163                 :                :     int16       oldest_buffer_index;    /* Next pinned buffer to return */
                                164                 :                :     int16       next_buffer_index;  /* Index of next buffer to pin */
                                165                 :                :     Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
                                166                 :                : };
                                167                 :                : 
                                168                 :                : /*
                                169                 :                :  * Return a pointer to the per-buffer data by index.
                                170                 :                :  */
                                171                 :                : static inline void *
  787 tmunro@postgresql.or      172                 :CBC     3903238 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
                                173                 :                : {
                                174                 :        7806476 :     return (char *) stream->per_buffer_data +
                                175                 :        3903238 :         stream->per_buffer_data_size * buffer_index;
                                176                 :                : }
                                177                 :                : 
                                178                 :                : /*
                                179                 :                :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
                                180                 :                :  * blocks [current_blocknum, last_exclusive).
                                181                 :                :  */
                                182                 :                : BlockNumber
  634 noah@leadboat.com         183                 :         429861 : block_range_read_stream_cb(ReadStream *stream,
                                184                 :                :                            void *callback_private_data,
                                185                 :                :                            void *per_buffer_data)
                                186                 :                : {
                                187                 :         429861 :     BlockRangeReadStreamPrivate *p = callback_private_data;
                                188                 :                : 
                                189         [ +  + ]:         429861 :     if (p->current_blocknum < p->last_exclusive)
                                190                 :         352853 :         return p->current_blocknum++;
                                191                 :                : 
                                192                 :          77008 :     return InvalidBlockNumber;
                                193                 :                : }
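
To illustrate the half-open range contract of the callback above, here is a small standalone analog. It is a sketch under assumptions: `SketchBlockNumber`, `SKETCH_INVALID_BLOCK`, `SketchRange` and `sketch_range_next` are hypothetical stand-ins for BlockNumber, InvalidBlockNumber, BlockRangeReadStreamPrivate and the callback, and the stream and per-buffer-data arguments are omitted.

```c
#include <assert.h>
#include <limits.h>

/* Hypothetical stand-ins for BlockNumber and InvalidBlockNumber. */
typedef unsigned int SketchBlockNumber;
#define SKETCH_INVALID_BLOCK UINT_MAX

typedef struct SketchRange
{
    SketchBlockNumber current_blocknum; /* next block to hand out */
    SketchBlockNumber last_exclusive;   /* one past the final block */
} SketchRange;

/* Hand out blocks from [current_blocknum, last_exclusive), then the sentinel. */
SketchBlockNumber
sketch_range_next(SketchRange *p)
{
    if (p->current_blocknum < p->last_exclusive)
        return p->current_blocknum++;

    return SKETCH_INVALID_BLOCK;
}
```

A range of {5, 8} yields 5, 6 and 7, and every later call returns the sentinel; an empty range such as {3, 3} returns the sentinel immediately.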
                                194                 :                : 
                                195                 :                : /*
                                196                 :                :  * Update stream stats with current pinned buffer depth.
                                197                 :                :  *
                                198                 :                :  * Called once per buffer returned to the consumer in read_stream_next_buffer().
                                199                 :                :  * Records the number of pinned buffers at that moment, so we can compute the
                                200                 :                :  * average look-ahead depth.
                                201                 :                :  */
                                202                 :                : static inline void
   53 tomas.vondra@postgre      203                 :GNC     4394589 : read_stream_count_prefetch(ReadStream *stream)
                                204                 :                : {
                                205                 :        4394589 :     IOStats    *stats = stream->stats;
                                206                 :                : 
                                207         [ +  + ]:        4394589 :     if (stats == NULL)
                                208                 :        4394581 :         return;
                                209                 :                : 
                                210                 :              8 :     stats->prefetch_count++;
                                211                 :              8 :     stats->distance_sum += stream->pinned_buffers;
                                212         [ +  - ]:              8 :     if (stream->pinned_buffers > stats->distance_max)
                                213                 :              8 :         stats->distance_max = stream->pinned_buffers;
                                214                 :                : }
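
The comment above says the counters exist so that an average look-ahead depth can be computed. One plausible way to derive it, shown here as a sketch only, is to divide the accumulated sum by the number of samples. `SketchStats`, its field types and `sketch_average_depth` are assumptions for illustration; the listing does not show the real IOStats definition or how consumers report the average.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical subset of the counters updated above; field types are assumed. */
typedef struct SketchStats
{
    uint64_t    prefetch_count;     /* buffers returned to the consumer */
    uint64_t    distance_sum;       /* sum of pinned-buffer counts */
    int         distance_max;       /* largest pinned-buffer count seen */
} SketchStats;

/* Record one sample, mirroring the shape of the function above. */
void
sketch_count_prefetch(SketchStats *stats, int pinned_buffers)
{
    if (stats == NULL)
        return;                     /* stats collection is disabled */

    stats->prefetch_count++;
    stats->distance_sum += (uint64_t) pinned_buffers;
    if (pinned_buffers > stats->distance_max)
        stats->distance_max = pinned_buffers;
}

/* Assumed derivation: mean pinned-buffer count per returned buffer. */
double
sketch_average_depth(const SketchStats *stats)
{
    if (stats->prefetch_count == 0)
        return 0.0;                 /* avoid dividing by zero */

    return (double) stats->distance_sum / (double) stats->prefetch_count;
}
```

Recording samples of 1, 3 and 8 pinned buffers gives a sum of 12 over 3 samples, so the sketch reports an average depth of 4.0 and a maximum of 8.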
                                215                 :                : 
                                216                 :                : /*
                                217                 :                :  * Update stream stats about size of I/O requests.
                                218                 :                :  *
                                219                 :                :  * We count the number of I/O requests, size of requests (counted in blocks)
                                220                 :                :  * and number of in-progress I/Os.
                                221                 :                :  */
                                222                 :                : static inline void
                                223                 :         698502 : read_stream_count_io(ReadStream *stream, int nblocks, int in_progress)
                                224                 :                : {
                                225                 :         698502 :     IOStats    *stats = stream->stats;
                                226                 :                : 
                                227         [ +  - ]:         698502 :     if (stats == NULL)
                                228                 :         698502 :         return;
                                229                 :                : 
   53 tomas.vondra@postgre      230                 :UNC           0 :     stats->io_count++;
                                231                 :              0 :     stats->io_nblocks += nblocks;
                                232                 :              0 :     stats->io_in_progress += in_progress;
                                233                 :                : }
                                234                 :                : 
                                235                 :                : /*
                                236                 :                :  * Update stream stats about waits for I/O when consuming buffers.
                                237                 :                :  *
                                238                 :                :  * We count the number of I/O waits while pulling buffers out of a stream.
                                239                 :                :  */
                                240                 :                : static inline void
   53 tomas.vondra@postgre      241                 :GNC      318570 : read_stream_count_wait(ReadStream *stream)
                                242                 :                : {
                                243                 :         318570 :     IOStats    *stats = stream->stats;
                                244                 :                : 
                                245         [ +  - ]:         318570 :     if (stats == NULL)
                                246                 :         318570 :         return;
                                247                 :                : 
   53 tomas.vondra@postgre      248                 :UNC           0 :     stats->wait_count++;
                                249                 :                : }
                                250                 :                : 
                                251                 :                : /*
                                252                 :                :  * Enable collection of stats into the provided IOStats.
                                253                 :                :  */
                                254                 :                : void
   53 tomas.vondra@postgre      255                 :GNC           8 : read_stream_enable_stats(ReadStream *stream, IOStats *stats)
                                256                 :                : {
                                257                 :              8 :     stream->stats = stats;
                                258         [ +  - ]:              8 :     if (stream->stats)
                                259                 :              8 :         stream->stats->distance_capacity = stream->max_pinned_buffers;
                                260                 :              8 : }
                                261                 :                : 
                                262                 :                : /*
                                263                 :                :  * Ask the callback which block it would like us to read next, with a one block
                                264                 :                :  * buffer in front to allow read_stream_unget_block() to work.
                                265                 :                :  */
                                266                 :                : static inline BlockNumber
  787 tmunro@postgresql.or      267                 :CBC     5820985 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
                                268                 :                : {
                                269                 :                :     BlockNumber blocknum;
                                270                 :                : 
  637                           271                 :        5820985 :     blocknum = stream->buffered_blocknum;
                                272         [ -  + ]:        5820985 :     if (blocknum != InvalidBlockNumber)
  637 tmunro@postgresql.or      273                 :UBC           0 :         stream->buffered_blocknum = InvalidBlockNumber;
                                274                 :                :     else
                                275                 :                :     {
                                276                 :                :         /*
                                277                 :                :          * Tell Valgrind that the per-buffer data is undefined.  That replaces
                                278                 :                :          * the "noaccess" state that was set when the consumer moved past this
                                279                 :                :          * entry last time around the queue, and should also catch callbacks
                                280                 :                :          * that fail to initialize data that the buffer consumer later
                                281                 :                :          * accesses.  On the first go around, it is undefined already.
                                282                 :                :          */
                                283                 :                :         VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
                                284                 :                :                                     stream->per_buffer_data_size);
  637 tmunro@postgresql.or      285                 :CBC     5820985 :         blocknum = stream->callback(stream,
                                286                 :                :                                     stream->callback_private_data,
                                287                 :                :                                     per_buffer_data);
                                288                 :                :     }
                                289                 :                : 
                                290                 :        5820985 :     return blocknum;
                                291                 :                : }
                                292                 :                : 
                                293                 :                : /*
                                294                 :                :  * In order to deal with buffer shortages and I/O limits after short reads, we
                                295                 :                :  * sometimes need to defer handling of a block we've already consumed from the
                                296                 :                :  * registered callback until later.
                                297                 :                :  */
                                298                 :                : static inline void
  787 tmunro@postgresql.or      299                 :UBC           0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
                                300                 :                : {
                                301                 :                :     /* We shouldn't ever unget more than one block. */
  637                           302         [ #  # ]:              0 :     Assert(stream->buffered_blocknum == InvalidBlockNumber);
                                303         [ #  # ]:              0 :     Assert(blocknum != InvalidBlockNumber);
                                304                 :              0 :     stream->buffered_blocknum = blocknum;
  787                           305                 :              0 : }
                                306                 :                : 
                                307                 :                : /*
                                308                 :                :  * Start as much of the current pending read as we can.  If we have to split it
                                309                 :                :  * because of the per-backend buffer limit, or the buffer manager decides to
                                310                 :                :  * split it, then the pending read is adjusted to hold the remaining portion.
                                311                 :                :  *
                                312                 :                :  * We can always start a read of at least size one if we have no progress yet.
                                313                 :                :  * Otherwise it's possible that we can't start a read at all because of a lack
                                314                 :                :  * of buffers, and then false is returned.  Buffer shortages also reduce the
                                315                 :                :  * distance to a level that prevents look-ahead until buffers are released.
                                316                 :                :  */
                                317                 :                : static bool
  441 tmunro@postgresql.or      318                 :CBC     1783012 : read_stream_start_pending_read(ReadStream *stream)
                                319                 :                : {
                                320                 :                :     bool        need_wait;
                                321                 :                :     int         requested_nblocks;
                                322                 :                :     int         nblocks;
                                323                 :                :     int         flags;
                                324                 :                :     int         forwarded;
                                325                 :                :     int16       io_index;
                                326                 :                :     int16       overflow;
                                327                 :                :     int16       buffer_index;
                                328                 :                :     int         buffer_limit;
                                329                 :                : 
                                330                 :                :     /* This should only be called with a pending read. */
  787                           331         [ -  + ]:        1783012 :     Assert(stream->pending_read_nblocks > 0);
  443                           332         [ -  + ]:        1783012 :     Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
                                333                 :                : 
                                334                 :                :     /* We had better not exceed the per-stream buffer limit with this read. */
  787                           335         [ -  + ]:        1783012 :     Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
                                336                 :                :            stream->max_pinned_buffers);
                                337                 :                : 
                                338                 :                : #ifdef USE_ASSERT_CHECKING
                                339                 :                :     /* We had better not be overwriting an existing pinned buffer. */
                                340         [ +  + ]:        1783012 :     if (stream->pinned_buffers > 0)
                                341         [ -  + ]:           8444 :         Assert(stream->next_buffer_index != stream->oldest_buffer_index);
                                342                 :                :     else
                                343         [ -  + ]:        1774568 :         Assert(stream->next_buffer_index == stream->oldest_buffer_index);
                                344                 :                : 
                                345                 :                :     /*
                                346                 :                :      * Pinned buffers forwarded by a preceding StartReadBuffers() call that
                                347                 :                :      * had to split the operation should match the leading blocks of this
                                348                 :                :      * following StartReadBuffers() call.
                                349                 :                :      */
  294                           350         [ -  + ]:        1783012 :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
                                351         [ +  + ]:        1785190 :     for (int i = 0; i < stream->forwarded_buffers; ++i)
                                352         [ -  + ]:           2178 :         Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
                                353                 :                :                stream->pending_read_blocknum + i);
                                354                 :                : 
                                355                 :                :     /*
                                356                 :                :      * Check that we've cleared the queue/overflow entries corresponding to
                                357                 :                :      * the rest of the blocks covered by this read, unless it's the first go
                                358                 :                :      * around and we haven't even initialized them yet.
                                359                 :                :      */
                                360         [ +  + ]:        4181089 :     for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
                                361   [ +  +  -  + ]:        2398077 :         Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
                                362                 :                :                stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
                                363                 :                : #endif
                                364                 :                : 
                                365                 :                :     /* Do we need to issue read-ahead advice? */
  426 andres@anarazel.de        366                 :        1783012 :     flags = stream->read_buffers_flags;
  441 tmunro@postgresql.or      367         [ +  + ]:        1783012 :     if (stream->advice_enabled)
                                368                 :                :     {
                                369         [ +  + ]:           1729 :         if (stream->pending_read_blocknum == stream->seq_blocknum)
                                370                 :                :         {
                                371                 :                :             /*
                                372                 :                :              * Sequential:  Issue advice until the preadv() calls have caught
                                373                 :                :              * up with the first advice issued for this sequential region, and
                                374                 :                :              * then stay out of the way of the kernel's own read-ahead.
                                375                 :                :              */
                                376         [ +  + ]:             30 :             if (stream->seq_until_processed != InvalidBlockNumber)
  426 andres@anarazel.de        377                 :              1 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
                                378                 :                :         }
                                379                 :                :         else
                                380                 :                :         {
                                381                 :                :             /*
                                382                 :                :              * Random jump:  Note the starting location of a new potential
                                383                 :                :              * sequential region and start issuing advice.  Skip it this time
                                384                 :                :              * if the preadv() follows immediately, e.g. first block in stream.
                                385                 :                :              */
  441 tmunro@postgresql.or      386                 :           1699 :             stream->seq_until_processed = stream->pending_read_blocknum;
                                387         [ +  + ]:           1699 :             if (stream->pinned_buffers > 0)
  426 andres@anarazel.de        388                 :             44 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
                                389                 :                :         }
                                390                 :                :     }
                                391                 :                : 
                                392                 :                :     /*
                                393                 :                :      * How many more buffers is this backend allowed?
                                394                 :                :      *
                                395                 :                :      * Forwarded buffers are already pinned and map to the leading blocks of
                                396                 :                :      * the pending read (the remaining portion of an earlier short read that
                                397                 :                :      * we're about to continue).  They are not counted in pinned_buffers, but
                                398                 :                :      * they are counted as pins already held by this backend according to the
                                399                 :                :      * buffer manager, so they must be added to the limit it grants us.
                                400                 :                :      */
  442 tmunro@postgresql.or      401         [ +  + ]:        1783012 :     if (stream->temporary)
                                402         [ +  - ]:          16442 :         buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
                                403                 :                :     else
                                404         [ +  - ]:        1766570 :         buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
  435                           405         [ -  + ]:        1783012 :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
                                406                 :                : 
                                407                 :        1783012 :     buffer_limit += stream->forwarded_buffers;
  418 andres@anarazel.de        408                 :        1783012 :     buffer_limit = Min(buffer_limit, PG_INT16_MAX);
                                409                 :                : 
  442 tmunro@postgresql.or      410   [ +  +  +  - ]:        1783012 :     if (buffer_limit == 0 && stream->pinned_buffers == 0)
                                411                 :         741974 :         buffer_limit = 1;       /* guarantee progress */
                                412                 :                : 
                                413                 :                :     /* Does the per-backend limit affect this read? */
                                414                 :        1783012 :     nblocks = stream->pending_read_nblocks;
                                415         [ +  + ]:        1783012 :     if (buffer_limit < nblocks)
                                416                 :                :     {
                                417                 :                :         int16       new_distance;
                                418                 :                : 
                                419                 :                :         /* Shrink distance: no more look-ahead until buffers are released. */
                                420                 :           1907 :         new_distance = stream->pinned_buffers + buffer_limit;
   55 andres@anarazel.de        421         [ +  + ]:GNC        1907 :         if (stream->readahead_distance > new_distance)
                                422                 :            483 :             stream->readahead_distance = new_distance;
                                423                 :                : 
                                424                 :                :         /* Unless we have nothing to give the consumer, stop here. */
  442 tmunro@postgresql.or      425         [ +  + ]:CBC        1907 :         if (stream->pinned_buffers > 0)
                                426                 :             72 :             return false;
                                427                 :                : 
                                428                 :                :         /* A short read is required to make progress. */
                                429                 :           1835 :         nblocks = buffer_limit;
                                430                 :                :     }
                                431                 :                : 
                                432                 :                :     /*
                                433                 :                :      * We say how many blocks we want to read, but it may be smaller on return
                                434                 :                :      * if the buffer manager decides to shorten the read.  Initialize buffers
                                435                 :                :      * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
                                436                 :                :      * and keep the original nblocks number so we can check for forwarded
                                437                 :                :      * buffers as output, below.
                                438                 :                :      */
  787                           439                 :        1782940 :     buffer_index = stream->next_buffer_index;
                                440                 :        1782940 :     io_index = stream->next_io_index;
  435                           441         [ +  + ]:        2924817 :     while (stream->initialized_buffers < buffer_index + nblocks)
                                442                 :        1141877 :         stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
                                443                 :        1782940 :     requested_nblocks = nblocks;
  787                           444                 :        1782940 :     need_wait = StartReadBuffers(&stream->ios[io_index].op,
                                445                 :        1782940 :                                  &stream->buffers[buffer_index],
                                446                 :                :                                  stream->pending_read_blocknum,
                                447                 :                :                                  &nblocks,
                                448                 :                :                                  flags);
                                449                 :        1782932 :     stream->pinned_buffers += nblocks;
                                450                 :                : 
                                451                 :                :     /* Remember whether we need to wait before returning this buffer. */
                                452         [ +  + ]:        1782932 :     if (!need_wait)
                                453                 :                :     {
                                454                 :                :         /*
                                455                 :                :          * If there currently is no IO in progress, and we have not needed to
                                456                 :                :          * issue IO recently, decay the look-ahead distance.  We detect if we
                                457                 :                :          * had to issue IO recently by having a decay holdoff that's set to
                                458                 :                :          * the max look-ahead distance whenever we need to do IO.  This is
                                459                 :                :          * important to ensure we eventually reach a high enough distance to
                                460                 :                :          * perform IO asynchronously when starting out with a small look-ahead
                                461                 :                :          * distance.
                                462                 :                :          */
   55 andres@anarazel.de        463         [ +  + ]:GNC     1100965 :         if (stream->ios_in_progress == 0)
                                464                 :                :         {
                                465         [ +  + ]:        1100327 :             if (stream->distance_decay_holdoff > 0)
   59                           466                 :          26522 :                 stream->distance_decay_holdoff--;
                                467                 :                :             else
                                468                 :                :             {
   55                           469         [ +  + ]:        1073805 :                 if (stream->readahead_distance > 1)
                                470                 :          18007 :                     stream->readahead_distance--;
                                471                 :                : 
                                472                 :                :                 /*
                                473                 :                :                  * For now we reduce the IO combine distance after
                                474                 :                :                  * sufficiently many buffer hits. There is no clear
                                475                 :                :                  * performance argument for doing so, but at the moment we
                                476                 :                :                  * need to do so to make the entrance into fast_path work
                                477                 :                :                  * correctly: We require combine_distance == 1 to enter
                                478                 :                :                  * fast-path, as without that condition we would wrongly
                                479                 :                :                  * re-enter fast-path when readahead_distance == 1 and
                                480                 :                :                  * pinned_buffers == 1, as we would not yet have prepared
                                481                 :                :                  * another IO in that situation.
                                482                 :                :                  */
                                483         [ +  + ]:        1073805 :                 if (stream->combine_distance > 1)
                                484                 :          17930 :                     stream->combine_distance--;
                                485                 :                :             }
                                486                 :                :         }
                                487                 :                :     }
                                488                 :                :     else
                                489                 :                :     {
                                490                 :                :         /*
                                491                 :                :          * Remember to call WaitReadBuffers() before returning head buffer.
                                492                 :                :          * Look-ahead distance will be adjusted after waiting.
                                493                 :                :          */
  787 tmunro@postgresql.or      494                 :CBC      681967 :         stream->ios[io_index].buffer_index = buffer_index;
                                495         [ +  + ]:         681967 :         if (++stream->next_io_index == stream->max_ios)
                                496                 :          28758 :             stream->next_io_index = 0;
                                497         [ -  + ]:         681967 :         Assert(stream->ios_in_progress < stream->max_ios);
                                498                 :         681967 :         stream->ios_in_progress++;
                                499                 :         681967 :         stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
                                500                 :                : 
                                501                 :                :         /* update I/O stats */
   53 tomas.vondra@postgre      502                 :GNC      681967 :         read_stream_count_io(stream, nblocks, stream->ios_in_progress);
                                503                 :                :     }
                                504                 :                : 
                                505                 :                :     /*
                                506                 :                :      * How many pins were acquired but forwarded to the next call?  These need
                                507                 :                :      * to be passed to the next StartReadBuffers() call by leaving them
                                508                 :                :      * exactly where they are in the queue, or released if the stream ends
                                509                 :                :      * early.  We need the number for accounting purposes, since they are not
                                510                 :                :      * counted in stream->pinned_buffers but we already hold them.
                                511                 :                :      */
  435 tmunro@postgresql.or      512                 :CBC     1782932 :     forwarded = 0;
                                513         [ +  + ]:        1785113 :     while (nblocks + forwarded < requested_nblocks &&
                                514         [ +  + ]:          67465 :            stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
                                515                 :           2181 :         forwarded++;
                                516                 :        1782932 :     stream->forwarded_buffers = forwarded;
                                517                 :                : 
                                518                 :                :     /*
                                519                 :                :      * We gave a contiguous range of buffer space to StartReadBuffers(), but
                                520                 :                :      * we want it to wrap around at queue_size.  Copy overflowing buffers to
                                521                 :                :      * the front of the array where they'll be consumed, but also leave a copy
                                522                 :                :      * in the overflow zone which the I/O operation has a pointer to (it needs
                                523                 :                :      * a contiguous array).  Both copies will be cleared when the buffers are
                                524                 :                :      * handed to the consumer.
                                525                 :                :      */
                                526                 :        1782932 :     overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
  787                           527         [ +  + ]:        1782932 :     if (overflow > 0)
                                528                 :                :     {
  435                           529         [ -  + ]:            443 :         Assert(overflow < stream->queue_size);    /* can't overlap */
                                530                 :            443 :         memcpy(&stream->buffers[0],
                                531                 :            443 :                &stream->buffers[stream->queue_size],
                                532                 :                :                sizeof(stream->buffers[0]) * overflow);
                                533                 :                :     }
                                534                 :                : 
                                535                 :                :     /* Compute location of start of next read, without using % operator. */
  787                           536                 :        1782932 :     buffer_index += nblocks;
                                537         [ +  + ]:        1782932 :     if (buffer_index >= stream->queue_size)
                                538                 :         345485 :         buffer_index -= stream->queue_size;
                                539   [ +  -  -  + ]:        1782932 :     Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
                                540                 :        1782932 :     stream->next_buffer_index = buffer_index;
                                541                 :                : 
                                542                 :                :     /* Adjust the pending read to cover the remaining portion, if any. */
                                543                 :        1782932 :     stream->pending_read_blocknum += nblocks;
                                544                 :        1782932 :     stream->pending_read_nblocks -= nblocks;
                                545                 :                : 
  442                           546                 :        1782932 :     return true;
                                547                 :                : }
                                548                 :                : 
                                549                 :                : /*
                                550                 :                :  * Should we continue to perform look ahead?  Looking ahead may allow us to
                                551                 :                :  * make the pending IO larger via IO combining or to issue more read-ahead.
                                552                 :                :  */
                                553                 :                : static inline bool
   55 andres@anarazel.de        554                 :GNC     5509375 : read_stream_should_look_ahead(ReadStream *stream)
                                555                 :                : {
                                556                 :                :     /* If the callback has signaled end-of-stream, we're done */
                                557         [ +  + ]:        5509375 :     if (stream->readahead_distance == 0)
                                558                 :         329179 :         return false;
                                559                 :                : 
                                560                 :                :     /* Never start more IOs than our cap. */
                                561         [ -  + ]:        5180196 :     if (stream->ios_in_progress >= stream->max_ios)
   55 andres@anarazel.de        562                 :UNC           0 :         return false;
                                563                 :                : 
                                564                 :                :     /*
                                565                 :                :      * Allow looking further ahead if we are in the process of building a
                                566                 :                :      * larger IO, the IO is not yet big enough, and we don't yet have IO in
                                567                 :                :      * flight.
                                568                 :                :      *
                                569                 :                :      * We do so to allow building larger reads when readahead_distance is
                                570                 :                :      * small (e.g. because the I/O subsystem is keeping up or
                                571                 :                :      * effective_io_concurrency is small). That's a useful goal because larger
                                572                 :                :      * reads are more CPU efficient than smaller reads, even if the system is
                                573                 :                :      * not IO bound.
                                574                 :                :      *
                                575                 :                :      * The reason we do *not* do so when we already have a read prepared (i.e.
                                576                 :                :      * why we check for pinned_buffers == 0) is that once we are actually reading
                                577                 :                :      * ahead, we don't need it:
                                578                 :                :      *
                                579                 :                :      * - We won't issue unnecessarily small reads as
                                580                 :                :      * read_stream_should_issue_now() will return false until the IO is
                                581                 :                :      * suitably sized. The issuance of the pending read will be delayed until
                                582                 :                :      * enough buffers have been consumed.
                                583                 :                :      *
                                584                 :                :      * - If we are not reading ahead aggressively enough, future
                                585                 :                :      * WaitReadBuffers() calls will return true, leading to readahead_distance
                                586                 :                :      * being increased. After that more full-sized IOs can be issued.
                                587                 :                :      *
                                588                 :                :      * Furthermore, if we did not have the pinned_buffers == 0 condition, we
                                589                 :                :      * might end up issuing I/O more aggressively than we need.
                                590                 :                :      *
                                591                 :                :      * Note that a return of true here can lead to exceeding the read-ahead
                                592                 :                :      * limit, but we won't exceed the buffer pin limit (because pinned_buffers
                                593                 :                :      * == 0 and combine_distance is capped by max_pinned_buffers).
                                594                 :                :      */
   55 andres@anarazel.de        595         [ +  + ]:GNC     5180196 :     if (stream->pending_read_nblocks > 0 &&
                                596         [ +  + ]:        2286027 :         stream->pinned_buffers == 0 &&
                                597         [ +  + ]:        2161391 :         stream->pending_read_nblocks < stream->combine_distance)
                                598                 :         486290 :         return true;
                                599                 :                : 
                                600                 :                :     /*
                                601                 :                :      * Don't start more read-ahead if that'd put us over the distance limit
                                602                 :                :      * for doing read-ahead. As stream->readahead_distance is capped by
                                603                 :                :      * max_pinned_buffers, this prevents us from looking ahead so far that it
                                604                 :                :      * would put us over the pin limit.
                                605                 :                :      */
                                606         [ +  + ]:        4693906 :     if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
                                607                 :        1753695 :         return false;
                                608                 :                : 
                                609                 :        2940211 :     return true;
                                610                 :                : }
                                611                 :                : 
                                612                 :                : /*
                                613                 :                :  * We don't start the pending read just because we've hit the distance limit,
                                614                 :                :  * preferring to give it another chance to grow to full io_combine_limit size
                                615                 :                :  * once more buffers have been consumed.  But this is not desirable in all
                                616                 :                :  * situations - see below.
                                617                 :                :  */
                                618                 :                : static inline bool
                                619                 :        6483909 : read_stream_should_issue_now(ReadStream *stream)
                                620                 :                : {
                                621                 :        6483909 :     int16       pending_read_nblocks = stream->pending_read_nblocks;
                                622                 :                : 
                                623                 :                :     /* there is no pending IO that could be issued */
                                624         [ +  + ]:        6483909 :     if (pending_read_nblocks == 0)
                                625                 :        4337246 :         return false;
                                626                 :                : 
                                627                 :                :     /* never start more IOs than our cap */
                                628         [ -  + ]:        2146663 :     if (stream->ios_in_progress >= stream->max_ios)
   55 andres@anarazel.de        629                 :UNC           0 :         return false;
                                630                 :                : 
                                631                 :                :     /*
                                632                 :                :      * If the callback has signaled end-of-stream, start the pending read
                                633                 :                :      * immediately. There is no further potential for IO combining.
                                634                 :                :      */
   55 andres@anarazel.de        635         [ +  + ]:GNC     2146663 :     if (stream->readahead_distance == 0)
                                636                 :         103781 :         return true;
                                637                 :                : 
                                638                 :                :     /*
                                639                 :                :      * If we've already reached combine_distance, there's no chance of growing
                                640                 :                :      * the read further.
                                641                 :                :      */
                                642         [ +  + ]:        2042882 :     if (pending_read_nblocks >= stream->combine_distance)
                                643                 :        1679172 :         return true;
                                644                 :                : 
                                645                 :                :     /*
                                646                 :                :      * If we currently have no reads in flight or prepared, issue the IO once
                                647                 :                :      * we are not looking ahead further. This ensures there's always at least
                                648                 :                :      * one IO prepared.
                                649                 :                :      */
                                650         [ +  + ]:         363710 :     if (stream->pinned_buffers == 0 &&
                                651         [ -  + ]:         243145 :         !read_stream_should_look_ahead(stream))
   55 andres@anarazel.de        652                 :UNC           0 :         return true;
                                653                 :                : 
   55 andres@anarazel.de        654                 :GNC      363710 :     return false;
                                655                 :                : }
                                656                 :                : 
                                657                 :                : static void
  441 tmunro@postgresql.or      658                 :CBC     3300553 : read_stream_look_ahead(ReadStream *stream)
                                659                 :                : {
                                660                 :                :     /*
                                661                 :                :      * Allow amortizing the cost of submitting IO over multiple IOs. This
                                662                 :                :      * requires that we don't do any operations that could lead to a deadlock
                                663                 :                :      * with staged-but-unsubmitted IO. The callback needs to opt-in to being
                                664                 :                :      * careful.
                                665                 :                :      */
  426 andres@anarazel.de        666         [ +  + ]:        3300553 :     if (stream->batch_mode)
                                667                 :        3188943 :         pgaio_enter_batchmode();
                                668                 :                : 
   55 andres@anarazel.de        669         [ +  + ]:GNC     5266230 :     while (read_stream_should_look_ahead(stream))
                                670                 :                :     {
                                671                 :                :         BlockNumber blocknum;
                                672                 :                :         int16       buffer_index;
                                673                 :                :         void       *per_buffer_data;
                                674                 :                : 
                                675         [ +  + ]:        3183356 :         if (read_stream_should_issue_now(stream))
                                676                 :                :         {
  441 tmunro@postgresql.or      677                 :CBC        1483 :             read_stream_start_pending_read(stream);
  787                           678                 :           1483 :             continue;
                                679                 :                :         }
                                680                 :                : 
                                681                 :                :         /*
                                682                 :                :          * See which block the callback wants next in the stream.  We need to
                                683                 :                :          * compute the index of the Nth block of the pending read including
                                684                 :                :          * wrap-around, but we don't want to use the expensive % operator.
                                685                 :                :          */
                                686                 :        3181873 :         buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
                                687         [ +  + ]:        3181873 :         if (buffer_index >= stream->queue_size)
                                688                 :           3538 :             buffer_index -= stream->queue_size;
                                689   [ +  -  -  + ]:        3181873 :         Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
                                690                 :        3181873 :         per_buffer_data = get_per_buffer_data(stream, buffer_index);
                                691                 :        3181873 :         blocknum = read_stream_get_block(stream, per_buffer_data);
                                692         [ +  + ]:        3181873 :         if (blocknum == InvalidBlockNumber)
                                693                 :                :         {
                                694                 :                :             /* End of stream. */
   55 andres@anarazel.de        695                 :GNC     1217679 :             stream->readahead_distance = 0;
                                696                 :        1217679 :             stream->combine_distance = 0;
  787 tmunro@postgresql.or      697                 :CBC     1217679 :             break;
                                698                 :                :         }
                                699                 :                : 
                                700                 :                :         /* Can we merge it with the pending read? */
                                701         [ +  + ]:        1964194 :         if (stream->pending_read_nblocks > 0 &&
                                702         [ +  + ]:         248831 :             stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
                                703                 :                :         {
                                704                 :         248772 :             stream->pending_read_nblocks++;
                                705                 :         248772 :             continue;
                                706                 :                :         }
                                707                 :                : 
                                708                 :                :         /* We have to start the pending read before we can build another. */
  783                           709         [ +  + ]:        1715481 :         while (stream->pending_read_nblocks > 0)
                                710                 :                :         {
  441                           711         [ +  - ]:             59 :             if (!read_stream_start_pending_read(stream) ||
  442                           712         [ -  + ]:             59 :                 stream->ios_in_progress == stream->max_ios)
                                713                 :                :             {
                                714                 :                :                 /* We've hit the buffer or I/O limit.  Rewind and stop here. */
  787 tmunro@postgresql.or      715                 :UBC           0 :                 read_stream_unget_block(stream, blocknum);
  426 andres@anarazel.de        716         [ #  # ]:              0 :                 if (stream->batch_mode)
                                717                 :              0 :                     pgaio_exit_batchmode();
  787 tmunro@postgresql.or      718                 :              0 :                 return;
                                719                 :                :             }
                                720                 :                :         }
                                721                 :                : 
                                722                 :                :         /* This is the start of a new pending read. */
  787 tmunro@postgresql.or      723                 :CBC     1715422 :         stream->pending_read_blocknum = blocknum;
                                724                 :        1715422 :         stream->pending_read_nblocks = 1;
                                725                 :                :     }
                                726                 :                : 
                                727                 :                :     /*
                                728                 :                :      * Check if the pending read should be issued now, or if we should give it
                                729                 :                :      * another chance to grow to the full size.
                                730                 :                :      *
                                731                 :                :      * Note that the pending read can exceed the distance goal, if the latter
                                732                 :                :      * was reduced after hitting the per-backend buffer limit.
                                733                 :                :      */
   55 andres@anarazel.de        734         [ +  + ]:GNC     3300553 :     if (read_stream_should_issue_now(stream))
  441 tmunro@postgresql.or      735                 :CBC     1781470 :         read_stream_start_pending_read(stream);
                                736                 :                : 
                                737                 :                :     /*
                                738                 :                :      * There should always be something pinned when we leave this function,
                                739                 :                :      * whether started by this call or not, unless we've hit the end of the
                                740                 :                :      * stream.  In the worst case we can always make progress one buffer at a
                                741                 :                :      * time.
                                742                 :                :      */
   55 andres@anarazel.de        743   [ +  +  -  + ]:GNC     3300545 :     Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
                                744                 :                : 
  426 andres@anarazel.de        745         [ +  + ]:CBC     3300545 :     if (stream->batch_mode)
                                746                 :        3188935 :         pgaio_exit_batchmode();
                                747                 :                : }
                                748                 :                : 
                                749                 :                : /*
                                750                 :                :  * Create a new read stream object that can be used to perform the equivalent
                                751                 :                :  * of a series of ReadBuffer() calls for one fork of one relation.
                                752                 :                :  * Internally, it generates larger vectored reads where possible by looking
                                753                 :                :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
                                754                 :                :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
                                755                 :                :  * write extra data for each block into the space provided to it.  It will
                                756                 :                :  * also receive callback_private_data for its own purposes.
                                757                 :                :  */
                                758                 :                : static ReadStream *
  679 noah@leadboat.com         759                 :         574074 : read_stream_begin_impl(int flags,
                                760                 :                :                        BufferAccessStrategy strategy,
                                761                 :                :                        Relation rel,
                                762                 :                :                        SMgrRelation smgr,
                                763                 :                :                        char persistence,
                                764                 :                :                        ForkNumber forknum,
                                765                 :                :                        ReadStreamBlockNumberCB callback,
                                766                 :                :                        void *callback_private_data,
                                767                 :                :                        size_t per_buffer_data_size)
                                768                 :                : {
                                769                 :                :     ReadStream *stream;
                                770                 :                :     size_t      size;
                                771                 :                :     int16       queue_size;
                                772                 :                :     int16       queue_overflow;
                                773                 :                :     int         max_ios;
                                774                 :                :     int         strategy_pin_limit;
                                775                 :                :     uint32      max_pinned_buffers;
                                776                 :                :     uint32      max_possible_buffer_limit;
                                777                 :                :     Oid         tablespace_id;
                                778                 :                : 
                                779                 :                :     /*
                                780                 :                :      * Reject attempts to read non-local temporary relations; we would be
                                781                 :                :      * likely to get wrong data since we have no visibility into the owning
                                782                 :                :      * session's local buffers.
                                783                 :                :      */
   16 akorotkov@postgresql      784   [ +  +  +  +  :         574074 :     if (rel && RELATION_IS_OTHER_TEMP(rel))
                                              +  + ]
                                785         [ +  - ]:              5 :         ereport(ERROR,
                                786                 :                :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                787                 :                :                  errmsg("cannot access temporary tables of other sessions")));
                                788                 :                : 
                                789                 :                :     /*
                                790                 :                :      * Decide how many I/Os we will allow to run at the same time.  This
                                791                 :                :      * number also affects how far we look ahead for opportunities to start
                                792                 :                :      * more I/Os.
                                793                 :                :      */
  787 tmunro@postgresql.or      794                 :         574069 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
                                795   [ +  +  +  + ]:         574069 :     if (!OidIsValid(MyDatabaseId) ||
  679 noah@leadboat.com         796   [ +  +  +  + ]:         681746 :         (rel && IsCatalogRelation(rel)) ||
  787 tmunro@postgresql.or      797                 :         187706 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
                                798                 :                :     {
                                799                 :                :         /*
                                800                 :                :          * Avoid circularity while trying to look up tablespace settings or
                                801                 :                :          * before spccache.c is ready.
                                802                 :                :          */
                                803                 :         452080 :         max_ios = effective_io_concurrency;
                                804                 :                :     }
                                805         [ +  + ]:         121989 :     else if (flags & READ_STREAM_MAINTENANCE)
                                806                 :           8696 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
                                807                 :                :     else
                                808                 :         113293 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
                                809                 :                : 
                                810                 :                :     /* Cap to PG_INT16_MAX to avoid overflowing below */
                                811                 :         574069 :     max_ios = Min(max_ios, PG_INT16_MAX);
                                812                 :                : 
                                813                 :                :     /*
                                814                 :                :      * If starting a multi-block I/O near the end of the queue, we might
                                815                 :                :      * temporarily need extra space for overflowing buffers before they are
                                816                 :                :      * moved to their regular circular position.  This is the maximum extra space we
                                817                 :                :      * could need.
                                818                 :                :      */
  443                           819                 :         574069 :     queue_overflow = io_combine_limit - 1;
                                820                 :                : 
                                821                 :                :     /*
                                822                 :                :      * Choose the maximum number of buffers we're prepared to pin.  We try to
                                823                 :                :      * pin fewer if we can, though.  We add one so that we can make progress
                                824                 :                :      * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
                                825                 :                :      * this also allows an extra full I/O's worth of buffers: after an I/O
                                826                 :                :      * finishes we don't want to have to wait for its buffers to be consumed
                                827                 :                :      * before starting a new one.
                                828                 :                :      *
                                829                 :                :      * Be careful not to allow int16 to overflow.  That is possible with the
                                830                 :                :      * current GUC range limits, so this is an artificial limit of ~32k
                                831                 :                :      * buffers and we'd need to adjust the types to exceed that.  We also have
                                832                 :                :      * to allow for the spare entry and the overflow space.
                                833                 :                :      */
  457                           834                 :         574069 :     max_pinned_buffers = (max_ios + 1) * io_combine_limit;
  787                           835                 :         574069 :     max_pinned_buffers = Min(max_pinned_buffers,
                                836                 :                :                              PG_INT16_MAX - queue_overflow - 1);
                                837                 :                : 
                                838                 :                :     /* Give the strategy a chance to limit the number of buffers we pin. */
  784                           839                 :         574069 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
                                840                 :         574069 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
                                841                 :                : 
                                842                 :                :     /*
                                843                 :                :      * Also limit our queue to the maximum number of pins we could ever be
                                844                 :                :      * allowed to acquire according to the buffer manager.  We may not really
                                845                 :                :      * be able to use them all due to other pins held by this backend, but
                                846                 :                :      * we'll check that later in read_stream_start_pending_read().
                                847                 :                :      */
  787                           848         [ +  + ]:         574069 :     if (SmgrIsTemp(smgr))
  442                           849                 :           9310 :         max_possible_buffer_limit = GetLocalPinLimit();
                                850                 :                :     else
                                851                 :         564759 :         max_possible_buffer_limit = GetPinLimit();
                                852                 :         574069 :     max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
                                853                 :                : 
                                854                 :                :     /*
                                855                 :                :      * The limit might be zero on a system configured with too few buffers for
                                856                 :                :      * the number of connections.  We need at least one to make progress.
                                857                 :                :      */
                                858         [ +  + ]:         574069 :     max_pinned_buffers = Max(1, max_pinned_buffers);
                                859                 :                : 
                                860                 :                :     /*
                                861                 :                :      * We need one extra entry for buffers and per-buffer data, because users
                                862                 :                :      * of per-buffer data have access to the object until the next call to
                                863                 :                :      * read_stream_next_buffer(), so we need a gap between the head and tail
                                864                 :                :      * of the queue so that we don't clobber it.
                                865                 :                :      */
  787                           866                 :         574069 :     queue_size = max_pinned_buffers + 1;
                                867                 :                : 
                                868                 :                :     /*
                                869                 :                :      * Allocate the object, the buffers, the ios and per_buffer_data space in
                                870                 :                :      * one big chunk.  Though we have queue_size buffers, we want to be able
                                871                 :                :      * to assume that all the buffers for a single read are contiguous (i.e.
                                872                 :                :      * don't wrap around halfway through), so we allow temporary overflows of
                                873                 :                :      * up to the maximum possible overflow size.
                                874                 :                :      */
                                875                 :         574069 :     size = offsetof(ReadStream, buffers);
  443                           876                 :         574069 :     size += sizeof(Buffer) * (queue_size + queue_overflow);
  787                           877         [ +  + ]:         574069 :     size += sizeof(InProgressIO) * Max(1, max_ios);
                                878                 :         574069 :     size += per_buffer_data_size * queue_size;
                                879                 :         574069 :     size += MAXIMUM_ALIGNOF * 2;
                                880                 :         574069 :     stream = (ReadStream *) palloc(size);
                                881                 :         574069 :     memset(stream, 0, offsetof(ReadStream, buffers));
                                882                 :         574069 :     stream->ios = (InProgressIO *)
  443                           883                 :         574069 :         MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
  787                           884         [ +  + ]:         574069 :     if (per_buffer_data_size > 0)
                                885                 :          30187 :         stream->per_buffer_data = (void *)
                                886         [ +  + ]:          30187 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
                                887                 :                : 
  426 andres@anarazel.de        888                 :         574069 :     stream->sync_mode = io_method == IOMETHOD_SYNC;
                                889                 :         574069 :     stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
                                890                 :                : 
                                891                 :                : #ifdef USE_PREFETCH
                                892                 :                : 
                                893                 :                :     /*
                                894                 :                :      * Read-ahead advice simulating asynchronous I/O with synchronous calls.
                                895                 :                :      * Issue advice only if AIO is not used, direct I/O isn't enabled, the
                                896                 :                :      * caller hasn't promised sequential access (overriding our detection
                                897                 :                :      * heuristics), and max_ios hasn't been set to zero.
                                898                 :                :      */
                                899         [ +  + ]:         574069 :     if (stream->sync_mode &&
                                900         [ +  - ]:           3161 :         (io_direct_flags & IO_DIRECT_DATA) == 0 &&
  787 tmunro@postgresql.or      901   [ +  +  +  - ]:           3161 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
                                902                 :                :         max_ios > 0)
                                903                 :            738 :         stream->advice_enabled = true;
                                904                 :                : #endif
                                905                 :                : 
                                906                 :                :     /*
                                907                 :                :      * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
                                908                 :                :      * we still need to allocate space to combine and run one I/O.  Bump it up
                                909                 :                :      * to one, and remember to ask for synchronous I/O only.
                                910                 :                :      */
                                911         [ +  + ]:         574069 :     if (max_ios == 0)
                                912                 :                :     {
                                913                 :              7 :         max_ios = 1;
  426 andres@anarazel.de        914                 :              7 :         stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
                                915                 :                :     }
                                916                 :                : 
                                917                 :                :     /*
                                918                 :                :      * Capture stable values for these two GUC-derived numbers for the
                                919                 :                :      * lifetime of this stream, so we don't have to worry about the GUCs
                                920                 :                :      * changing underneath us beyond this point.
                                921                 :                :      */
  787 tmunro@postgresql.or      922                 :         574069 :     stream->max_ios = max_ios;
  443                           923                 :         574069 :     stream->io_combine_limit = io_combine_limit;
                                924                 :                : 
  787                           925                 :         574069 :     stream->per_buffer_data_size = per_buffer_data_size;
                                926                 :         574069 :     stream->max_pinned_buffers = max_pinned_buffers;
                                927                 :         574069 :     stream->queue_size = queue_size;
                                928                 :         574069 :     stream->callback = callback;
                                929                 :         574069 :     stream->callback_private_data = callback_private_data;
  637                           930                 :         574069 :     stream->buffered_blocknum = InvalidBlockNumber;
  441                           931                 :         574069 :     stream->seq_blocknum = InvalidBlockNumber;
                                932                 :         574069 :     stream->seq_until_processed = InvalidBlockNumber;
  442                           933                 :         574069 :     stream->temporary = SmgrIsTemp(smgr);
   59 andres@anarazel.de        934                 :GNC      574069 :     stream->distance_decay_holdoff = 0;
                                935                 :                : 
                                936                 :                :     /*
                                937                 :                :      * Skip the initial ramp-up phase if the caller says we're going to be
                                938                 :                :      * reading the whole relation.  This way we start out assuming we'll be
                                939                 :                :      * doing full io_combine_limit sized reads.
                                940                 :                :      */
  787 tmunro@postgresql.or      941         [ +  + ]:CBC      574069 :     if (flags & READ_STREAM_FULL)
                                942                 :                :     {
   55 andres@anarazel.de        943                 :GNC       77231 :         stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
                                944                 :          77231 :         stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit);
                                945                 :                :     }
                                946                 :                :     else
                                947                 :                :     {
                                948                 :         496838 :         stream->readahead_distance = 1;
                                949                 :         496838 :         stream->combine_distance = 1;
                                950                 :                :     }
                                951                 :         574069 :     stream->resume_readahead_distance = stream->readahead_distance;
                                952                 :         574069 :     stream->resume_combine_distance = stream->combine_distance;
                                953                 :                : 
                                954                 :                :     /*
                                955                 :                :      * Since we always access the same relation, we can initialize parts of
                                956                 :                :      * the ReadBuffersOperation objects and leave them that way, to avoid
                                957                 :                :      * wasting CPU cycles writing to them for each read.
                                958                 :                :      */
  787 tmunro@postgresql.or      959         [ +  + ]:CBC     9787764 :     for (int i = 0; i < max_ios; ++i)
                                960                 :                :     {
                                961                 :        9213695 :         stream->ios[i].op.rel = rel;
  679 noah@leadboat.com         962                 :        9213695 :         stream->ios[i].op.smgr = smgr;
                                963                 :        9213695 :         stream->ios[i].op.persistence = persistence;
  787 tmunro@postgresql.or      964                 :        9213695 :         stream->ios[i].op.forknum = forknum;
                                965                 :        9213695 :         stream->ios[i].op.strategy = strategy;
                                966                 :                :     }
                                967                 :                : 
                                968                 :         574069 :     return stream;
                                969                 :                : }
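
The allocation comments in read_stream_begin_impl() above describe packing the stream header, the buffer queue (plus overflow slots), the I/O array, and the per-buffer data into one chunk, with each trailing array rounded up to a maximally aligned address. The following is a minimal standalone sketch of that layout idea, not the PostgreSQL code: SketchStream, SketchIO, SKETCH_ALIGNOF, SKETCH_ALIGN and sketch_stream_alloc are hypothetical stand-ins, malloc() replaces palloc(), and an alignment of 8 is assumed.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for MAXIMUM_ALIGNOF / MAXALIGN (assumed value 8). */
#define SKETCH_ALIGNOF 8
#define SKETCH_ALIGN(p) \
    ((((uintptr_t) (p)) + (SKETCH_ALIGNOF - 1)) & \
     ~((uintptr_t) (SKETCH_ALIGNOF - 1)))

typedef struct SketchIO
{
    int         buffer_index;
} SketchIO;

typedef struct SketchStream
{
    int         queue_size;
    SketchIO   *ios;                /* points into the same allocation */
    void       *per_buffer_data;    /* points into the same allocation */
    int         buffers[];          /* flexible array member, must be last */
} SketchStream;

static SketchStream *
sketch_stream_alloc(int max_pinned, int overflow, int max_ios,
                    size_t per_buffer_data_size)
{
    int         queue_size;
    int         nios;
    size_t      size;
    SketchStream *s;

    assert(max_pinned >= 1 && overflow >= 0 && max_ios >= 0);

    queue_size = max_pinned + 1;            /* one-entry gap between head and tail */
    nios = max_ios > 1 ? max_ios : 1;       /* always room for one I/O */

    size = offsetof(SketchStream, buffers);
    size += sizeof(int) * (queue_size + overflow);
    size += sizeof(SketchIO) * nios;
    size += per_buffer_data_size * queue_size;
    size += SKETCH_ALIGNOF * 2;             /* slack for the two round-ups below */

    s = malloc(size);
    if (s == NULL)
        return NULL;

    /* Zero only the fixed-size header, as the listing above does. */
    memset(s, 0, offsetof(SketchStream, buffers));
    s->queue_size = queue_size;

    /* The I/O array starts at the first aligned address after the buffers. */
    s->ios = (SketchIO *) SKETCH_ALIGN(&s->buffers[queue_size + overflow]);

    /* Per-buffer data, if requested, follows the I/O array. */
    if (per_buffer_data_size > 0)
        s->per_buffer_data = (void *) SKETCH_ALIGN(&s->ios[nios]);

    return s;
}
```

Each round-up moves a pointer forward by less than SKETCH_ALIGNOF bytes, which is why two units of slack cover both trailing arrays. A single allocation also means a single free() releases everything.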
                                970                 :                : 
                                971                 :                : /*
                                972                 :                :  * Create a new read stream for reading a relation.
                                973                 :                :  * See read_stream_begin_impl() for the detailed explanation.
                                974                 :                :  */
                                975                 :                : ReadStream *
  679 noah@leadboat.com         976                 :         503853 : read_stream_begin_relation(int flags,
                                977                 :                :                            BufferAccessStrategy strategy,
                                978                 :                :                            Relation rel,
                                979                 :                :                            ForkNumber forknum,
                                980                 :                :                            ReadStreamBlockNumberCB callback,
                                981                 :                :                            void *callback_private_data,
                                982                 :                :                            size_t per_buffer_data_size)
                                983                 :                : {
                                984                 :         503853 :     return read_stream_begin_impl(flags,
                                985                 :                :                                   strategy,
                                986                 :                :                                   rel,
                                987                 :                :                                   RelationGetSmgr(rel),
                                988                 :         503853 :                                   rel->rd_rel->relpersistence,
                                989                 :                :                                   forknum,
                                990                 :                :                                   callback,
                                991                 :                :                                   callback_private_data,
                                992                 :                :                                   per_buffer_data_size);
                                993                 :                : }
                                994                 :                : 
                                995                 :                : /*
                                996                 :                :  * Create a new read stream for reading a SMgr relation.
                                997                 :                :  * See read_stream_begin_impl() for the detailed explanation.
                                998                 :                :  */
                                999                 :                : ReadStream *
                               1000                 :          70221 : read_stream_begin_smgr_relation(int flags,
                               1001                 :                :                                 BufferAccessStrategy strategy,
                               1002                 :                :                                 SMgrRelation smgr,
                               1003                 :                :                                 char smgr_persistence,
                               1004                 :                :                                 ForkNumber forknum,
                               1005                 :                :                                 ReadStreamBlockNumberCB callback,
                               1006                 :                :                                 void *callback_private_data,
                               1007                 :                :                                 size_t per_buffer_data_size)
                               1008                 :                : {
                               1009                 :          70221 :     return read_stream_begin_impl(flags,
                               1010                 :                :                                   strategy,
                               1011                 :                :                                   NULL,
                               1012                 :                :                                   smgr,
                               1013                 :                :                                   smgr_persistence,
                               1014                 :                :                                   forknum,
                               1015                 :                :                                   callback,
                               1016                 :                :                                   callback_private_data,
                               1017                 :                :                                   per_buffer_data_size);
                               1018                 :                : }
                               1019                 :                : 
                               1020                 :                : /*
                               1021                 :                :  * Pull one pinned buffer out of a stream.  Each call returns successive
                               1022                 :                :  * blocks in the order specified by the callback.  If per_buffer_data_size was
                               1023                 :                :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
                               1024                 :                :  * per-buffer data that the callback had a chance to populate, which remains
                               1025                 :                :  * valid until the next call to read_stream_next_buffer().  When the stream
                               1026                 :                :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
                               1027                 :                :  * the stream early at any time by calling read_stream_end().
                               1028                 :                :  */
                               1029                 :                : Buffer
  787 tmunro@postgresql.or     1030                 :        7187427 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                               1031                 :                : {
                               1032                 :                :     Buffer      buffer;
                               1033                 :                :     int16       oldest_buffer_index;
                               1034                 :                : 
                               1035                 :                : #ifndef READ_STREAM_DISABLE_FAST_PATH
                               1036                 :                : 
                               1037                 :                :     /*
                               1038                 :                :      * A fast path for all-cached scans.  This is the same as the usual
                               1039                 :                :      * algorithm, but it is specialized for no I/O and no per-buffer data, so
                               1040                 :                :      * we can skip the queue management code, stay in the same buffer slot and
                               1041                 :                :      * use singular StartReadBuffer().
                               1042                 :                :      */
                               1043         [ +  + ]:        7187427 :     if (likely(stream->fast_path))
                               1044                 :                :     {
                               1045                 :                :         BlockNumber next_blocknum;
                               1046                 :                : 
                               1047                 :                :         /* Fast path assumptions. */
                               1048         [ -  + ]:        2639112 :         Assert(stream->ios_in_progress == 0);
  435                          1049         [ -  + ]:        2639112 :         Assert(stream->forwarded_buffers == 0);
  787                          1050         [ -  + ]:        2639112 :         Assert(stream->pinned_buffers == 1);
   55 andres@anarazel.de       1051         [ -  + ]:GNC     2639112 :         Assert(stream->readahead_distance == 1);
                               1052         [ -  + ]:        2639112 :         Assert(stream->combine_distance == 1);
  784 tmunro@postgresql.or     1053         [ -  + ]:CBC     2639112 :         Assert(stream->pending_read_nblocks == 0);
  787                          1054         [ -  + ]:        2639112 :         Assert(stream->per_buffer_data_size == 0);
  435                          1055         [ -  + ]:        2639112 :         Assert(stream->initialized_buffers > stream->oldest_buffer_index);
                               1056                 :                : 
                               1057                 :                :         /* We're going to return the buffer we pinned last time. */
  787                          1058                 :        2639112 :         oldest_buffer_index = stream->oldest_buffer_index;
                               1059         [ -  + ]:        2639112 :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
                               1060                 :                :                stream->next_buffer_index);
                               1061                 :        2639112 :         buffer = stream->buffers[oldest_buffer_index];
                               1062         [ -  + ]:        2639112 :         Assert(buffer != InvalidBuffer);
                               1063                 :                : 
                               1064                 :                :         /* Choose the next block to pin. */
  637                          1065                 :        2639112 :         next_blocknum = read_stream_get_block(stream, NULL);
                               1066                 :                : 
  784                          1067         [ +  + ]:        2639112 :         if (likely(next_blocknum != InvalidBlockNumber))
                               1068                 :                :         {
  426 andres@anarazel.de       1069                 :        2535408 :             int         flags = stream->read_buffers_flags;
                               1070                 :                : 
                               1071         [ +  + ]:        2535408 :             if (stream->advice_enabled)
                               1072                 :            571 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
                               1073                 :                : 
                               1074                 :                :             /*
                               1075                 :                :              * While in the fast path, execute any IO that we might encounter
                               1076                 :                :              * synchronously.  Because we are, right now, only looking one
                               1077                 :                :              * block ahead, dispatching an occasional IO to workers would
                               1078                 :                :              * incur the dispatch overhead without any realistic chance of
                               1079                 :                :              * the IO completing before we need it.  We will switch to
                               1080                 :                :              * non-synchronous IO after this.
                               1081                 :                :              *
                               1082                 :                :              * Arguably we should do so only for io_method=worker, as there's
                               1083                 :                :              * far less dispatch overhead with io_uring.  However, tests so far
                               1084                 :                :              * have not shown a clear downside, and additional io_method
                               1085                 :                :              * awareness here seems undesirable from an abstraction standpoint.
                               1086                 :                :              */
   59 andres@anarazel.de       1087                 :GNC     2535408 :             flags |= READ_BUFFERS_SYNCHRONOUSLY;
                               1088                 :                : 
                               1089                 :                :             /*
                               1090                 :                :              * Pin a buffer for the next call.  Same buffer entry, and
                               1091                 :                :              * arbitrary I/O entry (they're all free).  We don't have to
                               1092                 :                :              * adjust pinned_buffers because we're transferring one to caller
                               1093                 :                :              * but pinning one more.
                               1094                 :                :              *
                               1095                 :                :              * In the fast path we don't need to check the pin limit.  We're
                               1096                 :                :              * always allowed at least one pin so that progress can be made,
                               1097                 :                :              * and that's all we need here.  Although two pins are momentarily
                               1098                 :                :              * held at the same time, the model used here is that the stream
                               1099                 :                :              * holds only one, and the other now belongs to the caller.
                               1100                 :                :              */
  784 tmunro@postgresql.or     1101         [ +  + ]:CBC     2535408 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
                               1102                 :                :                                         &stream->buffers[oldest_buffer_index],
                               1103                 :                :                                         next_blocknum,
                               1104                 :                :                                         flags)))
                               1105                 :                :             {
                               1106                 :                :                 /* Fast return. */
   53 tomas.vondra@postgre     1107                 :GNC     2518873 :                 read_stream_count_prefetch(stream);
  784 tmunro@postgresql.or     1108                 :CBC     2518873 :                 return buffer;
                               1109                 :                :             }
                               1110                 :                : 
                               1111                 :                :             /* Next call must wait for I/O for the newly pinned buffer. */
  787                          1112                 :          16535 :             stream->oldest_io_index = 0;
                               1113                 :          16535 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
                               1114                 :          16535 :             stream->ios_in_progress = 1;
                               1115                 :          16535 :             stream->ios[0].buffer_index = oldest_buffer_index;
                               1116                 :          16535 :             stream->seq_blocknum = next_blocknum + 1;
                               1117                 :                : 
                               1118                 :                :             /*
                               1119                 :                :              * XXX: It might be worth triggering additional read-ahead here,
                               1120                 :                :              * to avoid having to effectively do another synchronous IO for
                               1121                 :                :              * the next block (if it were also a miss).
                               1122                 :                :              */
                               1123                 :                : 
                               1124                 :                :             /* update I/O stats */
   53 tomas.vondra@postgre     1125                 :GNC       16535 :             read_stream_count_io(stream, 1, stream->ios_in_progress);
                               1126                 :                : 
                               1127                 :                :             /* update prefetch distance */
                               1128                 :          16535 :             read_stream_count_prefetch(stream);
                               1129                 :                :         }
                               1130                 :                :         else
                               1131                 :                :         {
                               1132                 :                :             /* No more blocks, end of stream. */
   55 andres@anarazel.de       1133                 :         103704 :             stream->readahead_distance = 0;
                               1134                 :         103704 :             stream->combine_distance = 0;
  784 tmunro@postgresql.or     1135                 :CBC      103704 :             stream->oldest_buffer_index = stream->next_buffer_index;
                               1136                 :         103704 :             stream->pinned_buffers = 0;
  435                          1137                 :         103704 :             stream->buffers[oldest_buffer_index] = InvalidBuffer;
                               1138                 :                :         }
                               1139                 :                : 
  784                          1140                 :         120239 :         stream->fast_path = false;
  787                          1141                 :         120239 :         return buffer;
                               1142                 :                :     }
                               1143                 :                : #endif
                               1144                 :                : 
                               1145         [ +  + ]:        4548315 :     if (unlikely(stream->pinned_buffers == 0))
                               1146                 :                :     {
                               1147         [ -  + ]:        3345910 :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
                               1148                 :                : 
                               1149                 :                :         /* End of stream reached?  */
   55 andres@anarazel.de       1150         [ +  + ]:GNC     3345910 :         if (stream->readahead_distance == 0)
  787 tmunro@postgresql.or     1151                 :CBC     1904538 :             return InvalidBuffer;
                               1152                 :                : 
                               1153                 :                :         /*
                               1154                 :                :          * The usual order of operations is that we look ahead at the bottom
                               1155                 :                :          * of this function after potentially finishing an I/O and making
                               1156                 :                :          * space for more, but if we're just starting up we'll need to crank
                               1157                 :                :          * the handle to get started.
                               1158                 :                :          */
  441                          1159                 :        1441372 :         read_stream_look_ahead(stream);
                               1160                 :                : 
                               1161                 :                :         /* End of stream reached? */
  787                          1162         [ +  + ]:        1441372 :         if (stream->pinned_buffers == 0)
                               1163                 :                :         {
   55 andres@anarazel.de       1164         [ -  + ]:GNC      784564 :             Assert(stream->readahead_distance == 0);
  787 tmunro@postgresql.or     1165                 :CBC      784564 :             return InvalidBuffer;
                               1166                 :                :         }
                               1167                 :                :     }
                               1168                 :                : 
                               1169                 :                :     /* Grab the oldest pinned buffer and associated per-buffer data. */
                               1170         [ -  + ]:        1859213 :     Assert(stream->pinned_buffers > 0);
                               1171                 :        1859213 :     oldest_buffer_index = stream->oldest_buffer_index;
                               1172   [ +  -  -  + ]:        1859213 :     Assert(oldest_buffer_index >= 0 &&
                               1173                 :                :            oldest_buffer_index < stream->queue_size);
                               1174                 :        1859213 :     buffer = stream->buffers[oldest_buffer_index];
                               1175         [ +  + ]:        1859213 :     if (per_buffer_data)
                               1176                 :         360675 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
                               1177                 :                : 
                               1178         [ -  + ]:        1859213 :     Assert(BufferIsValid(buffer));
                               1179                 :                : 
                               1180                 :                :     /* Do we have to wait for an associated I/O first? */
                               1181         [ +  + ]:        1859213 :     if (stream->ios_in_progress > 0 &&
                               1182         [ +  + ]:         745188 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
                               1183                 :                :     {
                               1184                 :         698104 :         int16       io_index = stream->oldest_io_index;
                               1185                 :                :         bool        needed_wait;
                               1186                 :                : 
                               1187                 :                :         /* Sanity check that we still agree on the buffers. */
                               1188         [ -  + ]:         698104 :         Assert(stream->ios[io_index].op.buffers ==
                               1189                 :                :                &stream->buffers[oldest_buffer_index]);
                               1190                 :                : 
   55 andres@anarazel.de       1191                 :GNC      698104 :         needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
                               1192                 :                : 
  787 tmunro@postgresql.or     1193         [ -  + ]:CBC      698072 :         Assert(stream->ios_in_progress > 0);
                               1194                 :         698072 :         stream->ios_in_progress--;
                               1195         [ +  + ]:         698072 :         if (++stream->oldest_io_index == stream->max_ios)
                               1196                 :          28758 :             stream->oldest_io_index = 0;
                               1197                 :                : 
                               1198                 :                :         /*
                               1199                 :                :          * If the IO was executed synchronously, we will never see
                               1200                 :                :          * WaitReadBuffers() block. Treat it as if it did block. This is
                               1201                 :                :          * particularly crucial when effective_io_concurrency=0 is used, as
                               1202                 :                :          * all IO will be synchronous.  Without treating synchronous IO as
                               1203                 :                :          * having waited, we'd never allow the distance to get large enough to
                               1204                 :                :          * allow for IO combining, resulting in bad performance.
                               1205                 :                :          */
   55 andres@anarazel.de       1206         [ +  + ]:GNC      698072 :         if (stream->ios[io_index].op.flags & READ_BUFFERS_SYNCHRONOUSLY)
                               1207                 :          17033 :             needed_wait = true;
                               1208                 :                : 
                               1209                 :                :         /* Count it as a wait if we need to wait for IO */
   53 tomas.vondra@postgre     1210         [ +  + ]:         698072 :         if (needed_wait)
                               1211                 :         318570 :             read_stream_count_wait(stream);
                               1212                 :                : 
                               1213                 :                :         /*
                               1214                 :                :          * Have the read-ahead distance ramp up rapidly after we needed to
                               1215                 :                :          * wait for IO. We only increase the read-ahead-distance when we
                               1216                 :                :          * needed to wait, to avoid increasing the distance further than
                               1217                 :                :          * necessary, as looking ahead too far can be costly, both due to the
                               1218                 :                :          * cost of unnecessarily pinning many buffers and due to doing IOs
                               1219                 :                :          * that may never be consumed if the stream is ended/reset before
                               1220                 :                :          * completion.
                               1221                 :                :          *
                               1222                 :                :          * If we did not need to wait, the current distance was evidently
                               1223                 :                :          * sufficient.
                               1224                 :                :          *
                               1225                 :                :          * NB: Must not increase the distance if we already reached the end of
                               1226                 :                :          * the stream, as stream->readahead_distance == 0 is used to keep
                               1227                 :                :          * track of having reached the end.
                               1228                 :                :          */
   55 andres@anarazel.de       1229   [ +  +  +  + ]:         698072 :         if (stream->readahead_distance > 0 && needed_wait)
                               1230                 :                :         {
                               1231                 :                :             /* wider temporary value, due to overflow risk */
                               1232                 :                :             int32       readahead_distance;
                               1233                 :                : 
                               1234                 :         297084 :             readahead_distance = stream->readahead_distance * 2;
                               1235                 :         297084 :             readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
                               1236                 :         297084 :             stream->readahead_distance = readahead_distance;
                               1237                 :                :         }
                               1238                 :                : 
                               1239                 :                :         /*
                               1240                 :                :          * As we needed IO, prevent distances from being reduced within our
                               1241                 :                :          * maximum look-ahead window. This avoids collapsing distances too
                               1242                 :                :          * quickly in workloads where most of the required blocks are cached,
                               1243                 :                :          * but where the remaining IOs are a sufficient enough factor to cause
                               1244                 :                :          * a substantial slowdown if executed synchronously.
                               1245                 :                :          *
                               1246                 :                :          * There are valid arguments for preventing decay for max_ios or for
                               1247                 :                :          * max_pinned_buffers.  But the argument for max_pinned_buffers seems
                               1248                 :                :          * clearer - if we can't see any misses within the maximum look-ahead
                               1249                 :                :          * distance, we can't do any useful read-ahead.
                               1250                 :                :          */
   59                          1251                 :         698072 :         stream->distance_decay_holdoff = stream->max_pinned_buffers;
                               1252                 :                : 
                               1253                 :                :         /*
                               1254                 :                :          * Whether we needed to wait or not, allow for more IO combining if we
                               1255                 :                :          * needed to do IO. The reason to do so independent of needing to wait
                               1256                 :                :          * is that when the data is resident in the kernel page cache, IO
                               1257                 :                :          * combining reduces the syscall / dispatch overhead, making it
                               1258                 :                :          * worthwhile regardless of needing to wait.
                               1259                 :                :          *
                               1260                 :                :          * It is also important with io_uring as it will never signal the need
                               1261                 :                :          * to wait for reads if all the data is in the page cache. There are
                               1262                 :                :          * heuristics to deal with that in method_io_uring.c, but they only
                               1263                 :                :          * work when the IO gets large enough.
                               1264                 :                :          */
   55                          1265         [ +  + ]:         698072 :         if (stream->combine_distance > 0 &&
                               1266         [ +  + ]:         648038 :             stream->combine_distance < stream->io_combine_limit)
                               1267                 :                :         {
                               1268                 :                :             /* wider temporary value, due to overflow risk */
                               1269                 :                :             int32       combine_distance;
                               1270                 :                : 
                               1271                 :         639511 :             combine_distance = stream->combine_distance * 2;
                               1272                 :         639511 :             combine_distance = Min(combine_distance, stream->io_combine_limit);
                               1273                 :         639511 :             combine_distance = Min(combine_distance, stream->max_pinned_buffers);
                               1274                 :         639511 :             stream->combine_distance = combine_distance;
                               1275                 :                :         }
                               1276                 :                : 
                               1277                 :                :         /*
                               1278                 :                :          * If we've reached the first block of a sequential region we're
                               1279                 :                :          * issuing advice for, cancel that until the next jump.  The kernel
                               1280                 :                :          * will see the sequential preadv() pattern starting here.
                               1281                 :                :          */
  441 tmunro@postgresql.or     1282         [ +  + ]:CBC      698072 :         if (stream->advice_enabled &&
                               1283         [ +  + ]:            319 :             stream->ios[io_index].op.blocknum == stream->seq_until_processed)
                               1284                 :            275 :             stream->seq_until_processed = InvalidBlockNumber;
                               1285                 :                :     }
                               1286                 :                : 
                               1287                 :                :     /*
                               1288                 :                :      * We must zap this queue entry, or else it would appear as a forwarded
                               1289                 :                :      * buffer.  If it's potentially in the overflow zone (ie from a
                               1290                 :                :      * multi-block I/O that wrapped around the queue), also zap the copy.
                               1291                 :                :      */
  787                          1292                 :        1859181 :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
  435                          1293         [ +  + ]:        1859181 :     if (oldest_buffer_index < stream->io_combine_limit - 1)
                               1294                 :        1470504 :         stream->buffers[stream->queue_size + oldest_buffer_index] =
                               1295                 :                :             InvalidBuffer;
                               1296                 :                : 
                               1297                 :                : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
                               1298                 :                : 
                               1299                 :                :     /*
                               1300                 :                :      * The caller will get access to the per-buffer data, until the next call.
                               1301                 :                :      * We wipe the one before, which is never occupied because queue_size
                               1302                 :                :      * allowed one extra element.  This will hopefully trip up client code
                               1303                 :                :      * that is holding a dangling pointer to it.
                               1304                 :                :      */
  787                          1305         [ +  + ]:        1859181 :     if (stream->per_buffer_data)
                               1306                 :                :     {
                               1307                 :                :         void       *per_buffer_data;
                               1308                 :                : 
  469                          1309         [ +  + ]:         721380 :         per_buffer_data = get_per_buffer_data(stream,
                               1310                 :                :                                               oldest_buffer_index == 0 ?
                               1311                 :          80443 :                                               stream->queue_size - 1 :
                               1312                 :         280247 :                                               oldest_buffer_index - 1);
                               1313                 :                : 
                               1314                 :                : #if defined(CLOBBER_FREED_MEMORY)
                               1315                 :                :         /* This also tells Valgrind the memory is "noaccess". */
                               1316                 :         360690 :         wipe_mem(per_buffer_data, stream->per_buffer_data_size);
                               1317                 :                : #elif defined(USE_VALGRIND)
                               1318                 :                :         /* Tell it ourselves. */
                               1319                 :                :         VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
                               1320                 :                :                                    stream->per_buffer_data_size);
                               1321                 :                : #endif
                               1322                 :                :     }
                               1323                 :                : #endif
                               1324                 :                : 
   53 tomas.vondra@postgre     1325                 :GNC     1859181 :     read_stream_count_prefetch(stream);
                               1326                 :                : 
                               1327                 :                :     /* Pin transferred to caller. */
  787 tmunro@postgresql.or     1328         [ -  + ]:CBC     1859181 :     Assert(stream->pinned_buffers > 0);
                               1329                 :        1859181 :     stream->pinned_buffers--;
                               1330                 :                : 
                               1331                 :                :     /* Advance oldest buffer, with wrap-around. */
                               1332                 :        1859181 :     stream->oldest_buffer_index++;
                               1333         [ +  + ]:        1859181 :     if (stream->oldest_buffer_index == stream->queue_size)
                               1334                 :         337963 :         stream->oldest_buffer_index = 0;
                               1335                 :                : 
                               1336                 :                :     /* Prepare for the next call. */
  441                          1337                 :        1859181 :     read_stream_look_ahead(stream);
                               1338                 :                : 
                               1339                 :                : #ifndef READ_STREAM_DISABLE_FAST_PATH
                               1340                 :                :     /* See if we can take the fast path for all-cached scans next time. */
  787                          1341         [ +  + ]:        1859173 :     if (stream->ios_in_progress == 0 &&
  435                          1342         [ +  + ]:        1211967 :         stream->forwarded_buffers == 0 &&
  787                          1343         [ +  + ]:        1208679 :         stream->pinned_buffers == 1 &&
   55 andres@anarazel.de       1344         [ +  + ]:GNC      553124 :         stream->readahead_distance == 1 &&
                               1345         [ +  + ]:         467751 :         stream->combine_distance == 1 &&
  784 tmunro@postgresql.or     1346         [ +  + ]:CBC      464409 :         stream->pending_read_nblocks == 0 &&
  787                          1347         [ +  + ]:         463226 :         stream->per_buffer_data_size == 0)
                               1348                 :                :     {
                               1349                 :                :         /*
                               1350                 :                :          * The fast path spins on one buffer entry repeatedly instead of
                               1351                 :                :          * rotating through the whole queue and clearing the entries behind
                               1352                 :                :          * it.  If the buffer it starts with happened to be forwarded between
                               1353                 :                :          * StartReadBuffers() calls and also wrapped around the circular queue
                               1354                 :                :          * partway through, then a copy also exists in the overflow zone, and
                               1355                 :                :          * it won't clear it out as the regular path would.  Do that now, so
                               1356                 :                :          * it doesn't need code for that.
                               1357                 :                :          */
  294                          1358         [ +  + ]:         232367 :         if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
                               1359                 :         230578 :             stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
                               1360                 :                :                 InvalidBuffer;
                               1361                 :                : 
  787                          1362                 :         232367 :         stream->fast_path = true;
                               1363                 :                :     }
                               1364                 :                : #endif
                               1365                 :                : 
                               1366                 :        1859173 :     return buffer;
                               1367                 :                : }
                               1368                 :                : 
                               1369                 :                : /*
                               1370                 :                :  * Transitional support for code that would like to perform or skip reads
                               1371                 :                :  * itself, without using the stream.  Returns, and consumes, the next block
                               1372                 :                :  * number that would be read by the stream's look-ahead algorithm, or
                               1373                 :                :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
                               1374                 :                :  * strategy that would be used to read it.
                               1375                 :                :  */
                               1376                 :                : BlockNumber
  619 tmunro@postgresql.or     1377                 :UBC           0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
                               1378                 :                : {
                               1379                 :              0 :     *strategy = stream->ios[0].op.strategy;
                               1380                 :              0 :     return read_stream_get_block(stream, NULL);
                               1381                 :                : }
                               1382                 :                : 
                               1383                 :                : /*
                               1384                 :                :  * Temporarily stop consuming block numbers from the block number callback.
                               1385                 :                :  * If called inside the block number callback, its return value should be
                               1386                 :                :  * returned by the callback.
                               1387                 :                :  */
                               1388                 :                : BlockNumber
   88 melanieplageman@gmai     1389                 :UNC           0 : read_stream_pause(ReadStream *stream)
                               1390                 :                : {
   55 andres@anarazel.de       1391                 :              0 :     stream->resume_readahead_distance = stream->readahead_distance;
                               1392                 :              0 :     stream->resume_combine_distance = stream->combine_distance;
                               1393                 :              0 :     stream->readahead_distance = 0;
                               1394                 :              0 :     stream->combine_distance = 0;
   88 melanieplageman@gmai     1395                 :              0 :     return InvalidBlockNumber;
                               1396                 :                : }
                               1397                 :                : 
                               1398                 :                : /*
                               1399                 :                :  * Resume looking ahead after the block number callback reported
                               1400                 :                :  * end-of-stream. This is useful for streams of self-referential blocks, after
                               1401                 :                :  * a buffer needed to be consumed and examined to find more block numbers.
                               1402                 :                :  */
                               1403                 :                : void
                               1404                 :              0 : read_stream_resume(ReadStream *stream)
                               1405                 :                : {
   55 andres@anarazel.de       1406                 :              0 :     stream->readahead_distance = stream->resume_readahead_distance;
                               1407                 :              0 :     stream->combine_distance = stream->resume_combine_distance;
   88 melanieplageman@gmai     1408                 :              0 : }
                               1409                 :                : 
                               1410                 :                : /*
                               1411                 :                :  * Reset a read stream by releasing any queued up buffers, allowing the stream
                               1412                 :                :  * to be used again for different blocks.  This can be used to clear an
                               1413                 :                :  * end-of-stream condition and start again, or to throw away blocks that were
                               1414                 :                :  * speculatively read and read some different blocks instead.
                               1415                 :                :  */
                               1416                 :                : void
  787 tmunro@postgresql.or     1417                 :CBC     1442345 : read_stream_reset(ReadStream *stream)
                               1418                 :                : {
                               1419                 :                :     int16       index;
                               1420                 :                :     Buffer      buffer;
                               1421                 :                : 
                               1422                 :                :     /* Stop looking ahead. */
   55 andres@anarazel.de       1423                 :GNC     1442345 :     stream->readahead_distance = 0;
                               1424                 :        1442345 :     stream->combine_distance = 0;
                               1425                 :                : 
                               1426                 :                :     /* Forget buffered block number and fast path state. */
  637 tmunro@postgresql.or     1427                 :CBC     1442345 :     stream->buffered_blocknum = InvalidBlockNumber;
  784                          1428                 :        1442345 :     stream->fast_path = false;
                               1429                 :                : 
                               1430                 :                :     /* Unpin anything that wasn't consumed. */
  787                          1431         [ +  + ]:        1577040 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
                               1432                 :         134695 :         ReleaseBuffer(buffer);
                               1433                 :                : 
                               1434                 :                :     /* Unpin any unused forwarded buffers. */
  435                          1435                 :        1442345 :     index = stream->next_buffer_index;
                               1436         [ +  + ]:        1442345 :     while (index < stream->initialized_buffers &&
                               1437         [ -  + ]:         217554 :            (buffer = stream->buffers[index]) != InvalidBuffer)
                               1438                 :                :     {
  435 tmunro@postgresql.or     1439         [ #  # ]:UBC           0 :         Assert(stream->forwarded_buffers > 0);
                               1440                 :              0 :         stream->forwarded_buffers--;
                               1441                 :              0 :         ReleaseBuffer(buffer);
                               1442                 :                : 
                               1443                 :              0 :         stream->buffers[index] = InvalidBuffer;
                               1444         [ #  # ]:              0 :         if (index < stream->io_combine_limit - 1)
                               1445                 :              0 :             stream->buffers[stream->queue_size + index] = InvalidBuffer;
                               1446                 :                : 
                               1447         [ #  # ]:              0 :         if (++index == stream->queue_size)
                               1448                 :              0 :             index = 0;
                               1449                 :                :     }
                               1450                 :                : 
  435 tmunro@postgresql.or     1451         [ -  + ]:CBC     1442345 :     Assert(stream->forwarded_buffers == 0);
  787                          1452         [ -  + ]:        1442345 :     Assert(stream->pinned_buffers == 0);
                               1453         [ -  + ]:        1442345 :     Assert(stream->ios_in_progress == 0);
                               1454                 :                : 
                               1455                 :                :     /* Start off assuming data is cached. */
   55 andres@anarazel.de       1456                 :GNC     1442345 :     stream->readahead_distance = 1;
                               1457                 :        1442345 :     stream->combine_distance = 1;
                               1458                 :        1442345 :     stream->resume_readahead_distance = stream->readahead_distance;
                               1459                 :        1442345 :     stream->resume_combine_distance = stream->combine_distance;
   59                          1460                 :        1442345 :     stream->distance_decay_holdoff = 0;
  787 tmunro@postgresql.or     1461                 :CBC     1442345 : }
                               1462                 :                : 
                               1463                 :                : /*
                               1464                 :                :  * Release and free a read stream.
                               1465                 :                :  */
                               1466                 :                : void
                               1467                 :         570829 : read_stream_end(ReadStream *stream)
                               1468                 :                : {
                               1469                 :         570829 :     read_stream_reset(stream);
                               1470                 :         570829 :     pfree(stream);
                               1471                 :         570829 : }
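
The comments in read_stream_next_buffer() above describe two separate growth rules: the read-ahead distance doubles only when the caller had to wait (and never once it has dropped to 0, which marks end-of-stream), while the combine distance doubles after any I/O. The following is a minimal standalone sketch of those two rules, not the PostgreSQL implementation. The names ramp_sketch, sketch_min and sketch_after_io are hypothetical, and the 16-bit field width is an assumption inferred from the "wider temporary value" comments.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for the relevant ReadStream fields. */
typedef struct ramp_sketch
{
    int16_t readahead_distance;  /* 0 means end of stream was reached */
    int16_t combine_distance;
    int16_t max_pinned_buffers;
    int16_t io_combine_limit;
} ramp_sketch;

static int32_t
sketch_min(int32_t a, int32_t b)
{
    return a < b ? a : b;
}

/*
 * Adjust both distances after an I/O has completed.  needed_wait is true
 * if the caller had to block, or if the I/O was executed synchronously.
 */
static void
sketch_after_io(ramp_sketch *s, bool needed_wait)
{
    assert(s->max_pinned_buffers > 0);

    /* Do not grow a distance of 0: it records that the stream has ended. */
    if (s->readahead_distance > 0 && needed_wait)
    {
        /* Widen before doubling so the 16-bit value cannot overflow. */
        int32_t d = (int32_t) s->readahead_distance * 2;

        s->readahead_distance = (int16_t) sketch_min(d, s->max_pinned_buffers);
    }

    /* Combining grows after any I/O, whether or not we waited. */
    if (s->combine_distance > 0 &&
        s->combine_distance < s->io_combine_limit)
    {
        int32_t c = (int32_t) s->combine_distance * 2;

        c = sketch_min(c, s->io_combine_limit);
        c = sketch_min(c, s->max_pinned_buffers);
        s->combine_distance = (int16_t) c;
    }
}
```

Calling sketch_after_io() with needed_wait = false leaves readahead_distance alone but still doubles combine_distance, which mirrors the page-cache reasoning in the source comment: larger combined reads save syscall overhead even when nothing blocks.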
        

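read_stream_pause() and read_stream_resume() in the listing above are both shown as uncovered, so a small model of their save-and-restore behaviour may help when writing a test for them. This is a sketch under stated assumptions: pause_sketch, sketch_pause, sketch_resume and SKETCH_INVALID_BLOCK are hypothetical names, and the constant only stands in for InvalidBlockNumber.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in for InvalidBlockNumber; hypothetical name. */
#define SKETCH_INVALID_BLOCK ((uint32_t) 0xFFFFFFFF)

/* Hypothetical, simplified stand-in for the relevant ReadStream fields. */
typedef struct pause_sketch
{
    int16_t readahead_distance;
    int16_t combine_distance;
    int16_t resume_readahead_distance;
    int16_t resume_combine_distance;
} pause_sketch;

/*
 * Remember the current distances, then zero them so that look-ahead
 * stops.  The return value is what a block number callback would pass
 * back to signal end-of-stream.
 */
static uint32_t
sketch_pause(pause_sketch *s)
{
    assert(s->readahead_distance >= 0 && s->combine_distance >= 0);

    s->resume_readahead_distance = s->readahead_distance;
    s->resume_combine_distance = s->combine_distance;
    s->readahead_distance = 0;
    s->combine_distance = 0;
    return SKETCH_INVALID_BLOCK;
}

/* Restore the distances saved by sketch_pause(). */
static void
sketch_resume(pause_sketch *s)
{
    s->readahead_distance = s->resume_readahead_distance;
    s->combine_distance = s->resume_combine_distance;
}
```

Because a distance of 0 doubles as the end-of-stream marker, pausing is indistinguishable from ending until sketch_resume() puts the saved values back. A test that pauses inside the callback, consumes a buffer, and then resumes would exercise both uncovered functions.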
Generated by: LCOV version 2.5.0-beta