Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * read_stream.c
4 : : * Mechanism for accessing buffered relation data with look-ahead
5 : : *
6 : : * Code that needs to access relation data typically pins blocks one at a
7 : : * time, often in a predictable order that might be sequential or data-driven.
8 : : * Calling the simple ReadBuffer() function for each block is inefficient,
9 : : * because blocks that are not yet in the buffer pool require I/O operations
10 : : * that are small and might stall waiting for storage. This mechanism looks
11 : : * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12 : : * neighboring blocks together and ahead of time, with an adaptive look-ahead
13 : : * distance.
14 : : *
15 : : * A user-provided callback generates a stream of block numbers that is used
16 : : * to form reads of up to io_combine_limit, by attempting to merge them with a
17 : : * pending read. When that isn't possible, the existing pending read is sent
18 : : * to StartReadBuffers() so that a new one can begin to form.
19 : : *
20 : : * The algorithm for controlling the look-ahead distance is based on recent
21 : : * cache hit and miss history. When no I/O is necessary, there is no benefit
22 : : * in looking ahead more than one block. This is the default initial
23 : : * assumption, but when blocks needing I/O are streamed, the distance is
24 : : * increased rapidly to try to benefit from I/O combining and concurrency. It
25 : : * is reduced gradually when cached blocks are streamed.
26 : : *
27 : : * The main data structure is a circular queue of buffers of size
28 : : * max_pinned_buffers plus some extra space for technical reasons, ready to be
29 : : * returned by read_stream_next_buffer(). Each buffer also has an optional
30 : : * variable sized object that is passed from the callback to the consumer of
31 : : * buffers.
32 : : *
33 : : * Parallel to the queue of buffers, there is a circular queue of in-progress
34 : : * I/Os that have been started with StartReadBuffers(), and for which
35 : : * WaitReadBuffers() must be called before returning the buffer.
36 : : *
37 : : * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
38 : : * successive calls, then these data structures might appear as follows:
39 : : *
40 : : * buffers buf/data ios
41 : : *
42 : : * +----+ +-----+ +--------+
43 : : * | | | | +----+ 42..44 | <- oldest_io_index
44 : : * +----+ +-----+ | +--------+
45 : : * oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 |
46 : : * +----+ +-----+ | | +--------+
47 : : * | 42 | | ? |<-+ | | | <- next_io_index
48 : : * +----+ +-----+ | +--------+
49 : : * | 43 | | ? | | | |
50 : : * +----+ +-----+ | +--------+
51 : : * | 44 | | ? | | | |
52 : : * +----+ +-----+ | +--------+
53 : : * | 60 | | ? |<---+
54 : : * +----+ +-----+
55 : : * next_buffer_index -> | | | |
56 : : * +----+ +-----+
57 : : *
58 : : * In the example, 5 buffers are pinned, and the next buffer to be streamed to
59 : : * the client is block 10. Block 10 was a hit and has no associated I/O, but
60 : : * the range 42..44 requires an I/O wait before its buffers are returned, as
61 : : * does block 60.
62 : : *
63 : : *
64 : : * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
65 : : * Portions Copyright (c) 1994, Regents of the University of California
66 : : *
67 : : * IDENTIFICATION
68 : : * src/backend/storage/aio/read_stream.c
69 : : *
70 : : *-------------------------------------------------------------------------
71 : : */
72 : : #include "postgres.h"
73 : :
74 : : #include "miscadmin.h"
75 : : #include "storage/aio.h"
76 : : #include "storage/fd.h"
77 : : #include "storage/smgr.h"
78 : : #include "storage/read_stream.h"
79 : : #include "utils/memdebug.h"
80 : : #include "utils/rel.h"
81 : : #include "utils/spccache.h"
82 : :
/*
 * One read operation that has been started with StartReadBuffers() but not
 * yet completed with WaitReadBuffers().  These live in a circular queue
 * (stream->ios) that runs parallel to the circular queue of buffers;
 * buffer_index records which queue position holds the first buffer covered
 * by this I/O, so the wait can be performed just before that buffer is
 * handed to the consumer.
 */
typedef struct InProgressIO
{
	int16		buffer_index;	/* first queue slot covered by this I/O */
	ReadBuffersOperation op;	/* buffer-manager operation handle */
} InProgressIO;
88 : :
/*
 * State for managing a stream of reads.
 */
struct ReadStream
{
	int16		max_ios;		/* capacity of ios[]; bound on concurrent
								 * I/Os */
	int16		io_combine_limit;	/* stable per-stream copy of the GUC */
	int16		ios_in_progress;	/* I/Os started but not yet waited for */
	int16		queue_size;		/* max_pinned_buffers + 1 spare entry */
	int16		max_pinned_buffers; /* cap on buffers pinned at once */
	int16		forwarded_buffers;	/* leading pins carried over from a
									 * split (short) read, not counted in
									 * pinned_buffers */
	int16		pinned_buffers; /* buffers currently pinned by this stream */
	int16		distance;		/* current look-ahead distance in blocks; 0
								 * means end-of-stream reached */
	int16		initialized_buffers;	/* high-water mark of buffers[]
										 * entries initialized so far */
	int16		resume_distance;	/* initial distance saved at stream
									 * creation; NOTE(review): its consumer
									 * is outside this chunk — confirm */
	int			read_buffers_flags; /* base flags for StartReadBuffers() */
	bool		sync_mode;		/* using io_method=sync */
	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
	bool		advice_enabled; /* simulate async I/O with advice calls */
	bool		temporary;		/* relation uses local (temp) buffers */

