Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * read_stream.c
4 : : * Mechanism for accessing buffered relation data with look-ahead
5 : : *
6 : : * Code that needs to access relation data typically pins blocks one at a
7 : : * time, often in a predictable order that might be sequential or data-driven.
8 : : * Calling the simple ReadBuffer() function for each block is inefficient,
9 : : * because blocks that are not yet in the buffer pool require I/O operations
10 : : * that are small and might stall waiting for storage. This mechanism looks
11 : : * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12 : : * neighboring blocks together and ahead of time, with an adaptive look-ahead
13 : : * distance.
14 : : *
15 : : * A user-provided callback generates a stream of block numbers that is used
16 : : * to form reads of up to io_combine_limit, by attempting to merge them with a
17 : : * pending read. When that isn't possible, the existing pending read is sent
18 : : * to StartReadBuffers() so that a new one can begin to form.
19 : : *
20 : : * The algorithm for controlling the look-ahead distance is based on recent
21 : : * cache hit / miss history, as well as whether we need to wait for I/O completion
22 : : * after a miss. When no I/O is necessary, there is no benefit in looking
23 : : * ahead more than one block. This is the default initial assumption. When
24 : : * blocks needing I/O are streamed, the combine distance is increased to
25 : : * benefit from I/O combining and the read-ahead distance is increased
26 : : * whenever we need to wait for I/O to try to benefit from increased I/O
27 : : * concurrency. Both are reduced gradually when cached blocks are streamed.
28 : : *
29 : : * The main data structure is a circular queue of buffers of size
30 : : * max_pinned_buffers plus some extra space for technical reasons, ready to be
31 : : * returned by read_stream_next_buffer(). Each buffer also has an optional
32 : : * variable sized object that is passed from the callback to the consumer of
33 : : * buffers.
34 : : *
35 : : * Parallel to the queue of buffers, there is a circular queue of in-progress
36 : : * I/Os that have been started with StartReadBuffers(), and for which
37 : : * WaitReadBuffers() must be called before returning the buffer.
38 : : *
39 : : * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
40 : : * successive calls, then these data structures might appear as follows:
41 : : *
42 : : * buffers buf/data ios
43 : : *
44 : : * +----+ +-----+ +--------+
45 : : * | | | | +----+ 42..44 | <- oldest_io_index
46 : : * +----+ +-----+ | +--------+
47 : : * oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 |
48 : : * +----+ +-----+ | | +--------+
49 : : * | 42 | | ? |<-+ | | | <- next_io_index
50 : : * +----+ +-----+ | +--------+
51 : : * | 43 | | ? | | | |
52 : : * +----+ +-----+ | +--------+
53 : : * | 44 | | ? | | | |
54 : : * +----+ +-----+ | +--------+
55 : : * | 60 | | ? |<---+
56 : : * +----+ +-----+
57 : : * next_buffer_index -> | | | |
58 : : * +----+ +-----+
59 : : *
60 : : * In the example, 5 buffers are pinned, and the next buffer to be streamed to
61 : : * the client is block 10. Block 10 was a hit and has no associated I/O, but
62 : : * the range 42..44 requires an I/O wait before its buffers are returned, as
63 : : * does block 60.
64 : : *
65 : : *
66 : : * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
67 : : * Portions Copyright (c) 1994, Regents of the University of California
68 : : *
69 : : * IDENTIFICATION
70 : : * src/backend/storage/aio/read_stream.c
71 : : *
72 : : *-------------------------------------------------------------------------
73 : : */
74 : : #include "postgres.h"
75 : :
76 : : #include "miscadmin.h"
77 : : #include "executor/instrument_node.h"
78 : : #include "storage/aio.h"
79 : : #include "storage/fd.h"
80 : : #include "storage/smgr.h"
81 : : #include "storage/read_stream.h"
82 : : #include "utils/memdebug.h"
83 : : #include "utils/rel.h"
84 : : #include "utils/spccache.h"
85 : :
/*
 * One started-but-not-yet-waited-for read: the queue index of the first
 * buffer covered by the operation, and the buffer manager's handle that
 * WaitReadBuffers() needs to complete it.
 */
typedef struct InProgressIO
{
	int16		buffer_index;
	ReadBuffersOperation op;
} InProgressIO;
91 : :
/*
 * State for managing a stream of reads.
 */
struct ReadStream
{
	int16		max_ios;		/* capacity of the 'ios' circular queue */
	int16		io_combine_limit;	/* max blocks per combined read */
	int16		ios_in_progress;	/* started but not yet waited-for IOs */
	int16		queue_size;		/* capacity of the buffer queue */
	int16		max_pinned_buffers; /* per-stream pin budget */
	int16		forwarded_buffers;	/* pins left over from a split read */
	int16		pinned_buffers; /* pins currently counted by this stream */

	/*
	 * Limit of how far, in blocks, to look-ahead for IO combining and for
	 * read-ahead.
	 *
	 * The limits for read-ahead and combining are handled separately to allow
	 * for IO combining even in cases where the I/O subsystem can keep up at a
	 * low read-ahead distance, as doing larger IOs is more efficient.
	 *
	 * Set to 0 when the end of the stream is reached.
	 */
	int16		combine_distance;
	int16		readahead_distance;
	uint16		distance_decay_holdoff; /* hits to tolerate before decaying */
	int16		initialized_buffers;	/* high-water mark of cleared slots */
	int16		resume_readahead_distance;
	int16		resume_combine_distance;
	int			read_buffers_flags; /* base flags for StartReadBuffers() */
	bool		sync_mode;		/* using io_method=sync */
	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
	bool		advice_enabled; /* issue READ_BUFFERS_ISSUE_ADVICE hints */
	bool		temporary;		/* relation uses local (temp) buffers */

	/* scan stats counters; NULL unless read_stream_enable_stats() called */
	IOStats    *stats;

	/*
	 * One-block buffer to support 'ungetting' a block number, to resolve flow
	 * control problems when I/Os are split.
	 */
	BlockNumber buffered_blocknum;

	/*
	 * The callback that will tell us which block numbers to read, and an
	 * opaque pointer that will be passed to it for its own purposes.
	 */
	ReadStreamBlockNumberCB callback;
	void	   *callback_private_data;

	/* Next expected block, for detecting sequential access. */
	BlockNumber seq_blocknum;
	BlockNumber seq_until_processed;

	/* The read operation we are currently preparing. */
	BlockNumber pending_read_blocknum;
	int16		pending_read_nblocks;

	/* Space for buffers and optional per-buffer private data. */
	size_t		per_buffer_data_size;
	void	   *per_buffer_data;

	/* Read operations that have been started but not waited for yet. */
	InProgressIO *ios;
	int16		oldest_io_index;
	int16		next_io_index;

	bool		fast_path;

	/* Circular queue of buffers. */
	int16		oldest_buffer_index;	/* Next pinned buffer to return */
	int16		next_buffer_index;	/* Index of next buffer to pin */
	Buffer		buffers[FLEXIBLE_ARRAY_MEMBER];
};
167 : :
168 : : /*
169 : : * Return a pointer to the per-buffer data by index.
170 : : */
171 : : static inline void *
762 tmunro@postgresql.or 172 :CBC 3903586 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
173 : : {
174 : 7807172 : return (char *) stream->per_buffer_data +
175 : 3903586 : stream->per_buffer_data_size * buffer_index;
176 : : }
177 : :
178 : : /*
179 : : * General-use ReadStreamBlockNumberCB for block range scans. Loops over the
180 : : * blocks [current_blocknum, last_exclusive).
181 : : */
182 : : BlockNumber
609 noah@leadboat.com 183 : 429088 : block_range_read_stream_cb(ReadStream *stream,
184 : : void *callback_private_data,
185 : : void *per_buffer_data)
186 : : {
187 : 429088 : BlockRangeReadStreamPrivate *p = callback_private_data;
188 : :
189 [ + + ]: 429088 : if (p->current_blocknum < p->last_exclusive)
190 : 352345 : return p->current_blocknum++;
191 : :
192 : 76743 : return InvalidBlockNumber;
193 : : }
194 : :
195 : : /*
196 : : * Update stream stats with current pinned buffer depth.
197 : : *
198 : : * Called once per buffer returned to the consumer in read_stream_next_buffer().
199 : : * Records the number of pinned buffers at that moment, so we can compute the
200 : : * average look-ahead depth.
201 : : */
202 : : static inline void
28 tomas.vondra@postgre 203 :GNC 4395792 : read_stream_count_prefetch(ReadStream *stream)
204 : : {
205 : 4395792 : IOStats *stats = stream->stats;
206 : :
207 [ + + ]: 4395792 : if (stats == NULL)
208 : 4395784 : return;
209 : :
210 : 8 : stats->prefetch_count++;
211 : 8 : stats->distance_sum += stream->pinned_buffers;
212 [ + - ]: 8 : if (stream->pinned_buffers > stats->distance_max)
213 : 8 : stats->distance_max = stream->pinned_buffers;
214 : : }
215 : :
216 : : /*
217 : : * Update stream stats about size of I/O requests.
218 : : *
219 : : * We count the number of I/O requests, size of requests (counted in blocks)
220 : : * and number of in-progress I/Os.
221 : : */
222 : : static inline void
223 : 697822 : read_stream_count_io(ReadStream *stream, int nblocks, int in_progress)
224 : : {
225 : 697822 : IOStats *stats = stream->stats;
226 : :
227 [ + - ]: 697822 : if (stats == NULL)
228 : 697822 : return;
229 : :
28 tomas.vondra@postgre 230 :UNC 0 : stats->io_count++;
231 : 0 : stats->io_nblocks += nblocks;
232 : 0 : stats->io_in_progress += in_progress;
233 : : }
234 : :
235 : : /*
236 : : * Update stream stats about waits for I/O when consuming buffers.
237 : : *
238 : : * We count the number of I/O waits while pulling buffers out of a stream.
239 : : */
240 : : static inline void
28 tomas.vondra@postgre 241 :GNC 317670 : read_stream_count_wait(ReadStream *stream)
242 : : {
243 : 317670 : IOStats *stats = stream->stats;
244 : :
245 [ + - ]: 317670 : if (stats == NULL)
246 : 317670 : return;
247 : :
28 tomas.vondra@postgre 248 :UNC 0 : stats->wait_count++;
249 : : }
250 : :
251 : : /*
252 : : * Enable collection of stats into the provided IOStats.
253 : : */
254 : : void
28 tomas.vondra@postgre 255 :GNC 8 : read_stream_enable_stats(ReadStream *stream, IOStats *stats)
256 : : {
257 : 8 : stream->stats = stats;
258 [ + - ]: 8 : if (stream->stats)
259 : 8 : stream->stats->distance_capacity = stream->max_pinned_buffers;
260 : 8 : }
261 : :
262 : : /*
263 : : * Ask the callback which block it would like us to read next, with a one block
264 : : * buffer in front to allow read_stream_unget_block() to work.
265 : : */
266 : : static inline BlockNumber
762 tmunro@postgresql.or 267 :CBC 5820862 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
268 : : {
269 : : BlockNumber blocknum;
270 : :
612 271 : 5820862 : blocknum = stream->buffered_blocknum;
272 [ - + ]: 5820862 : if (blocknum != InvalidBlockNumber)
612 tmunro@postgresql.or 273 :UBC 0 : stream->buffered_blocknum = InvalidBlockNumber;
274 : : else
275 : : {
276 : : /*
277 : : * Tell Valgrind that the per-buffer data is undefined. That replaces
278 : : * the "noaccess" state that was set when the consumer moved past this
279 : : * entry last time around the queue, and should also catch callbacks
280 : : * that fail to initialize data that the buffer consumer later
281 : : * accesses. On the first go around, it is undefined already.
282 : : */
283 : : VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
284 : : stream->per_buffer_data_size);
612 tmunro@postgresql.or 285 :CBC 5820862 : blocknum = stream->callback(stream,
286 : : stream->callback_private_data,
287 : : per_buffer_data);
288 : : }
289 : :
290 : 5820862 : return blocknum;
291 : : }
292 : :
293 : : /*
294 : : * In order to deal with buffer shortages and I/O limits after short reads, we
295 : : * sometimes need to defer handling of a block we've already consumed from the
296 : : * registered callback until later.
297 : : */
298 : : static inline void
762 tmunro@postgresql.or 299 :UBC 0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
300 : : {
301 : : /* We shouldn't ever unget more than one block. */
612 302 [ # # ]: 0 : Assert(stream->buffered_blocknum == InvalidBlockNumber);
303 [ # # ]: 0 : Assert(blocknum != InvalidBlockNumber);
304 : 0 : stream->buffered_blocknum = blocknum;
762 305 : 0 : }
306 : :
/*
 * Start as much of the current pending read as we can.  If we have to split
 * it because of the per-backend buffer limit, or the buffer manager decides
 * to split it, then the pending read is adjusted to hold the remaining
 * portion.
 *
 * We can always start a read of at least size one if we have no progress
 * yet.  Otherwise it's possible that we can't start a read at all because of
 * a lack of buffers, and then false is returned.  Buffer shortages also
 * reduce the distance to a level that prevents look-ahead until buffers are
 * released.
 */
static bool
read_stream_start_pending_read(ReadStream *stream)
{
	bool		need_wait;
	int			requested_nblocks;
	int			nblocks;
	int			flags;
	int			forwarded;
	int16		io_index;
	int16		overflow;
	int16		buffer_index;
	int			buffer_limit;

	/* This should only be called with a pending read. */
	Assert(stream->pending_read_nblocks > 0);
	Assert(stream->pending_read_nblocks <= stream->io_combine_limit);

	/* We had better not exceed the per-stream buffer limit with this read. */
	Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
		   stream->max_pinned_buffers);

#ifdef USE_ASSERT_CHECKING
	/* We had better not be overwriting an existing pinned buffer. */
	if (stream->pinned_buffers > 0)
		Assert(stream->next_buffer_index != stream->oldest_buffer_index);
	else
		Assert(stream->next_buffer_index == stream->oldest_buffer_index);

	/*
	 * Pinned buffers forwarded by a preceding StartReadBuffers() call that
	 * had to split the operation should match the leading blocks of this
	 * following StartReadBuffers() call.
	 */
	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
	for (int i = 0; i < stream->forwarded_buffers; ++i)
		Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
			   stream->pending_read_blocknum + i);

	/*
	 * Check that we've cleared the queue/overflow entries corresponding to
	 * the rest of the blocks covered by this read, unless it's the first go
	 * around and we haven't even initialized them yet.
	 */
	for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
		Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
			   stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
#endif

	/* Do we need to issue read-ahead advice? */
	flags = stream->read_buffers_flags;
	if (stream->advice_enabled)
	{
		if (stream->pending_read_blocknum == stream->seq_blocknum)
		{
			/*
			 * Sequential: Issue advice until the preadv() calls have caught
			 * up with the first advice issued for this sequential region,
			 * and then stay out of the way of the kernel's own read-ahead.
			 */
			if (stream->seq_until_processed != InvalidBlockNumber)
				flags |= READ_BUFFERS_ISSUE_ADVICE;
		}
		else
		{
			/*
			 * Random jump: Note the starting location of a new potential
			 * sequential region and start issuing advice.  Skip it this time
			 * if the preadv() follows immediately, eg first block in stream.
			 */
			stream->seq_until_processed = stream->pending_read_blocknum;
			if (stream->pinned_buffers > 0)
				flags |= READ_BUFFERS_ISSUE_ADVICE;
		}
	}

	/*
	 * How many more buffers is this backend allowed?
	 *
	 * Forwarded buffers are already pinned and map to the leading blocks of
	 * the pending read (the remaining portion of an earlier short read that
	 * we're about to continue).  They are not counted in pinned_buffers, but
	 * they are counted as pins already held by this backend according to the
	 * buffer manager, so they must be added to the limit it grants us.
	 */
	if (stream->temporary)
		buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
	else
		buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);

	buffer_limit += stream->forwarded_buffers;
	/* re-clamp: the addition above may have exceeded the int16 range */
	buffer_limit = Min(buffer_limit, PG_INT16_MAX);

	if (buffer_limit == 0 && stream->pinned_buffers == 0)
		buffer_limit = 1;		/* guarantee progress */

	/* Does the per-backend limit affect this read? */
	nblocks = stream->pending_read_nblocks;
	if (buffer_limit < nblocks)
	{
		int16		new_distance;

		/* Shrink distance: no more look-ahead until buffers are released. */
		new_distance = stream->pinned_buffers + buffer_limit;
		if (stream->readahead_distance > new_distance)
			stream->readahead_distance = new_distance;

		/* Unless we have nothing to give the consumer, stop here. */
		if (stream->pinned_buffers > 0)
			return false;

		/* A short read is required to make progress. */
		nblocks = buffer_limit;
	}

	/*
	 * We say how many blocks we want to read, but it may be smaller on
	 * return if the buffer manager decides to shorten the read.  Initialize
	 * buffers to InvalidBuffer (= not a forwarded buffer) as input on first
	 * use only, and keep the original nblocks number so we can check for
	 * forwarded buffers as output, below.
	 */
	buffer_index = stream->next_buffer_index;
	io_index = stream->next_io_index;
	while (stream->initialized_buffers < buffer_index + nblocks)
		stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
	requested_nblocks = nblocks;
	need_wait = StartReadBuffers(&stream->ios[io_index].op,
								 &stream->buffers[buffer_index],
								 stream->pending_read_blocknum,
								 &nblocks,
								 flags);
	stream->pinned_buffers += nblocks;

	/* Remember whether we need to wait before returning this buffer. */
	if (!need_wait)
	{
		/*
		 * If there currently is no IO in progress, and we have not needed to
		 * issue IO recently, decay the look-ahead distance.  We detect if we
		 * had to issue IO recently by having a decay holdoff that's set to
		 * the max look-ahead distance whenever we need to do IO.  This is
		 * important to ensure we eventually reach a high enough distance to
		 * perform IO asynchronously when starting out with a small
		 * look-ahead distance.
		 */
		if (stream->ios_in_progress == 0)
		{
			if (stream->distance_decay_holdoff > 0)
				stream->distance_decay_holdoff--;
			else
			{
				if (stream->readahead_distance > 1)
					stream->readahead_distance--;

				/*
				 * For now we reduce the IO combine distance after
				 * sufficiently many buffer hits.  There is no clear
				 * performance argument for doing so, but at the moment we
				 * need to do so to make the entrance into fast_path work
				 * correctly: We require combine_distance == 1 to enter
				 * fast-path, as without that condition we would wrongly
				 * re-enter fast-path when readahead_distance == 1 and
				 * pinned_buffers == 1, as we would not yet have prepared
				 * another IO in that situation.
				 */
				if (stream->combine_distance > 1)
					stream->combine_distance--;
			}
		}
	}
	else
	{
		/*
		 * Remember to call WaitReadBuffers() before returning head buffer.
		 * Look-ahead distance will be adjusted after waiting.
		 */
		stream->ios[io_index].buffer_index = buffer_index;
		if (++stream->next_io_index == stream->max_ios)
			stream->next_io_index = 0;
		Assert(stream->ios_in_progress < stream->max_ios);
		stream->ios_in_progress++;
		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;

		/* update I/O stats */
		read_stream_count_io(stream, nblocks, stream->ios_in_progress);
	}

	/*
	 * How many pins were acquired but forwarded to the next call?  These
	 * need to be passed to the next StartReadBuffers() call by leaving them
	 * exactly where they are in the queue, or released if the stream ends
	 * early.  We need the number for accounting purposes, since they are not
	 * counted in stream->pinned_buffers but we already hold them.
	 */
	forwarded = 0;
	while (nblocks + forwarded < requested_nblocks &&
		   stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
		forwarded++;
	stream->forwarded_buffers = forwarded;

	/*
	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
	 * we want it to wrap around at queue_size.  Copy overflowing buffers to
	 * the front of the array where they'll be consumed, but also leave a
	 * copy in the overflow zone which the I/O operation has a pointer to (it
	 * needs a contiguous array).  Both copies will be cleared when the
	 * buffers are handed to the consumer.
	 */
	overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
	if (overflow > 0)
	{
		Assert(overflow < stream->queue_size);	/* can't overlap */
		memcpy(&stream->buffers[0],
			   &stream->buffers[stream->queue_size],
			   sizeof(stream->buffers[0]) * overflow);
	}

	/* Compute location of start of next read, without using % operator. */
	buffer_index += nblocks;
	if (buffer_index >= stream->queue_size)
		buffer_index -= stream->queue_size;
	Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
	stream->next_buffer_index = buffer_index;

	/* Adjust the pending read to cover the remaining portion, if any. */
	stream->pending_read_blocknum += nblocks;
	stream->pending_read_nblocks -= nblocks;

	return true;
}
548 : :
/*
 * Should we continue to perform look ahead?  Looking ahead may allow us to
 * make the pending IO larger via IO combining or to issue more read-ahead.
 *
 * Note: the order of the checks below matters; each later test assumes the
 * earlier ones did not fire.
 */