	/*
	 * One-block buffer to support 'ungetting' a block number, to resolve flow
	 * control problems when I/Os are split.
	 */
	BlockNumber buffered_blocknum;

	/*
	 * The callback that will tell us which block numbers to read, and an
	 * opaque pointer that will be passed to it for its own purposes.
	 */
	ReadStreamBlockNumberCB callback;
	void	   *callback_private_data;

	/* Next expected block, for detecting sequential access. */
	BlockNumber seq_blocknum;
	BlockNumber seq_until_processed;

	/* The read operation we are currently preparing. */
	BlockNumber pending_read_blocknum;
	int16		pending_read_nblocks;

	/* Space for buffers and optional per-buffer private data. */
	size_t		per_buffer_data_size;
	void	   *per_buffer_data;

	/* Read operations that have been started but not waited for yet. */
	InProgressIO *ios;
	int16		oldest_io_index;
	int16		next_io_index;

	/* NOTE(review): fast_path's use is not visible in this chunk — confirm */
	bool		fast_path;

	/* Circular queue of buffers. */
	int16		oldest_buffer_index;	/* Next pinned buffer to return */
	int16		next_buffer_index;	/* Index of next buffer to pin */
	Buffer		buffers[FLEXIBLE_ARRAY_MEMBER];
};
147 : :
148 : : /*
149 : : * Return a pointer to the per-buffer data by index.
150 : : */
151 : : static inline void *
711 tmunro@postgresql.or 152 :CBC 4060425 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
153 : : {
154 : 8120850 : return (char *) stream->per_buffer_data +
155 : 4060425 : stream->per_buffer_data_size * buffer_index;
156 : : }
157 : :
158 : : /*
159 : : * General-use ReadStreamBlockNumberCB for block range scans. Loops over the
160 : : * blocks [current_blocknum, last_exclusive).
161 : : */
162 : : BlockNumber
558 noah@leadboat.com 163 : 394733 : block_range_read_stream_cb(ReadStream *stream,
164 : : void *callback_private_data,
165 : : void *per_buffer_data)
166 : : {
167 : 394733 : BlockRangeReadStreamPrivate *p = callback_private_data;
168 : :
169 [ + + ]: 394733 : if (p->current_blocknum < p->last_exclusive)
170 : 324854 : return p->current_blocknum++;
171 : :
172 : 69879 : return InvalidBlockNumber;
173 : : }
174 : :
175 : : /*
176 : : * Ask the callback which block it would like us to read next, with a one block
177 : : * buffer in front to allow read_stream_unget_block() to work.
178 : : */
179 : : static inline BlockNumber
711 tmunro@postgresql.or 180 : 5497737 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
181 : : {
182 : : BlockNumber blocknum;
183 : :
561 184 : 5497737 : blocknum = stream->buffered_blocknum;
185 [ - + ]: 5497737 : if (blocknum != InvalidBlockNumber)
561 tmunro@postgresql.or 186 :UBC 0 : stream->buffered_blocknum = InvalidBlockNumber;
187 : : else
188 : : {
189 : : /*
190 : : * Tell Valgrind that the per-buffer data is undefined. That replaces
191 : : * the "noaccess" state that was set when the consumer moved past this
192 : : * entry last time around the queue, and should also catch callbacks
193 : : * that fail to initialize data that the buffer consumer later
194 : : * accesses. On the first go around, it is undefined already.
195 : : */
196 : : VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
197 : : stream->per_buffer_data_size);
561 tmunro@postgresql.or 198 :CBC 5497737 : blocknum = stream->callback(stream,
199 : : stream->callback_private_data,
200 : : per_buffer_data);
201 : : }
202 : :
203 : 5497737 : return blocknum;
204 : : }
205 : :
206 : : /*
207 : : * In order to deal with buffer shortages and I/O limits after short reads, we
208 : : * sometimes need to defer handling of a block we've already consumed from the
209 : : * registered callback until later.
210 : : */
211 : : static inline void
711 tmunro@postgresql.or 212 :UBC 0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
213 : : {
214 : : /* We shouldn't ever unget more than one block. */
561 215 [ # # ]: 0 : Assert(stream->buffered_blocknum == InvalidBlockNumber);
216 [ # # ]: 0 : Assert(blocknum != InvalidBlockNumber);
217 : 0 : stream->buffered_blocknum = blocknum;
711 218 : 0 : }
219 : :
/*
 * Start as much of the current pending read as we can.  If we have to split it
 * because of the per-backend buffer limit, or the buffer manager decides to
 * split it, then the pending read is adjusted to hold the remaining portion.
 *
 * We can always start a read of at least size one if we have no progress yet.
 * Otherwise it's possible that we can't start a read at all because of a lack
 * of buffers, and then false is returned.  Buffer shortages also reduce the
 * distance to a level that prevents look-ahead until buffers are released.
 */
static bool
read_stream_start_pending_read(ReadStream *stream)
{
	bool		need_wait;
	int			requested_nblocks;
	int			nblocks;
	int			flags;
	int			forwarded;
	int16		io_index;
	int16		overflow;
	int16		buffer_index;
	int			buffer_limit;

	/* This should only be called with a pending read. */
	Assert(stream->pending_read_nblocks > 0);
	Assert(stream->pending_read_nblocks <= stream->io_combine_limit);

	/* We had better not exceed the per-stream buffer limit with this read. */
	Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
		   stream->max_pinned_buffers);

#ifdef USE_ASSERT_CHECKING
	/* We had better not be overwriting an existing pinned buffer. */
	if (stream->pinned_buffers > 0)
		Assert(stream->next_buffer_index != stream->oldest_buffer_index);
	else
		Assert(stream->next_buffer_index == stream->oldest_buffer_index);

	/*
	 * Pinned buffers forwarded by a preceding StartReadBuffers() call that
	 * had to split the operation should match the leading blocks of this
	 * following StartReadBuffers() call.
	 */
	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
	for (int i = 0; i < stream->forwarded_buffers; ++i)
		Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
			   stream->pending_read_blocknum + i);

	/*
	 * Check that we've cleared the queue/overflow entries corresponding to
	 * the rest of the blocks covered by this read, unless it's the first go
	 * around and we haven't even initialized them yet.
	 */
	for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
		Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
			   stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
#endif

	/* Do we need to issue read-ahead advice? */
	flags = stream->read_buffers_flags;
	if (stream->advice_enabled)
	{
		if (stream->pending_read_blocknum == stream->seq_blocknum)
		{
			/*
			 * Sequential: Issue advice until the preadv() calls have caught
			 * up with the first advice issued for this sequential region, and
			 * then stay out of the way of the kernel's own read-ahead.
			 */
			if (stream->seq_until_processed != InvalidBlockNumber)
				flags |= READ_BUFFERS_ISSUE_ADVICE;
		}
		else
		{
			/*
			 * Random jump: Note the starting location of a new potential
			 * sequential region and start issuing advice.  Skip it this time
			 * if the preadv() follows immediately, eg first block in stream.
			 */
			stream->seq_until_processed = stream->pending_read_blocknum;
			if (stream->pinned_buffers > 0)
				flags |= READ_BUFFERS_ISSUE_ADVICE;
		}
	}

	/*
	 * How many more buffers is this backend allowed?
	 *
	 * Forwarded buffers are already pinned and map to the leading blocks of
	 * the pending read (the remaining portion of an earlier short read that
	 * we're about to continue).  They are not counted in pinned_buffers, but
	 * they are counted as pins already held by this backend according to the
	 * buffer manager, so they must be added to the limit it grants us.
	 */
	if (stream->temporary)
		buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
	else
		buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);

	buffer_limit += stream->forwarded_buffers;
	/* Re-clamp: the addition above could exceed the int16 range. */
	buffer_limit = Min(buffer_limit, PG_INT16_MAX);

	/* With nothing pinned yet, allow at least one buffer so we can't stall. */
	if (buffer_limit == 0 && stream->pinned_buffers == 0)
		buffer_limit = 1;		/* guarantee progress */

	/* Does the per-backend limit affect this read? */
	nblocks = stream->pending_read_nblocks;
	if (buffer_limit < nblocks)
	{
		int16		new_distance;

		/* Shrink distance: no more look-ahead until buffers are released. */
		new_distance = stream->pinned_buffers + buffer_limit;
		if (stream->distance > new_distance)
			stream->distance = new_distance;

		/* Unless we have nothing to give the consumer, stop here. */
		if (stream->pinned_buffers > 0)
			return false;

		/* A short read is required to make progress. */
		nblocks = buffer_limit;
	}

	/*
	 * We say how many blocks we want to read, but it may be smaller on return
	 * if the buffer manager decides to shorten the read.  Initialize buffers
	 * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
	 * and keep the original nblocks number so we can check for forwarded
	 * buffers as output, below.
	 */
	buffer_index = stream->next_buffer_index;
	io_index = stream->next_io_index;
	while (stream->initialized_buffers < buffer_index + nblocks)
		stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
	requested_nblocks = nblocks;
	need_wait = StartReadBuffers(&stream->ios[io_index].op,
								 &stream->buffers[buffer_index],
								 stream->pending_read_blocknum,
								 &nblocks,
								 flags);
	stream->pinned_buffers += nblocks;

	/* Remember whether we need to wait before returning this buffer. */
	if (!need_wait)
	{
		/* Look-ahead distance decays, no I/O necessary. */
		if (stream->distance > 1)
			stream->distance--;
	}
	else
	{
		/*
		 * Remember to call WaitReadBuffers() before returning head buffer.
		 * Look-ahead distance will be adjusted after waiting.
		 */
		stream->ios[io_index].buffer_index = buffer_index;
		if (++stream->next_io_index == stream->max_ios)
			stream->next_io_index = 0;
		Assert(stream->ios_in_progress < stream->max_ios);
		stream->ios_in_progress++;
		/* Record where a strictly sequential follow-up read would begin. */
		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
	}

	/*
	 * How many pins were acquired but forwarded to the next call?  These need
	 * to be passed to the next StartReadBuffers() call by leaving them
	 * exactly where they are in the queue, or released if the stream ends
	 * early.  We need the number for accounting purposes, since they are not
	 * counted in stream->pinned_buffers but we already hold them.
	 */
	forwarded = 0;
	while (nblocks + forwarded < requested_nblocks &&
		   stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
		forwarded++;
	stream->forwarded_buffers = forwarded;

	/*
	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
	 * we want it to wrap around at queue_size.  Copy overflowing buffers to
	 * the front of the array where they'll be consumed, but also leave a copy
	 * in the overflow zone which the I/O operation has a pointer to (it needs
	 * a contiguous array).  Both copies will be cleared when the buffers are
	 * handed to the consumer.
	 */
	overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
	if (overflow > 0)
	{
		Assert(overflow < stream->queue_size);	/* can't overlap */
		memcpy(&stream->buffers[0],
			   &stream->buffers[stream->queue_size],
			   sizeof(stream->buffers[0]) * overflow);
	}

	/* Compute location of start of next read, without using % operator. */
	buffer_index += nblocks;
	if (buffer_index >= stream->queue_size)
		buffer_index -= stream->queue_size;
	Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
	stream->next_buffer_index = buffer_index;

	/* Adjust the pending read to cover the remaining portion, if any. */
	stream->pending_read_blocknum += nblocks;
	stream->pending_read_nblocks -= nblocks;

	return true;
}
428 : :
/*
 * Pull block numbers from the callback and start reads, until we reach the
 * current look-ahead distance, run out of I/O capacity, or hit end-of-stream.
 * Consecutive block numbers are merged into a single pending read of up to
 * io_combine_limit blocks before being submitted.
 */
static void
read_stream_look_ahead(ReadStream *stream)
{
	/*
	 * Allow amortizing the cost of submitting IO over multiple IOs.  This
	 * requires that we don't do any operations that could lead to a deadlock
	 * with staged-but-unsubmitted IO.  The callback needs to opt-in to being
	 * careful.
	 */
	if (stream->batch_mode)
		pgaio_enter_batchmode();

	while (stream->ios_in_progress < stream->max_ios &&
		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
	{
		BlockNumber blocknum;
		int16		buffer_index;
		void	   *per_buffer_data;

		/* A full-sized pending read can't grow further; submit it now. */
		if (stream->pending_read_nblocks == stream->io_combine_limit)
		{
			read_stream_start_pending_read(stream);
			continue;
		}

		/*
		 * See which block the callback wants next in the stream.  We need to
		 * compute the index of the Nth block of the pending read including
		 * wrap-around, but we don't want to use the expensive % operator.
		 */
		buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
		if (buffer_index >= stream->queue_size)
			buffer_index -= stream->queue_size;
		Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
		per_buffer_data = get_per_buffer_data(stream, buffer_index);
		blocknum = read_stream_get_block(stream, per_buffer_data);
		if (blocknum == InvalidBlockNumber)
		{
			/* End of stream. */
			stream->distance = 0;
			break;
		}

		/* Can we merge it with the pending read? */
		if (stream->pending_read_nblocks > 0 &&
			stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
		{
			stream->pending_read_nblocks++;
			continue;
		}

		/* We have to start the pending read before we can build another. */
		while (stream->pending_read_nblocks > 0)
		{
			if (!read_stream_start_pending_read(stream) ||
				stream->ios_in_progress == stream->max_ios)
			{
				/* We've hit the buffer or I/O limit.  Rewind and stop here. */
				read_stream_unget_block(stream, blocknum);
				if (stream->batch_mode)
					pgaio_exit_batchmode();
				return;
			}
		}

		/* This is the start of a new pending read. */
		stream->pending_read_blocknum = blocknum;
		stream->pending_read_nblocks = 1;
	}

	/*
	 * We don't start the pending read just because we've hit the distance
	 * limit, preferring to give it another chance to grow to full
	 * io_combine_limit size once more buffers have been consumed.  However,
	 * if we've already reached io_combine_limit, or we've reached the
	 * distance limit and there isn't anything pinned yet, or the callback has
	 * signaled end-of-stream, we start the read immediately.  Note that the
	 * pending read can exceed the distance goal, if the latter was reduced
	 * after hitting the per-backend buffer limit.
	 */
	if (stream->pending_read_nblocks > 0 &&
		(stream->pending_read_nblocks == stream->io_combine_limit ||
		 (stream->pending_read_nblocks >= stream->distance &&
		  stream->pinned_buffers == 0) ||
		 stream->distance == 0) &&
		stream->ios_in_progress < stream->max_ios)
		read_stream_start_pending_read(stream);

	/*
	 * There should always be something pinned when we leave this function,
	 * whether started by this call or not, unless we've hit the end of the
	 * stream.  In the worst case we can always make progress one buffer at a
	 * time.
	 */
	Assert(stream->pinned_buffers > 0 || stream->distance == 0);

	if (stream->batch_mode)
		pgaio_exit_batchmode();
}
528 : :
/*
 * Create a new read stream object that can be used to perform the equivalent
 * of a series of ReadBuffer() calls for one fork of one relation.
 * Internally, it generates larger vectored reads where possible by looking
 * ahead.  The callback should return block numbers or InvalidBlockNumber to
 * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
 * write extra data for each block into the space provided to it.  It will
 * also receive callback_private_data for its own purposes.
 */
static ReadStream *
read_stream_begin_impl(int flags,
					   BufferAccessStrategy strategy,
					   Relation rel,
					   SMgrRelation smgr,
					   char persistence,
					   ForkNumber forknum,
					   ReadStreamBlockNumberCB callback,
					   void *callback_private_data,
					   size_t per_buffer_data_size)
{
	ReadStream *stream;
	size_t		size;
	int16		queue_size;
	int16		queue_overflow;
	int			max_ios;
	int			strategy_pin_limit;
	uint32		max_pinned_buffers;
	uint32		max_possible_buffer_limit;
	Oid			tablespace_id;

	/*
	 * Decide how many I/Os we will allow to run at the same time.  That
	 * currently means advice to the kernel to tell it that we will soon read.
	 * This number also affects how far we look ahead for opportunities to
	 * start more I/Os.
	 */
	tablespace_id = smgr->smgr_rlocator.locator.spcOid;
	if (!OidIsValid(MyDatabaseId) ||
		(rel && IsCatalogRelation(rel)) ||
		IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
	{
		/*
		 * Avoid circularity while trying to look up tablespace settings or
		 * before spccache.c is ready.
		 */
		max_ios = effective_io_concurrency;
	}
	else if (flags & READ_STREAM_MAINTENANCE)
		max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
	else
		max_ios = get_tablespace_io_concurrency(tablespace_id);

	/* Cap to INT16_MAX to avoid overflowing below */
	max_ios = Min(max_ios, PG_INT16_MAX);

	/*
	 * If starting a multi-block I/O near the end of the queue, we might
	 * temporarily need extra space for overflowing buffers before they are
	 * moved to regular circular position.  This is the maximum extra space we
	 * could need.
	 */
	queue_overflow = io_combine_limit - 1;

	/*
	 * Choose the maximum number of buffers we're prepared to pin.  We try to
	 * pin fewer if we can, though.  We add one so that we can make progress
	 * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
	 * this also allows an extra full I/O's worth of buffers: after an I/O
	 * finishes we don't want to have to wait for its buffers to be consumed
	 * before starting a new one.
	 *
	 * Be careful not to allow int16 to overflow.  That is possible with the
	 * current GUC range limits, so this is an artificial limit of ~32k
	 * buffers and we'd need to adjust the types to exceed that.  We also have
	 * to allow for the spare entry and the overflow space.
	 */
	max_pinned_buffers = (max_ios + 1) * io_combine_limit;
	max_pinned_buffers = Min(max_pinned_buffers,
							 PG_INT16_MAX - queue_overflow - 1);

	/* Give the strategy a chance to limit the number of buffers we pin. */
	strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
	max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);

	/*
	 * Also limit our queue to the maximum number of pins we could ever be
	 * allowed to acquire according to the buffer manager.  We may not really
	 * be able to use them all due to other pins held by this backend, but
	 * we'll check that later in read_stream_start_pending_read().
	 */
	if (SmgrIsTemp(smgr))
		max_possible_buffer_limit = GetLocalPinLimit();
	else
		max_possible_buffer_limit = GetPinLimit();
	max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);

	/*
	 * The limit might be zero on a system configured with too few buffers for
	 * the number of connections.  We need at least one to make progress.
	 */
	max_pinned_buffers = Max(1, max_pinned_buffers);

	/*
	 * We need one extra entry for buffers and per-buffer data, because users
	 * of per-buffer data have access to the object until the next call to
	 * read_stream_next_buffer(), so we need a gap between the head and tail
	 * of the queue so that we don't clobber it.
	 */
	queue_size = max_pinned_buffers + 1;

	/*
	 * Allocate the object, the buffers, the ios and per_buffer_data space in
	 * one big chunk.  Though we have queue_size buffers, we want to be able
	 * to assume that all the buffers for a single read are contiguous (i.e.
	 * don't wrap around halfway through), so we allow temporary overflows of
	 * up to the maximum possible overflow size.
	 */
	size = offsetof(ReadStream, buffers);
	size += sizeof(Buffer) * (queue_size + queue_overflow);
	size += sizeof(InProgressIO) * Max(1, max_ios);
	size += per_buffer_data_size * queue_size;
	size += MAXIMUM_ALIGNOF * 2;	/* slack for MAXALIGN of ios and data */
	stream = (ReadStream *) palloc(size);
	memset(stream, 0, offsetof(ReadStream, buffers));
	stream->ios = (InProgressIO *)
		MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
	if (per_buffer_data_size > 0)
		stream->per_buffer_data = (void *)
			MAXALIGN(&stream->ios[Max(1, max_ios)]);

	stream->sync_mode = io_method == IOMETHOD_SYNC;
	stream->batch_mode = flags & READ_STREAM_USE_BATCHING;

#ifdef USE_PREFETCH

	/*
	 * Read-ahead advice simulating asynchronous I/O with synchronous calls.
	 * Issue advice only if AIO is not used, direct I/O isn't enabled, the
	 * caller hasn't promised sequential access (overriding our detection
	 * heuristics), and max_ios hasn't been set to zero.
	 */
	if (stream->sync_mode &&
		(io_direct_flags & IO_DIRECT_DATA) == 0 &&
		(flags & READ_STREAM_SEQUENTIAL) == 0 &&
		max_ios > 0)
		stream->advice_enabled = true;
#endif

	/*
	 * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
	 * we still need to allocate space to combine and run one I/O.  Bump it up
	 * to one, and remember to ask for synchronous I/O only.
	 */
	if (max_ios == 0)
	{
		max_ios = 1;
		stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
	}

	/*
	 * Capture stable values for these two GUC-derived numbers for the
	 * lifetime of this stream, so we don't have to worry about the GUCs
	 * changing underneath us beyond this point.
	 */
	stream->max_ios = max_ios;
	stream->io_combine_limit = io_combine_limit;

	stream->per_buffer_data_size = per_buffer_data_size;
	stream->max_pinned_buffers = max_pinned_buffers;
	stream->queue_size = queue_size;
	stream->callback = callback;
	stream->callback_private_data = callback_private_data;
	stream->buffered_blocknum = InvalidBlockNumber;
	stream->seq_blocknum = InvalidBlockNumber;
	stream->seq_until_processed = InvalidBlockNumber;
	stream->temporary = SmgrIsTemp(smgr);

	/*
	 * Skip the initial ramp-up phase if the caller says we're going to be
	 * reading the whole relation.  This way we start out assuming we'll be
	 * doing full io_combine_limit sized reads.
	 */
	if (flags & READ_STREAM_FULL)
		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
	else
		stream->distance = 1;
	stream->resume_distance = stream->distance;

	/*
	 * Since we always access the same relation, we can initialize parts of
	 * the ReadBuffersOperation objects and leave them that way, to avoid
	 * wasting CPU cycles writing to them for each read.
	 */
	for (int i = 0; i < max_ios; ++i)
	{
		stream->ios[i].op.rel = rel;
		stream->ios[i].op.smgr = smgr;
		stream->ios[i].op.persistence = persistence;
		stream->ios[i].op.forknum = forknum;
		stream->ios[i].op.strategy = strategy;
	}

	return stream;
}
733 : :
734 : : /*
735 : : * Create a new read stream for reading a relation.
736 : : * See read_stream_begin_impl() for the detailed explanation.
737 : : */
738 : : ReadStream *
603 noah@leadboat.com 739 : 715322 : read_stream_begin_relation(int flags,
740 : : BufferAccessStrategy strategy,
741 : : Relation rel,
742 : : ForkNumber forknum,
743 : : ReadStreamBlockNumberCB callback,
744 : : void *callback_private_data,
745 : : size_t per_buffer_data_size)
746 : : {
747 : 715322 : return read_stream_begin_impl(flags,
748 : : strategy,
749 : : rel,
750 : : RelationGetSmgr(rel),
751 : 715322 : rel->rd_rel->relpersistence,
752 : : forknum,
753 : : callback,
754 : : callback_private_data,
755 : : per_buffer_data_size);
756 : : }
757 : :
758 : : /*
759 : : * Create a new read stream for reading a SMgr relation.
760 : : * See read_stream_begin_impl() for the detailed explanation.
761 : : */
762 : : ReadStream *
763 : 63746 : read_stream_begin_smgr_relation(int flags,
764 : : BufferAccessStrategy strategy,
765 : : SMgrRelation smgr,
766 : : char smgr_persistence,
767 : : ForkNumber forknum,
768 : : ReadStreamBlockNumberCB callback,
769 : : void *callback_private_data,
770 : : size_t per_buffer_data_size)
771 : : {
772 : 63746 : return read_stream_begin_impl(flags,
773 : : strategy,
774 : : NULL,
775 : : smgr,
776 : : smgr_persistence,
777 : : forknum,
778 : : callback,
779 : : callback_private_data,
780 : : per_buffer_data_size);
781 : : }
782 : :
/*
 * Pull one pinned buffer out of a stream.  Each call returns successive
 * blocks in the order specified by the callback.  If per_buffer_data_size was
 * set to a non-zero size, *per_buffer_data receives a pointer to the extra
 * per-buffer data that the callback had a chance to populate, which remains
 * valid until the next call to read_stream_next_buffer().  When the stream
 * runs out of data, InvalidBuffer is returned.  The caller may decide to end
 * the stream early at any time by calling read_stream_end().
 *
 * The buffer's pin is transferred to the caller, who becomes responsible for
 * releasing it.
 */
Buffer
read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
{
	Buffer		buffer;
	int16		oldest_buffer_index;

#ifndef READ_STREAM_DISABLE_FAST_PATH

	/*
	 * A fast path for all-cached scans.  This is the same as the usual
	 * algorithm, but it is specialized for no I/O and no per-buffer data, so
	 * we can skip the queue management code, stay in the same buffer slot and
	 * use singular StartReadBuffer().
	 */
	if (likely(stream->fast_path))
	{
		BlockNumber next_blocknum;

		/* Fast path assumptions. */
		Assert(stream->ios_in_progress == 0);
		Assert(stream->forwarded_buffers == 0);
		Assert(stream->pinned_buffers == 1);
		Assert(stream->distance == 1);
		Assert(stream->pending_read_nblocks == 0);
		Assert(stream->per_buffer_data_size == 0);
		Assert(stream->initialized_buffers > stream->oldest_buffer_index);

		/* We're going to return the buffer we pinned last time. */
		oldest_buffer_index = stream->oldest_buffer_index;
		Assert((oldest_buffer_index + 1) % stream->queue_size ==
			   stream->next_buffer_index);
		buffer = stream->buffers[oldest_buffer_index];
		Assert(buffer != InvalidBuffer);

		/* Choose the next block to pin. */
		next_blocknum = read_stream_get_block(stream, NULL);

		if (likely(next_blocknum != InvalidBlockNumber))
		{
			int			flags = stream->read_buffers_flags;

			if (stream->advice_enabled)
				flags |= READ_BUFFERS_ISSUE_ADVICE;

			/*
			 * Pin a buffer for the next call.  Same buffer entry, and
			 * arbitrary I/O entry (they're all free).  We don't have to
			 * adjust pinned_buffers because we're transferring one to caller
			 * but pinning one more.
			 *
			 * In the fast path we don't need to check the pin limit.  We're
			 * always allowed at least one pin so that progress can be made,
			 * and that's all we need here.  Although two pins are momentarily
			 * held at the same time, the model used here is that the stream
			 * holds only one, and the other now belongs to the caller.
			 */
			if (likely(!StartReadBuffer(&stream->ios[0].op,
										&stream->buffers[oldest_buffer_index],
										next_blocknum,
										flags)))
			{
				/* Fast return. */
				return buffer;
			}

			/*
			 * Next call must wait for I/O for the newly pinned buffer.
			 * Repopulate the I/O queue state we normally maintain; with only
			 * one I/O slot, next_io_index stays at 0, otherwise it moves on
			 * to slot 1.
			 */
			stream->oldest_io_index = 0;
			stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
			stream->ios_in_progress = 1;
			stream->ios[0].buffer_index = oldest_buffer_index;
			stream->seq_blocknum = next_blocknum + 1;
		}
		else
		{
			/* No more blocks, end of stream. */
			stream->distance = 0;
			stream->oldest_buffer_index = stream->next_buffer_index;
			stream->pinned_buffers = 0;
			stream->buffers[oldest_buffer_index] = InvalidBuffer;
		}

		/* Fall back to the regular path on the next call. */
		stream->fast_path = false;
		return buffer;
	}
#endif

	if (unlikely(stream->pinned_buffers == 0))
	{
		Assert(stream->oldest_buffer_index == stream->next_buffer_index);

		/* End of stream reached? */
		if (stream->distance == 0)
			return InvalidBuffer;

		/*
		 * The usual order of operations is that we look ahead at the bottom
		 * of this function after potentially finishing an I/O and making
		 * space for more, but if we're just starting up we'll need to crank
		 * the handle to get started.
		 */
		read_stream_look_ahead(stream);

		/* End of stream reached? */
		if (stream->pinned_buffers == 0)
		{
			Assert(stream->distance == 0);
			return InvalidBuffer;
		}
	}

	/* Grab the oldest pinned buffer and associated per-buffer data. */
	Assert(stream->pinned_buffers > 0);
	oldest_buffer_index = stream->oldest_buffer_index;
	Assert(oldest_buffer_index >= 0 &&
		   oldest_buffer_index < stream->queue_size);
	buffer = stream->buffers[oldest_buffer_index];
	if (per_buffer_data)
		*per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);

	Assert(BufferIsValid(buffer));

	/* Do we have to wait for an associated I/O first? */
	if (stream->ios_in_progress > 0 &&
		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
	{
		int16		io_index = stream->oldest_io_index;
		int32		distance;	/* wider temporary value, clamped below */

		/* Sanity check that we still agree on the buffers. */
		Assert(stream->ios[io_index].op.buffers ==
			   &stream->buffers[oldest_buffer_index]);

		WaitReadBuffers(&stream->ios[io_index].op);

		/* Retire this I/O slot, advancing with wrap-around. */
		Assert(stream->ios_in_progress > 0);
		stream->ios_in_progress--;
		if (++stream->oldest_io_index == stream->max_ios)
			stream->oldest_io_index = 0;

		/* Look-ahead distance ramps up rapidly after we do I/O. */
		distance = stream->distance * 2;
		distance = Min(distance, stream->max_pinned_buffers);
		stream->distance = distance;

		/*
		 * If we've reached the first block of a sequential region we're
		 * issuing advice for, cancel that until the next jump.  The kernel
		 * will see the sequential preadv() pattern starting here.
		 */
		if (stream->advice_enabled &&
			stream->ios[io_index].op.blocknum == stream->seq_until_processed)
			stream->seq_until_processed = InvalidBlockNumber;
	}

	/*
	 * We must zap this queue entry, or else it would appear as a forwarded
	 * buffer.  If it's potentially in the overflow zone (ie from a
	 * multi-block I/O that wrapped around the queue), also zap the copy.
	 */
	stream->buffers[oldest_buffer_index] = InvalidBuffer;
	if (oldest_buffer_index < stream->io_combine_limit - 1)
		stream->buffers[stream->queue_size + oldest_buffer_index] =
			InvalidBuffer;

#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)

	/*
	 * The caller will get access to the per-buffer data, until the next call.
	 * We wipe the one before, which is never occupied because queue_size
	 * allowed one extra element.  This will hopefully trip up client code
	 * that is holding a dangling pointer to it.
	 */
	if (stream->per_buffer_data)
	{
		void	   *per_buffer_data;

		per_buffer_data = get_per_buffer_data(stream,
											  oldest_buffer_index == 0 ?
											  stream->queue_size - 1 :
											  oldest_buffer_index - 1);

#if defined(CLOBBER_FREED_MEMORY)
		/* This also tells Valgrind the memory is "noaccess". */
		wipe_mem(per_buffer_data, stream->per_buffer_data_size);
#elif defined(USE_VALGRIND)
		/* Tell it ourselves. */
		VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
								   stream->per_buffer_data_size);
#endif
	}
#endif

	/* Pin transferred to caller. */
	Assert(stream->pinned_buffers > 0);
	stream->pinned_buffers--;

	/* Advance oldest buffer, with wrap-around. */
	stream->oldest_buffer_index++;
	if (stream->oldest_buffer_index == stream->queue_size)
		stream->oldest_buffer_index = 0;

	/* Prepare for the next call. */
	read_stream_look_ahead(stream);

#ifndef READ_STREAM_DISABLE_FAST_PATH
	/* See if we can take the fast path for all-cached scans next time. */
	if (stream->ios_in_progress == 0 &&
		stream->forwarded_buffers == 0 &&
		stream->pinned_buffers == 1 &&
		stream->distance == 1 &&
		stream->pending_read_nblocks == 0 &&
		stream->per_buffer_data_size == 0)
	{
		/*
		 * The fast path spins on one buffer entry repeatedly instead of
		 * rotating through the whole queue and clearing the entries behind
		 * it.  If the buffer it starts with happened to be forwarded between
		 * StartReadBuffers() calls and also wrapped around the circular queue
		 * partway through, then a copy also exists in the overflow zone, and
		 * it won't clear it out as the regular path would.  Do that now, so
		 * it doesn't need code for that.
		 */
		if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
			stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
				InvalidBuffer;

		stream->fast_path = true;
	}
#endif

	return buffer;
}
1024 : :
1025 : : /*
1026 : : * Transitional support for code that would like to perform or skip reads
1027 : : * itself, without using the stream. Returns, and consumes, the next block
1028 : : * number that would be read by the stream's look-ahead algorithm, or
1029 : : * InvalidBlockNumber if the end of the stream is reached. Also reports the
1030 : : * strategy that would be used to read it.
1031 : : */
1032 : : BlockNumber
543 tmunro@postgresql.or 1033 :UBC 0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
1034 : : {
1035 : 0 : *strategy = stream->ios[0].op.strategy;
1036 : 0 : return read_stream_get_block(stream, NULL);
1037 : : }
1038 : :
/*
 * Temporarily stop consuming block numbers from the block number callback.
 * If called inside the block number callback, its return value should be
 * returned by the callback.
 *
 * The current look-ahead distance is saved so that read_stream_resume() can
 * restore it; returning InvalidBlockNumber lets a callback report
 * end-of-stream to the look-ahead machinery while paused.
 */