static inline bool
read_stream_should_look_ahead(ReadStream *stream)
{
	/* If the callback has signaled end-of-stream, we're done */
	if (stream->readahead_distance == 0)
		return false;

	/* never start more IOs than our cap */
	if (stream->ios_in_progress >= stream->max_ios)
		return false;

	/*
	 * Allow looking further ahead if we are in the process of building a
	 * larger IO, the IO is not yet big enough, and we don't yet have IO in
	 * flight.
	 *
	 * We do so to allow building larger reads when readahead_distance is
	 * small (e.g. because the I/O subsystem is keeping up or
	 * effective_io_concurrency is small).  That's a useful goal because
	 * larger reads are more CPU efficient than smaller reads, even if the
	 * system is not IO bound.
	 *
	 * The reason we do *not* do so when we already have a read prepared
	 * (i.e. why we check for pinned_buffers == 0) is once we are actually
	 * reading ahead, we don't need it:
	 *
	 * - We won't issue unnecessarily small reads as
	 * read_stream_should_issue_now() will return false until the IO is
	 * suitably sized.  The issuance of the pending read will be delayed
	 * until enough buffers have been consumed.
	 *
	 * - If we are not reading ahead aggressively enough, future
	 * WaitReadBuffers() calls will return true, leading to
	 * readahead_distance being increased.  After that more full-sized IOs
	 * can be issued.
	 *
	 * Furthermore, if we did not have the pinned_buffers == 0 condition, we
	 * might end up issuing I/O more aggressively than we need.
	 *
	 * Note that a return of true here can lead to exceeding the read-ahead
	 * limit, but we won't exceed the buffer pin limit (because
	 * pinned_buffers == 0 and combine_distance is capped by
	 * max_pinned_buffers).
	 */
	if (stream->pending_read_nblocks > 0 &&
		stream->pinned_buffers == 0 &&
		stream->pending_read_nblocks < stream->combine_distance)
		return true;

	/*
	 * Don't start more read-ahead if that'd put us over the distance limit
	 * for doing read-ahead.  As stream->readahead_distance is capped by
	 * max_pinned_buffers, this prevents us from looking ahead so far that it
	 * would put us over the pin limit.
	 */
	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
		return false;

	return true;
}
611 : :
612 : : /*
613 : : * We don't start the pending read just because we've hit the distance limit,
614 : : * preferring to give it another chance to grow to full io_combine_limit size
615 : : * once more buffers have been consumed. But this is not desirable in all
616 : : * situations - see below.
617 : : */
618 : : static inline bool
619 : 6489684 : read_stream_should_issue_now(ReadStream *stream)
620 : : {
621 : 6489684 : int16 pending_read_nblocks = stream->pending_read_nblocks;
622 : :
623 : : /* there is no pending IO that could be issued */
624 [ + + ]: 6489684 : if (pending_read_nblocks == 0)
625 : 4340285 : return false;
626 : :
627 : : /* never start more IOs than our cap */
628 [ - + ]: 2149399 : if (stream->ios_in_progress >= stream->max_ios)
30 andres@anarazel.de 629 :UNC 0 : return false;
630 : :
631 : : /*
632 : : * If the callback has signaled end-of-stream, start the pending read
633 : : * immediately. There is no further potential for IO combining.
634 : : */
30 andres@anarazel.de 635 [ + + ]:GNC 2149399 : if (stream->readahead_distance == 0)
636 : 103811 : return true;
637 : :
638 : : /*
639 : : * If we've already reached combine_distance, there's no chance of growing
640 : : * the read further.
641 : : */
642 [ + + ]: 2045588 : if (pending_read_nblocks >= stream->combine_distance)
643 : 1682459 : return true;
644 : :
645 : : /*
646 : : * If we currently have no reads in flight or prepared, issue the IO once
647 : : * we are not looking ahead further. This ensures there's always at least
648 : : * one IO prepared.
649 : : */
650 [ + + ]: 363129 : if (stream->pinned_buffers == 0 &&
651 [ - + ]: 242891 : !read_stream_should_look_ahead(stream))
30 andres@anarazel.de 652 :UNC 0 : return true;
653 : :
30 andres@anarazel.de 654 :GNC 363129 : return false;
655 : : }
656 : :
/*
 * Pull block numbers from the callback and build/issue reads until we reach
 * the current look-ahead distance, hit a buffer/IO limit, or the stream
 * ends.  On return, either at least one buffer is pinned or end-of-stream
 * has been reached (readahead_distance == 0).
 */
static void
read_stream_look_ahead(ReadStream *stream)
{
	/*
	 * Allow amortizing the cost of submitting IO over multiple IOs.  This
	 * requires that we don't do any operations that could lead to a deadlock
	 * with staged-but-unsubmitted IO.  The callback needs to opt-in to being
	 * careful.
	 */
	if (stream->batch_mode)
		pgaio_enter_batchmode();

	while (read_stream_should_look_ahead(stream))
	{
		BlockNumber blocknum;
		int16		buffer_index;
		void	   *per_buffer_data;

		/* Issue the pending read first if it can't usefully grow further. */
		if (read_stream_should_issue_now(stream))
		{
			read_stream_start_pending_read(stream);
			continue;
		}

		/*
		 * See which block the callback wants next in the stream.  We need to
		 * compute the index of the Nth block of the pending read including
		 * wrap-around, but we don't want to use the expensive % operator.
		 */
		buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
		if (buffer_index >= stream->queue_size)
			buffer_index -= stream->queue_size;
		Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
		per_buffer_data = get_per_buffer_data(stream, buffer_index);
		blocknum = read_stream_get_block(stream, per_buffer_data);
		if (blocknum == InvalidBlockNumber)
		{
			/* End of stream: zero distances so look-ahead stops for good. */
			stream->readahead_distance = 0;
			stream->combine_distance = 0;
			break;
		}

		/* Can we merge it with the pending read? */
		if (stream->pending_read_nblocks > 0 &&
			stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
		{
			stream->pending_read_nblocks++;
			continue;
		}

		/* We have to start the pending read before we can build another. */
		while (stream->pending_read_nblocks > 0)
		{
			if (!read_stream_start_pending_read(stream) ||
				stream->ios_in_progress == stream->max_ios)
			{
				/* We've hit the buffer or I/O limit.  Rewind and stop here. */
				read_stream_unget_block(stream, blocknum);
				if (stream->batch_mode)
					pgaio_exit_batchmode();
				return;
			}
		}

		/* This is the start of a new pending read. */
		stream->pending_read_blocknum = blocknum;
		stream->pending_read_nblocks = 1;
	}

	/*
	 * Check if the pending read should be issued now, or if we should give
	 * it another chance to grow to the full size.
	 *
	 * Note that the pending read can exceed the distance goal, if the latter
	 * was reduced after hitting the per-backend buffer limit.
	 */
	if (read_stream_should_issue_now(stream))
		read_stream_start_pending_read(stream);

	/*
	 * There should always be something pinned when we leave this function,
	 * whether started by this call or not, unless we've hit the end of the
	 * stream.  In the worst case we can always make progress one buffer at a
	 * time.
	 */
	Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);

	if (stream->batch_mode)
		pgaio_exit_batchmode();
}
748 : :
749 : : /*
750 : : * Create a new read stream object that can be used to perform the equivalent
751 : : * of a series of ReadBuffer() calls for one fork of one relation.
752 : : * Internally, it generates larger vectored reads where possible by looking
753 : : * ahead. The callback should return block numbers or InvalidBlockNumber to
754 : : * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
755 : : * write extra data for each block into the space provided to it. It will
756 : : * also receive callback_private_data for its own purposes.
757 : : */
758 : : static ReadStream *
654 noah@leadboat.com 759 : 574103 : read_stream_begin_impl(int flags,
760 : : BufferAccessStrategy strategy,
761 : : Relation rel,
762 : : SMgrRelation smgr,
763 : : char persistence,
764 : : ForkNumber forknum,
765 : : ReadStreamBlockNumberCB callback,
766 : : void *callback_private_data,
767 : : size_t per_buffer_data_size)
768 : : {
769 : : ReadStream *stream;
770 : : size_t size;
771 : : int16 queue_size;
772 : : int16 queue_overflow;
773 : : int max_ios;
774 : : int strategy_pin_limit;
775 : : uint32 max_pinned_buffers;
776 : : uint32 max_possible_buffer_limit;
777 : : Oid tablespace_id;
778 : :
779 : : /*
780 : : * Decide how many I/Os we will allow to run at the same time. This
781 : : * number also affects how far we look ahead for opportunities to start
782 : : * more I/Os.
783 : : */
762 tmunro@postgresql.or 784 : 574103 : tablespace_id = smgr->smgr_rlocator.locator.spcOid;
785 [ + + + + ]: 574103 : if (!OidIsValid(MyDatabaseId) ||
654 noah@leadboat.com 786 [ + + + + ]: 681538 : (rel && IsCatalogRelation(rel)) ||
762 tmunro@postgresql.or 787 : 187149 : IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
788 : : {
789 : : /*
790 : : * Avoid circularity while trying to look up tablespace settings or
791 : : * before spccache.c is ready.
792 : : */
793 : 452428 : max_ios = effective_io_concurrency;
794 : : }
795 [ + + ]: 121675 : else if (flags & READ_STREAM_MAINTENANCE)
796 : 8674 : max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
797 : : else
798 : 113001 : max_ios = get_tablespace_io_concurrency(tablespace_id);
799 : :
800 : : /* Cap to INT16_MAX to avoid overflowing below */
801 : 574103 : max_ios = Min(max_ios, PG_INT16_MAX);
802 : :
803 : : /*
804 : : * If starting a multi-block I/O near the end of the queue, we might
805 : : * temporarily need extra space for overflowing buffers before they are
806 : : * moved to regular circular position. This is the maximum extra space we
807 : : * could need.
808 : : */
418 809 : 574103 : queue_overflow = io_combine_limit - 1;
810 : :
811 : : /*
812 : : * Choose the maximum number of buffers we're prepared to pin. We try to
813 : : * pin fewer if we can, though. We add one so that we can make progress
814 : : * even if max_ios is set to 0 (see also further down). For max_ios > 0,
815 : : * this also allows an extra full I/O's worth of buffers: after an I/O
816 : : * finishes we don't want to have to wait for its buffers to be consumed
817 : : * before starting a new one.
818 : : *
819 : : * Be careful not to allow int16 to overflow. That is possible with the
820 : : * current GUC range limits, so this is an artificial limit of ~32k
821 : : * buffers and we'd need to adjust the types to exceed that. We also have
822 : : * to allow for the spare entry and the overflow space.
823 : : */
432 824 : 574103 : max_pinned_buffers = (max_ios + 1) * io_combine_limit;
762 825 : 574103 : max_pinned_buffers = Min(max_pinned_buffers,
826 : : PG_INT16_MAX - queue_overflow - 1);
827 : :
828 : : /* Give the strategy a chance to limit the number of buffers we pin. */
759 829 : 574103 : strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
830 : 574103 : max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
831 : :
832 : : /*
833 : : * Also limit our queue to the maximum number of pins we could ever be
834 : : * allowed to acquire according to the buffer manager. We may not really
835 : : * be able to use them all due to other pins held by this backend, but
836 : : * we'll check that later in read_stream_start_pending_read().
837 : : */
762 838 [ + + ]: 574103 : if (SmgrIsTemp(smgr))
417 839 : 9288 : max_possible_buffer_limit = GetLocalPinLimit();
840 : : else
841 : 564815 : max_possible_buffer_limit = GetPinLimit();
842 : 574103 : max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
843 : :
844 : : /*
845 : : * The limit might be zero on a system configured with too few buffers for
846 : : * the number of connections. We need at least one to make progress.
847 : : */
848 [ + + ]: 574103 : max_pinned_buffers = Max(1, max_pinned_buffers);
849 : :
850 : : /*
851 : : * We need one extra entry for buffers and per-buffer data, because users
852 : : * of per-buffer data have access to the object until the next call to
853 : : * read_stream_next_buffer(), so we need a gap between the head and tail
854 : : * of the queue so that we don't clobber it.
855 : : */
762 856 : 574103 : queue_size = max_pinned_buffers + 1;
857 : :
858 : : /*
859 : : * Allocate the object, the buffers, the ios and per_buffer_data space in
860 : : * one big chunk. Though we have queue_size buffers, we want to be able
861 : : * to assume that all the buffers for a single read are contiguous (i.e.
862 : : * don't wrap around halfway through), so we allow temporary overflows of
863 : : * up to the maximum possible overflow size.
864 : : */
865 : 574103 : size = offsetof(ReadStream, buffers);
418 866 : 574103 : size += sizeof(Buffer) * (queue_size + queue_overflow);
762 867 [ + + ]: 574103 : size += sizeof(InProgressIO) * Max(1, max_ios);
868 : 574103 : size += per_buffer_data_size * queue_size;
869 : 574103 : size += MAXIMUM_ALIGNOF * 2;
870 : 574103 : stream = (ReadStream *) palloc(size);
871 : 574103 : memset(stream, 0, offsetof(ReadStream, buffers));
872 : 574103 : stream->ios = (InProgressIO *)
418 873 : 574103 : MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
762 874 [ + + ]: 574103 : if (per_buffer_data_size > 0)
875 : 30102 : stream->per_buffer_data = (void *)
876 [ + + ]: 30102 : MAXALIGN(&stream->ios[Max(1, max_ios)]);
877 : :
401 andres@anarazel.de 878 : 574103 : stream->sync_mode = io_method == IOMETHOD_SYNC;
879 : 574103 : stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
880 : :
881 : : #ifdef USE_PREFETCH
882 : :
883 : : /*
884 : : * Read-ahead advice simulating asynchronous I/O with synchronous calls.
885 : : * Issue advice only if AIO is not used, direct I/O isn't enabled, the
886 : : * caller hasn't promised sequential access (overriding our detection
887 : : * heuristics), and max_ios hasn't been set to zero.
888 : : */
889 [ + + ]: 574103 : if (stream->sync_mode &&
890 [ + - ]: 3161 : (io_direct_flags & IO_DIRECT_DATA) == 0 &&
762 tmunro@postgresql.or 891 [ + + + - ]: 3161 : (flags & READ_STREAM_SEQUENTIAL) == 0 &&
892 : : max_ios > 0)
893 : 738 : stream->advice_enabled = true;
894 : : #endif
895 : :
896 : : /*
897 : : * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
898 : : * we still need to allocate space to combine and run one I/O. Bump it up
899 : : * to one, and remember to ask for synchronous I/O only.
900 : : */
901 [ + + ]: 574103 : if (max_ios == 0)
902 : : {
903 : 7 : max_ios = 1;
401 andres@anarazel.de 904 : 7 : stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
905 : : }
906 : :
907 : : /*
908 : : * Capture stable values for these two GUC-derived numbers for the
909 : : * lifetime of this stream, so we don't have to worry about the GUCs
910 : : * changing underneath us beyond this point.
911 : : */
762 tmunro@postgresql.or 912 : 574103 : stream->max_ios = max_ios;
418 913 : 574103 : stream->io_combine_limit = io_combine_limit;
914 : :
762 915 : 574103 : stream->per_buffer_data_size = per_buffer_data_size;
916 : 574103 : stream->max_pinned_buffers = max_pinned_buffers;
917 : 574103 : stream->queue_size = queue_size;
918 : 574103 : stream->callback = callback;
919 : 574103 : stream->callback_private_data = callback_private_data;
612 920 : 574103 : stream->buffered_blocknum = InvalidBlockNumber;
416 921 : 574103 : stream->seq_blocknum = InvalidBlockNumber;
922 : 574103 : stream->seq_until_processed = InvalidBlockNumber;
417 923 : 574103 : stream->temporary = SmgrIsTemp(smgr);
34 andres@anarazel.de 924 :GNC 574103 : stream->distance_decay_holdoff = 0;
925 : :
926 : : /*
927 : : * Skip the initial ramp-up phase if the caller says we're going to be
928 : : * reading the whole relation. This way we start out assuming we'll be
929 : : * doing full io_combine_limit sized reads.
930 : : */
762 tmunro@postgresql.or 931 [ + + ]:CBC 574103 : if (flags & READ_STREAM_FULL)
932 : : {
30 andres@anarazel.de 933 :GNC 76966 : stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
934 : 76966 : stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit);
935 : : }
936 : : else
937 : : {
938 : 497137 : stream->readahead_distance = 1;
939 : 497137 : stream->combine_distance = 1;
940 : : }
941 : 574103 : stream->resume_readahead_distance = stream->readahead_distance;
942 : 574103 : stream->resume_combine_distance = stream->combine_distance;
943 : :
944 : : /*
945 : : * Since we always access the same relation, we can initialize parts of
946 : : * the ReadBuffersOperation objects and leave them that way, to avoid
947 : : * wasting CPU cycles writing to them for each read.
948 : : */
762 tmunro@postgresql.or 949 [ + + ]:CBC 9788410 : for (int i = 0; i < max_ios; ++i)
950 : : {
951 : 9214307 : stream->ios[i].op.rel = rel;
654 noah@leadboat.com 952 : 9214307 : stream->ios[i].op.smgr = smgr;
953 : 9214307 : stream->ios[i].op.persistence = persistence;
762 tmunro@postgresql.or 954 : 9214307 : stream->ios[i].op.forknum = forknum;
955 : 9214307 : stream->ios[i].op.strategy = strategy;
956 : : }
957 : :
958 : 574103 : return stream;
959 : : }
960 : :
961 : : /*
962 : : * Create a new read stream for reading a relation.
963 : : * See read_stream_begin_impl() for the detailed explanation.
964 : : */
965 : : ReadStream *
654 noah@leadboat.com 966 : 504141 : read_stream_begin_relation(int flags,
967 : : BufferAccessStrategy strategy,
968 : : Relation rel,
969 : : ForkNumber forknum,
970 : : ReadStreamBlockNumberCB callback,
971 : : void *callback_private_data,
972 : : size_t per_buffer_data_size)
973 : : {
974 : 504141 : return read_stream_begin_impl(flags,
975 : : strategy,
976 : : rel,
977 : : RelationGetSmgr(rel),
978 : 504141 : rel->rd_rel->relpersistence,
979 : : forknum,
980 : : callback,
981 : : callback_private_data,
982 : : per_buffer_data_size);
983 : : }
984 : :
985 : : /*
986 : : * Create a new read stream for reading a SMgr relation.
987 : : * See read_stream_begin_impl() for the detailed explanation.
988 : : */
989 : : ReadStream *
990 : 69962 : read_stream_begin_smgr_relation(int flags,
991 : : BufferAccessStrategy strategy,
992 : : SMgrRelation smgr,
993 : : char smgr_persistence,
994 : : ForkNumber forknum,
995 : : ReadStreamBlockNumberCB callback,
996 : : void *callback_private_data,
997 : : size_t per_buffer_data_size)
998 : : {
999 : 69962 : return read_stream_begin_impl(flags,
1000 : : strategy,
1001 : : NULL,
1002 : : smgr,
1003 : : smgr_persistence,
1004 : : forknum,
1005 : : callback,
1006 : : callback_private_data,
1007 : : per_buffer_data_size);
1008 : : }
1009 : :
1010 : : /*
1011 : : * Pull one pinned buffer out of a stream. Each call returns successive
1012 : : * blocks in the order specified by the callback. If per_buffer_data_size was
1013 : : * set to a non-zero size, *per_buffer_data receives a pointer to the extra
1014 : : * per-buffer data that the callback had a chance to populate, which remains
1015 : : * valid until the next call to read_stream_next_buffer(). When the stream
1016 : : * runs out of data, InvalidBuffer is returned. The caller may decide to end
1017 : : * the stream early at any time by calling read_stream_end().
1018 : : */
1019 : : Buffer
762 tmunro@postgresql.or 1020 : 7187525 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
1021 : : {
1022 : : Buffer buffer;
1023 : : int16 oldest_buffer_index;
1024 : :
1025 : : #ifndef READ_STREAM_DISABLE_FAST_PATH
1026 : :
1027 : : /*
1028 : : * A fast path for all-cached scans. This is the same as the usual
1029 : : * algorithm, but it is specialized for no I/O and no per-buffer data, so
1030 : : * we can skip the queue management code, stay in the same buffer slot and
1031 : : * use singular StartReadBuffer().
1032 : : */
1033 [ + + ]: 7187525 : if (likely(stream->fast_path))
1034 : : {
1035 : : BlockNumber next_blocknum;
1036 : :
1037 : : /* Fast path assumptions. */
1038 [ - + ]: 2636653 : Assert(stream->ios_in_progress == 0);
410 1039 [ - + ]: 2636653 : Assert(stream->forwarded_buffers == 0);
762 1040 [ - + ]: 2636653 : Assert(stream->pinned_buffers == 1);
30 andres@anarazel.de 1041 [ - + ]:GNC 2636653 : Assert(stream->readahead_distance == 1);
1042 [ - + ]: 2636653 : Assert(stream->combine_distance == 1);
759 tmunro@postgresql.or 1043 [ - + ]:CBC 2636653 : Assert(stream->pending_read_nblocks == 0);
762 1044 [ - + ]: 2636653 : Assert(stream->per_buffer_data_size == 0);
410 1045 [ - + ]: 2636653 : Assert(stream->initialized_buffers > stream->oldest_buffer_index);
1046 : :
1047 : : /* We're going to return the buffer we pinned last time. */
762 1048 : 2636653 : oldest_buffer_index = stream->oldest_buffer_index;
1049 [ - + ]: 2636653 : Assert((oldest_buffer_index + 1) % stream->queue_size ==
1050 : : stream->next_buffer_index);
1051 : 2636653 : buffer = stream->buffers[oldest_buffer_index];
1052 [ - + ]: 2636653 : Assert(buffer != InvalidBuffer);
1053 : :
1054 : : /* Choose the next block to pin. */
612 1055 : 2636653 : next_blocknum = read_stream_get_block(stream, NULL);
1056 : :
759 1057 [ + + ]: 2636653 : if (likely(next_blocknum != InvalidBlockNumber))
1058 : : {
401 andres@anarazel.de 1059 : 2533239 : int flags = stream->read_buffers_flags;
1060 : :
1061 [ + + ]: 2533239 : if (stream->advice_enabled)
1062 : 571 : flags |= READ_BUFFERS_ISSUE_ADVICE;
1063 : :
1064 : : /*
1065 : : * While in fast-path, execute any IO that we might encounter
1066 : : * synchronously. Because we are, right now, only looking one
1067 : : * block ahead, dispatching any occasional IO to workers would
1068 : : * have the overhead of dispatching to workers, without any
1069 : : * realistic chance of the IO completing before we need it. We
1070 : : * will switch to non-synchronous IO after this.
1071 : : *
1072 : : * Arguably we should do so only for worker, as there's far less
1073 : : * dispatch overhead with io_uring. However, tests so far have not
1074 : : * shown a clear downside and additional io_method awareness here
1075 : : * seems not great from an abstraction POV.
1076 : : */
34 andres@anarazel.de 1077 :GNC 2533239 : flags |= READ_BUFFERS_SYNCHRONOUSLY;
1078 : :
1079 : : /*
1080 : : * Pin a buffer for the next call. Same buffer entry, and
1081 : : * arbitrary I/O entry (they're all free). We don't have to
1082 : : * adjust pinned_buffers because we're transferring one to caller
1083 : : * but pinning one more.
1084 : : *
1085 : : * In the fast path we don't need to check the pin limit. We're
1086 : : * always allowed at least one pin so that progress can be made,
1087 : : * and that's all we need here. Although two pins are momentarily
1088 : : * held at the same time, the model used here is that the stream
1089 : : * holds only one, and the other now belongs to the caller.
1090 : : */
759 tmunro@postgresql.or 1091 [ + + ]:CBC 2533239 : if (likely(!StartReadBuffer(&stream->ios[0].op,
1092 : : &stream->buffers[oldest_buffer_index],
1093 : : next_blocknum,
1094 : : flags)))
1095 : : {
1096 : : /* Fast return. */
28 tomas.vondra@postgre 1097 :GNC 2516165 : read_stream_count_prefetch(stream);
759 tmunro@postgresql.or 1098 :CBC 2516165 : return buffer;
1099 : : }
1100 : :
1101 : : /* Next call must wait for I/O for the newly pinned buffer. */
762 1102 : 17074 : stream->oldest_io_index = 0;
1103 : 17074 : stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
1104 : 17074 : stream->ios_in_progress = 1;
1105 : 17074 : stream->ios[0].buffer_index = oldest_buffer_index;
1106 : 17074 : stream->seq_blocknum = next_blocknum + 1;
1107 : :
1108 : : /*
1109 : : * XXX: It might be worth triggering additional read-ahead here,
1110 : : * to avoid having to effectively do another synchronous IO for
1111 : : * the next block (if it were also a miss).
1112 : : */
1113 : :
1114 : : /* update I/O stats */
28 tomas.vondra@postgre 1115 :GNC 17074 : read_stream_count_io(stream, 1, stream->ios_in_progress);
1116 : :
1117 : : /* update prefetch distance */
1118 : 17074 : read_stream_count_prefetch(stream);
1119 : : }
1120 : : else
1121 : : {
1122 : : /* No more blocks, end of stream. */
30 andres@anarazel.de 1123 : 103414 : stream->readahead_distance = 0;
1124 : 103414 : stream->combine_distance = 0;
759 tmunro@postgresql.or 1125 :CBC 103414 : stream->oldest_buffer_index = stream->next_buffer_index;
1126 : 103414 : stream->pinned_buffers = 0;
410 1127 : 103414 : stream->buffers[oldest_buffer_index] = InvalidBuffer;
1128 : : }
1129 : :
759 1130 : 120488 : stream->fast_path = false;
762 1131 : 120488 : return buffer;
1132 : : }
1133 : : #endif
1134 : :
1135 [ + + ]: 4550872 : if (unlikely(stream->pinned_buffers == 0))
1136 : : {
1137 [ - + ]: 3345370 : Assert(stream->oldest_buffer_index == stream->next_buffer_index);
1138 : :
1139 : : /* End of stream reached? */
30 andres@anarazel.de 1140 [ + + ]:GNC 3345370 : if (stream->readahead_distance == 0)
762 tmunro@postgresql.or 1141 :CBC 1903962 : return InvalidBuffer;
1142 : :
1143 : : /*
1144 : : * The usual order of operations is that we look ahead at the bottom
1145 : : * of this function after potentially finishing an I/O and making
1146 : : * space for more, but if we're just starting up we'll need to crank
1147 : : * the handle to get started.
1148 : : */
416 1149 : 1441408 : read_stream_look_ahead(stream);
1150 : :
1151 : : /* End of stream reached? */
762 1152 [ + + ]: 1441408 : if (stream->pinned_buffers == 0)
1153 : : {
30 andres@anarazel.de 1154 [ - + ]:GNC 784326 : Assert(stream->readahead_distance == 0);
762 tmunro@postgresql.or 1155 :CBC 784326 : return InvalidBuffer;
1156 : : }
1157 : : }
1158 : :
1159 : : /* Grab the oldest pinned buffer and associated per-buffer data. */
1160 [ - + ]: 1862584 : Assert(stream->pinned_buffers > 0);
1161 : 1862584 : oldest_buffer_index = stream->oldest_buffer_index;
1162 [ + - - + ]: 1862584 : Assert(oldest_buffer_index >= 0 &&
1163 : : oldest_buffer_index < stream->queue_size);
1164 : 1862584 : buffer = stream->buffers[oldest_buffer_index];
1165 [ + + ]: 1862584 : if (per_buffer_data)
1166 : 359681 : *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
1167 : :
1168 [ - + ]: 1862584 : Assert(BufferIsValid(buffer));
1169 : :
1170 : : /* Do we have to wait for an associated I/O first? */
1171 [ + + ]: 1862584 : if (stream->ios_in_progress > 0 &&
1172 [ + + ]: 745072 : stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
1173 : : {
1174 : 697425 : int16 io_index = stream->oldest_io_index;
1175 : : bool needed_wait;
1176 : :
1177 : : /* Sanity check that we still agree on the buffers. */
1178 [ - + ]: 697425 : Assert(stream->ios[io_index].op.buffers ==
1179 : : &stream->buffers[oldest_buffer_index]);
1180 : :
30 andres@anarazel.de 1181 :GNC 697425 : needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
1182 : :
762 tmunro@postgresql.or 1183 [ - + ]:CBC 697394 : Assert(stream->ios_in_progress > 0);
1184 : 697394 : stream->ios_in_progress--;
1185 [ + + ]: 697394 : if (++stream->oldest_io_index == stream->max_ios)
1186 : 28578 : stream->oldest_io_index = 0;
1187 : :
1188 : : /*
1189 : : * If the IO was executed synchronously, we will never see
1190 : : * WaitReadBuffers() block. Treat it as if it did block. This is
1191 : : * particularly crucial when effective_io_concurrency=0 is used, as
1192 : : * all IO will be synchronous. Without treating synchronous IO as
1193 : : * having waited, we'd never allow the distance to get large enough to
1194 : : * allow for IO combining, resulting in bad performance.
1195 : : */
30 andres@anarazel.de 1196 [ + + ]:GNC 697394 : if (stream->ios[io_index].op.flags & READ_BUFFERS_SYNCHRONOUSLY)
1197 : 17541 : needed_wait = true;
1198 : :
1199 : : /* Count it as a wait if we need to wait for IO */
28 tomas.vondra@postgre 1200 [ + + ]: 697394 : if (needed_wait)
1201 : 317670 : read_stream_count_wait(stream);
1202 : :
1203 : : /*
1204 : : * Have the read-ahead distance ramp up rapidly after we needed to
1205 : : * wait for IO. We only increase the read-ahead-distance when we
1206 : : * needed to wait, to avoid increasing the distance further than
1207 : : * necessary, as looking ahead too far can be costly, both due to the
1208 : : * cost of unnecessarily pinning many buffers and due to doing IOs
1209 : : * that may never be consumed if the stream is ended/reset before
1210 : : * completion.
1211 : : *
1212 : : * If we did not need to wait, the current distance was evidently
1213 : : * sufficient.
1214 : : *
1215 : : * NB: Must not increase the distance if we already reached the end of
1216 : : * the stream, as stream->readahead_distance == 0 is used to keep
1217 : : * track of having reached the end.
1218 : : */
30 andres@anarazel.de 1219 [ + + + + ]: 697394 : if (stream->readahead_distance > 0 && needed_wait)
1220 : : {
1221 : : /* wider temporary value, due to overflow risk */
1222 : : int32 readahead_distance;
1223 : :
1224 : 296615 : readahead_distance = stream->readahead_distance * 2;
1225 : 296615 : readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
1226 : 296615 : stream->readahead_distance = readahead_distance;
1227 : : }
1228 : :
1229 : : /*
1230 : : * As we needed IO, prevent distances from being reduced within our
1231 : : * maximum look-ahead window. This avoids collapsing distances too
1232 : : * quickly in workloads where most of the required blocks are cached,
1233 : : * but where the remaining IOs are a sufficient enough factor to cause
1234 : : * a substantial slowdown if executed synchronously.
1235 : : *
1236 : : * There are valid arguments for preventing decay for max_ios or for
1237 : : * max_pinned_buffers. But the argument for max_pinned_buffers seems
1238 : : * clearer - if we can't see any misses within the maximum look-ahead
1239 : : * distance, we can't do any useful read-ahead.
1240 : : */
34 1241 : 697394 : stream->distance_decay_holdoff = stream->max_pinned_buffers;
1242 : :
1243 : : /*
1244 : : * Whether we needed to wait or not, allow for more IO combining if we
1245 : : * needed to do IO. The reason to do so independent of needing to wait
1246 : : * is that when the data is resident in the kernel page cache, IO
1247 : : * combining reduces the syscall / dispatch overhead, making it
1248 : : * worthwhile regardless of needing to wait.
1249 : : *
1250 : : * It is also important with io_uring as it will never signal the need
1251 : : * to wait for reads if all the data is in the page cache. There are
1252 : : * heuristics to deal with that in method_io_uring.c, but they only
1253 : : * work when the IO gets large enough.
1254 : : */
30 1255 [ + + ]: 697394 : if (stream->combine_distance > 0 &&
1256 [ + + ]: 647438 : stream->combine_distance < stream->io_combine_limit)
1257 : : {
1258 : : /* wider temporary value, due to overflow risk */
1259 : : int32 combine_distance;
1260 : :
1261 : 638958 : combine_distance = stream->combine_distance * 2;
1262 : 638958 : combine_distance = Min(combine_distance, stream->io_combine_limit);
1263 : 638958 : combine_distance = Min(combine_distance, stream->max_pinned_buffers);
1264 : 638958 : stream->combine_distance = combine_distance;
1265 : : }
1266 : :
1267 : : /*
1268 : : * If we've reached the first block of a sequential region we're
1269 : : * issuing advice for, cancel that until the next jump. The kernel
1270 : : * will see the sequential preadv() pattern starting here.
1271 : : */
416 tmunro@postgresql.or 1272 [ + + ]:CBC 697394 : if (stream->advice_enabled &&
1273 [ + + ]: 319 : stream->ios[io_index].op.blocknum == stream->seq_until_processed)
1274 : 275 : stream->seq_until_processed = InvalidBlockNumber;
1275 : : }
1276 : :
1277 : : /*
1278 : : * We must zap this queue entry, or else it would appear as a forwarded
1279 : : * buffer. If it's potentially in the overflow zone (ie from a
1280 : : * multi-block I/O that wrapped around the queue), also zap the copy.
1281 : : */
762 1282 : 1862553 : stream->buffers[oldest_buffer_index] = InvalidBuffer;
410 1283 [ + + ]: 1862553 : if (oldest_buffer_index < stream->io_combine_limit - 1)
1284 : 1472550 : stream->buffers[stream->queue_size + oldest_buffer_index] =
1285 : : InvalidBuffer;
1286 : :
1287 : : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
1288 : :
1289 : : /*
1290 : : * The caller will get access to the per-buffer data, until the next call.
1291 : : * We wipe the one before, which is never occupied because queue_size
1292 : : * allowed one extra element. This will hopefully trip up client code
1293 : : * that is holding a dangling pointer to it.
1294 : : */
762 1295 [ + + ]: 1862553 : if (stream->per_buffer_data)
1296 : : {
1297 : : void *per_buffer_data;
1298 : :
444 1299 [ + + ]: 719392 : per_buffer_data = get_per_buffer_data(stream,
1300 : : oldest_buffer_index == 0 ?
1301 : 78744 : stream->queue_size - 1 :
1302 : 280952 : oldest_buffer_index - 1);
1303 : :
1304 : : #if defined(CLOBBER_FREED_MEMORY)
1305 : : /* This also tells Valgrind the memory is "noaccess". */
1306 : 359696 : wipe_mem(per_buffer_data, stream->per_buffer_data_size);
1307 : : #elif defined(USE_VALGRIND)
1308 : : /* Tell it ourselves. */
1309 : : VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
1310 : : stream->per_buffer_data_size);
1311 : : #endif
1312 : : }
1313 : : #endif
1314 : :
28 tomas.vondra@postgre 1315 :GNC 1862553 : read_stream_count_prefetch(stream);
1316 : :
1317 : : /* Pin transferred to caller. */
762 tmunro@postgresql.or 1318 [ - + ]:CBC 1862553 : Assert(stream->pinned_buffers > 0);
1319 : 1862553 : stream->pinned_buffers--;
1320 : :
1321 : : /* Advance oldest buffer, with wrap-around. */
1322 : 1862553 : stream->oldest_buffer_index++;
1323 [ + + ]: 1862553 : if (stream->oldest_buffer_index == stream->queue_size)
1324 : 338453 : stream->oldest_buffer_index = 0;
1325 : :
1326 : : /* Prepare for the next call. */
416 1327 : 1862553 : read_stream_look_ahead(stream);
1328 : :
1329 : : #ifndef READ_STREAM_DISABLE_FAST_PATH
1330 : : /* See if we can take the fast path for all-cached scans next time. */
762 1331 [ + + ]: 1862545 : if (stream->ios_in_progress == 0 &&
410 1332 [ + + ]: 1215613 : stream->forwarded_buffers == 0 &&
762 1333 [ + + ]: 1212402 : stream->pinned_buffers == 1 &&
30 andres@anarazel.de 1334 [ + + ]:GNC 556963 : stream->readahead_distance == 1 &&
1335 [ + + ]: 471756 : stream->combine_distance == 1 &&
759 tmunro@postgresql.or 1336 [ + + ]:CBC 468396 : stream->pending_read_nblocks == 0 &&
762 1337 [ + + ]: 467202 : stream->per_buffer_data_size == 0)
1338 : : {
1339 : : /*
1340 : : * The fast path spins on one buffer entry repeatedly instead of
1341 : : * rotating through the whole queue and clearing the entries behind
1342 : : * it. If the buffer it starts with happened to be forwarded between
1343 : : * StartReadBuffers() calls and also wrapped around the circular queue
1344 : : * partway through, then a copy also exists in the overflow zone, and
1345 : : * it won't clear it out as the regular path would. Do that now, so
1346 : : * it doesn't need code for that.
1347 : : */
269 1348 [ + + ]: 233704 : if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
1349 : 231904 : stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
1350 : : InvalidBuffer;
1351 : :
762 1352 : 233704 : stream->fast_path = true;
1353 : : }
1354 : : #endif
1355 : :
1356 : 1862545 : return buffer;
1357 : : }
1358 : :
1359 : : /*
1360 : : * Transitional support for code that would like to perform or skip reads
1361 : : * itself, without using the stream. Returns, and consumes, the next block
1362 : : * number that would be read by the stream's look-ahead algorithm, or
1363 : : * InvalidBlockNumber if the end of the stream is reached. Also reports the
1364 : : * strategy that would be used to read it.
1365 : : */
1366 : : BlockNumber
594 tmunro@postgresql.or 1367 :UBC 0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
1368 : : {
1369 : 0 : *strategy = stream->ios[0].op.strategy;
1370 : 0 : return read_stream_get_block(stream, NULL);
1371 : : }
1372 : :
1373 : : /*
1374 : : * Temporarily stop consuming block numbers from the block number callback.
1375 : : * If called inside the block number callback, its return value should be
1376 : : * returned by the callback.
1377 : : */
1378 : : BlockNumber
63 melanieplageman@gmai 1379 :UNC 0 : read_stream_pause(ReadStream *stream)
1380 : : {
30 andres@anarazel.de 1381 : 0 : stream->resume_readahead_distance = stream->readahead_distance;
1382 : 0 : stream->resume_combine_distance = stream->combine_distance;
1383 : 0 : stream->readahead_distance = 0;
1384 : 0 : stream->combine_distance = 0;
63 melanieplageman@gmai 1385 : 0 : return InvalidBlockNumber;
1386 : : }
1387 : :
1388 : : /*
1389 : : * Resume looking ahead after the block number callback reported
1390 : : * end-of-stream. This is useful for streams of self-referential blocks, after
1391 : : * a buffer needed to be consumed and examined to find more block numbers.
1392 : : */
1393 : : void
1394 : 0 : read_stream_resume(ReadStream *stream)
1395 : : {
30 andres@anarazel.de 1396 : 0 : stream->readahead_distance = stream->resume_readahead_distance;
1397 : 0 : stream->combine_distance = stream->resume_combine_distance;
63 melanieplageman@gmai 1398 : 0 : }
1399 : :
1400 : : /*
1401 : : * Reset a read stream by releasing any queued up buffers, allowing the stream
1402 : : * to be used again for different blocks. This can be used to clear an
1403 : : * end-of-stream condition and start again, or to throw away blocks that were
1404 : : * speculatively read and read some different blocks instead.
1405 : : */
1406 : : void
762 tmunro@postgresql.or 1407 :CBC 1442370 : read_stream_reset(ReadStream *stream)
1408 : : {
1409 : : int16 index;
1410 : : Buffer buffer;
1411 : :
1412 : : /* Stop looking ahead. */
30 andres@anarazel.de 1413 :GNC 1442370 : stream->readahead_distance = 0;
1414 : 1442370 : stream->combine_distance = 0;
1415 : :
1416 : : /* Forget buffered block number and fast path state. */
612 tmunro@postgresql.or 1417 :CBC 1442370 : stream->buffered_blocknum = InvalidBlockNumber;
759 1418 : 1442370 : stream->fast_path = false;
1419 : :
1420 : : /* Unpin anything that wasn't consumed. */
762 1421 [ + + ]: 1578084 : while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
1422 : 135714 : ReleaseBuffer(buffer);
1423 : :
1424 : : /* Unpin any unused forwarded buffers. */
410 1425 : 1442370 : index = stream->next_buffer_index;
1426 [ + + ]: 1442370 : while (index < stream->initialized_buffers &&
1427 [ - + ]: 217842 : (buffer = stream->buffers[index]) != InvalidBuffer)
1428 : : {
410 tmunro@postgresql.or 1429 [ # # ]:UBC 0 : Assert(stream->forwarded_buffers > 0);
1430 : 0 : stream->forwarded_buffers--;
1431 : 0 : ReleaseBuffer(buffer);
1432 : :
1433 : 0 : stream->buffers[index] = InvalidBuffer;
1434 [ # # ]: 0 : if (index < stream->io_combine_limit - 1)
1435 : 0 : stream->buffers[stream->queue_size + index] = InvalidBuffer;
1436 : :
1437 [ # # ]: 0 : if (++index == stream->queue_size)
1438 : 0 : index = 0;
1439 : : }
1440 : :
410 tmunro@postgresql.or 1441 [ - + ]:CBC 1442370 : Assert(stream->forwarded_buffers == 0);
762 1442 [ - + ]: 1442370 : Assert(stream->pinned_buffers == 0);
1443 [ - + ]: 1442370 : Assert(stream->ios_in_progress == 0);
1444 : :
1445 : : /* Start off assuming data is cached. */
30 andres@anarazel.de 1446 :GNC 1442370 : stream->readahead_distance = 1;
1447 : 1442370 : stream->combine_distance = 1;
1448 : 1442370 : stream->resume_readahead_distance = stream->readahead_distance;
1449 : 1442370 : stream->resume_combine_distance = stream->combine_distance;
34 1450 : 1442370 : stream->distance_decay_holdoff = 0;
762 tmunro@postgresql.or 1451 :CBC 1442370 : }
1452 : :
1453 : : /*
1454 : : * Release and free a read stream.
1455 : : */
1456 : : void
1457 : 570866 : read_stream_end(ReadStream *stream)
1458 : : {
1459 : 570866 : read_stream_reset(stream);
1460 : 570866 : pfree(stream);
1461 : 570866 : }
|