BlockNumber
read_stream_pause(ReadStream *stream)
{
	/* Save the distance to restore on resume, then stop looking ahead. */
	stream->resume_distance = stream->distance;
	stream->distance = 0;
	return InvalidBlockNumber;
}
1051 : :
/*
 * Resume looking ahead after the block number callback reported
 * end-of-stream.  This is useful for streams of self-referential blocks,
 * after a buffer needed to be consumed and examined to find more block
 * numbers.
 */
void
read_stream_resume(ReadStream *stream)
{
	/* Restore the look-ahead distance saved by read_stream_pause(). */
	stream->distance = stream->resume_distance;
}
1062 : :
/*
 * Reset a read stream by releasing any queued up buffers, allowing the stream
 * to be used again for different blocks.  This can be used to clear an
 * end-of-stream condition and start again, or to throw away blocks that were
 * speculatively read and read some different blocks instead.
 */
void
read_stream_reset(ReadStream *stream)
{
	int16		index;
	Buffer		buffer;

	/* Stop looking ahead. */
	stream->distance = 0;

	/* Forget buffered block number and fast path state. */
	stream->buffered_blocknum = InvalidBlockNumber;
	stream->fast_path = false;

	/*
	 * Unpin anything that wasn't consumed.  With distance zeroed above,
	 * read_stream_next_buffer() drains what is already pinned without
	 * starting any new reads.
	 */
	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
		ReleaseBuffer(buffer);

	/*
	 * Unpin any unused forwarded buffers: valid entries sitting in the queue
	 * from next_buffer_index onward that the stream never handed out.
	 */
	index = stream->next_buffer_index;
	while (index < stream->initialized_buffers &&
		   (buffer = stream->buffers[index]) != InvalidBuffer)
	{
		Assert(stream->forwarded_buffers > 0);
		stream->forwarded_buffers--;
		ReleaseBuffer(buffer);

		/*
		 * Clear the entry, and also its mirror in the overflow zone if it
		 * could have one (only entries below io_combine_limit - 1 are
		 * duplicated there by wrapped multi-block I/Os).
		 */
		stream->buffers[index] = InvalidBuffer;
		if (index < stream->io_combine_limit - 1)
			stream->buffers[stream->queue_size + index] = InvalidBuffer;

		/* Advance with wrap-around, matching the circular queue. */
		if (++index == stream->queue_size)
			index = 0;
	}

	/* Everything should now be released. */
	Assert(stream->forwarded_buffers == 0);
	Assert(stream->pinned_buffers == 0);
	Assert(stream->ios_in_progress == 0);

	/* Start off assuming data is cached. */
	stream->distance = 1;
	stream->resume_distance = stream->distance;
}
1111 : :
/*
 * Release and free a read stream.  Any buffers still pinned by the stream
 * are released via read_stream_reset() before the memory is freed.
 */
void
read_stream_end(ReadStream *stream)
{
	/* Must drop all pins before freeing the stream's own state. */
	read_stream_reset(stream);
	pfree(stream);
}
|