Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * read_stream.c
4 : : * Mechanism for accessing buffered relation data with look-ahead
5 : : *
6 : : * Code that needs to access relation data typically pins blocks one at a
7 : : * time, often in a predictable order that might be sequential or data-driven.
8 : : * Calling the simple ReadBuffer() function for each block is inefficient,
9 : : * because blocks that are not yet in the buffer pool require I/O operations
10 : : * that are small and might stall waiting for storage. This mechanism looks
11 : : * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12 : : * neighboring blocks together and ahead of time, with an adaptive look-ahead
13 : : * distance.
14 : : *
15 : : * A user-provided callback generates a stream of block numbers that is used
16 : : * to form reads of up to io_combine_limit, by attempting to merge them with a
17 : : * pending read. When that isn't possible, the existing pending read is sent
18 : : * to StartReadBuffers() so that a new one can begin to form.
19 : : *
20 : : * The algorithm for controlling the look-ahead distance is based on recent
21 : : * cache hit and miss history. When no I/O is necessary, there is no benefit
22 : : * in looking ahead more than one block. This is the default initial
23 : : * assumption, but when blocks needing I/O are streamed, the distance is
24 : : * increased rapidly to try to benefit from I/O combining and concurrency. It
25 : : * is reduced gradually when cached blocks are streamed.
26 : : *
27 : : * The main data structure is a circular queue of buffers of size
28 : : * max_pinned_buffers, plus a spare entry and some overflow space, ready to be
29 : : * returned by read_stream_next_buffer(). Each buffer also has an optional
30 : : * variable-sized object that is passed from the callback to the consumer of
31 : : * buffers.
32 : : *
33 : : * Parallel to the queue of buffers, there is a circular queue of in-progress
34 : : * I/Os that have been started with StartReadBuffers(), and for which
35 : : * WaitReadBuffers() must be called before returning the buffer.
36 : : *
37 : : * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
38 : : * successive calls, then these data structures might appear as follows:
39 : : *
40 : : *                          buffers buf/data       ios
41 : : *
42 : : *                          +----+  +-----+       +--------+
43 : : *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
44 : : *                          +----+  +-----+  |    +--------+
45 : : *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
46 : : *                          +----+  +-----+  | |  +--------+
47 : : *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
48 : : *                          +----+  +-----+    |  +--------+
49 : : *                          | 43 |  |  ?  |    |  |        |
50 : : *                          +----+  +-----+    |  +--------+
51 : : *                          | 44 |  |  ?  |    |  |        |
52 : : *                          +----+  +-----+    |  +--------+
53 : : *                          | 60 |  |  ?  |<---+
54 : : *                          +----+  +-----+
55 : : *     next_buffer_index -> |    |  |     |
56 : : *                          +----+  +-----+
57 : : *
58 : : * In the example, 5 buffers are pinned, and the next buffer to be streamed to
59 : : * the client is block 10. Block 10 was a hit and has no associated I/O, but
60 : : * the range 42..44 requires an I/O wait before its buffers are returned, as
61 : : * does block 60.
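 *
 * In outline, a consumer drives a stream roughly like this (a sketch only;
 * see read_stream_begin_relation() and read_stream_next_buffer() below for
 * the real contracts, with "my_callback" and "my_state" standing in for
 * caller-provided code and state):
 *
 *     stream = read_stream_begin_relation(READ_STREAM_FULL, NULL, rel,
 *                                         MAIN_FORKNUM, my_callback,
 *                                         my_state, 0);
 *     while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 *     {
 *         ... examine the pinned buffer ...
 *         ReleaseBuffer(buf);
 *     }
 *     read_stream_end(stream);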
62 : : *
63 : : *
64 : : * Portions Copyright (c) 2024-2025, PostgreSQL Global Development Group
65 : : * Portions Copyright (c) 1994, Regents of the University of California
66 : : *
67 : : * IDENTIFICATION
68 : : * src/backend/storage/aio/read_stream.c
69 : : *
70 : : *-------------------------------------------------------------------------
71 : : */
72 : : #include "postgres.h"
73 : :
74 : : #include "miscadmin.h"
75 : : #include "storage/aio.h"
76 : : #include "storage/fd.h"
77 : : #include "storage/smgr.h"
78 : : #include "storage/read_stream.h"
79 : : #include "utils/memdebug.h"
80 : : #include "utils/rel.h"
81 : : #include "utils/spccache.h"
82 : :
83 : : typedef struct InProgressIO
84 : : {
85 : : int16 buffer_index;
86 : : ReadBuffersOperation op;
87 : : } InProgressIO;
88 : :
89 : : /*
90 : : * State for managing a stream of reads.
91 : : */
92 : : struct ReadStream
93 : : {
94 : : int16 max_ios;
95 : : int16 io_combine_limit;
96 : : int16 ios_in_progress;
97 : : int16 queue_size;
98 : : int16 max_pinned_buffers;
99 : : int16 forwarded_buffers;
100 : : int16 pinned_buffers;
101 : : int16 distance;
102 : : int16 initialized_buffers;
103 : : int read_buffers_flags;
104 : : bool sync_mode; /* using io_method=sync */
105 : : bool batch_mode; /* READ_STREAM_USE_BATCHING */
106 : : bool advice_enabled;
107 : : bool temporary;
108 : :
109 : : /*
110 : : * One-block buffer to support 'ungetting' a block number, to resolve flow
111 : : * control problems when I/Os are split.
112 : : */
113 : : BlockNumber buffered_blocknum;
114 : :
115 : : /*
116 : : * The callback that will tell us which block numbers to read, and an
117 : : * opaque pointer that will be passed to it for its own purposes.
118 : : */
119 : : ReadStreamBlockNumberCB callback;
120 : : void *callback_private_data;
121 : :
122 : : /* Next expected block, for detecting sequential access. */
123 : : BlockNumber seq_blocknum;
124 : : BlockNumber seq_until_processed;
125 : :
126 : : /* The read operation we are currently preparing. */
127 : : BlockNumber pending_read_blocknum;
128 : : int16 pending_read_nblocks;
129 : :
130 : : /* Space for buffers and optional per-buffer private data. */
131 : : size_t per_buffer_data_size;
132 : : void *per_buffer_data;
133 : :
134 : : /* Read operations that have been started but not waited for yet. */
135 : : InProgressIO *ios;
136 : : int16 oldest_io_index;
137 : : int16 next_io_index;
138 : :
139 : : bool fast_path;
140 : :
141 : : /* Circular queue of buffers. */
142 : : int16 oldest_buffer_index; /* Next pinned buffer to return */
143 : : int16 next_buffer_index; /* Index of next buffer to pin */
144 : : Buffer buffers[FLEXIBLE_ARRAY_MEMBER];
145 : : };
146 : :
147 : : /*
148 : : * Return a pointer to the per-buffer data by index.
149 : : */
150 : : static inline void *
521 tmunro@postgresql.or 151 :CBC 3040952 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
152 : : {
153 : 6081904 : return (char *) stream->per_buffer_data +
154 : 3040952 : stream->per_buffer_data_size * buffer_index;
155 : : }
156 : :
157 : : /*
158 : : * General-use ReadStreamBlockNumberCB for block range scans. Loops over the
159 : : * blocks [current_blocknum, last_exclusive).
160 : : */
161 : : BlockNumber
368 noah@leadboat.com 162 : 334631 : block_range_read_stream_cb(ReadStream *stream,
163 : : void *callback_private_data,
164 : : void *per_buffer_data)
165 : : {
166 : 334631 : BlockRangeReadStreamPrivate *p = callback_private_data;
167 : :
168 [ + + ]: 334631 : if (p->current_blocknum < p->last_exclusive)
169 : 270277 : return p->current_blocknum++;
170 : :
171 : 64354 : return InvalidBlockNumber;
172 : : }
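
/*
 * Illustrative sketch only (not part of this file's API): one way a caller
 * might combine block_range_read_stream_cb() with read_stream_begin_relation()
 * to stream every block of a fork.  The function name and local variables are
 * hypothetical; the read stream calls and the BlockRangeReadStreamPrivate
 * fields are the real interfaces defined in this file.
 */
static void
example_stream_block_range(Relation rel)
{
	BlockRangeReadStreamPrivate p;
	ReadStream *stream;
	Buffer		buf;

	p.current_blocknum = 0;
	p.last_exclusive = RelationGetNumberOfBlocks(rel);

	stream = read_stream_begin_relation(READ_STREAM_FULL,
										NULL,	/* no buffer access strategy */
										rel,
										MAIN_FORKNUM,
										block_range_read_stream_cb,
										&p,
										0); /* no per-buffer data */

	while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
	{
		/* ... examine BufferGetPage(buf) under a content lock ... */
		ReleaseBuffer(buf);
	}

	read_stream_end(stream);
}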
173 : :
174 : : /*
175 : : * Ask the callback which block it would like us to read next, with a one-block
176 : : * buffer in front to allow read_stream_unget_block() to work.
177 : : */
178 : : static inline BlockNumber
521 tmunro@postgresql.or 179 : 4318544 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
180 : : {
181 : : BlockNumber blocknum;
182 : :
371 183 : 4318544 : blocknum = stream->buffered_blocknum;
184 [ - + ]: 4318544 : if (blocknum != InvalidBlockNumber)
371 tmunro@postgresql.or 185 :LBC (2) : stream->buffered_blocknum = InvalidBlockNumber;
186 : : else
187 : : {
188 : : /*
189 : : * Tell Valgrind that the per-buffer data is undefined. That replaces
190 : : * the "noaccess" state that was set when the consumer moved past this
191 : : * entry last time around the queue, and should also catch callbacks
192 : : * that fail to initialize data that the buffer consumer later
193 : : * accesses. On the first go around, it is undefined already.
194 : : */
195 : : VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
196 : : stream->per_buffer_data_size);
371 tmunro@postgresql.or 197 :CBC 4318544 : blocknum = stream->callback(stream,
198 : : stream->callback_private_data,
199 : : per_buffer_data);
200 : : }
201 : :
202 : 4318544 : return blocknum;
203 : : }
204 : :
205 : : /*
206 : : * In order to deal with buffer shortages and I/O limits after short reads, we
207 : : * sometimes need to defer handling of a block we've already consumed from the
208 : : * registered callback until later.
209 : : */
210 : : static inline void
521 tmunro@postgresql.or 211 :LBC (2) : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
212 : : {
213 : : /* We shouldn't ever unget more than one block. */
371 214 [ # # ]: (2) : Assert(stream->buffered_blocknum == InvalidBlockNumber);
215 [ # # ]: (2) : Assert(blocknum != InvalidBlockNumber);
216 : (2) : stream->buffered_blocknum = blocknum;
521 217 : (2) : }
218 : :
219 : : /*
220 : : * Start as much of the current pending read as we can. If we have to split it
221 : : * because of the per-backend buffer limit, or the buffer manager decides to
222 : : * split it, then the pending read is adjusted to hold the remaining portion.
223 : : *
224 : : * We can always start a read of at least size one if we have no progress yet.
225 : : * Otherwise it's possible that we can't start a read at all because of a lack
226 : : * of buffers, and then false is returned. Buffer shortages also reduce the
227 : : * distance to a level that prevents look-ahead until buffers are released.
228 : : */
229 : : static bool
175 tmunro@postgresql.or 230 :CBC 1441194 : read_stream_start_pending_read(ReadStream *stream)
231 : : {
232 : : bool need_wait;
233 : : int requested_nblocks;
234 : : int nblocks;
235 : : int flags;
236 : : int forwarded;
237 : : int16 io_index;
238 : : int16 overflow;
239 : : int16 buffer_index;
240 : : int buffer_limit;
241 : :
242 : : /* This should only be called with a pending read. */
521 243 [ - + ]: 1441194 : Assert(stream->pending_read_nblocks > 0);
177 244 [ - + ]: 1441194 : Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
245 : :
246 : : /* We had better not exceed the per-stream buffer limit with this read. */
521 247 [ - + ]: 1441194 : Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
248 : : stream->max_pinned_buffers);
249 : :
250 : : #ifdef USE_ASSERT_CHECKING
251 : : /* We had better not be overwriting an existing pinned buffer. */
252 [ + + ]: 1441194 : if (stream->pinned_buffers > 0)
253 [ - + ]: 12104 : Assert(stream->next_buffer_index != stream->oldest_buffer_index);
254 : : else
255 [ - + ]: 1429090 : Assert(stream->next_buffer_index == stream->oldest_buffer_index);
256 : :
257 : : /*
258 : : * Pinned buffers forwarded by a preceding StartReadBuffers() call that
259 : : * had to split the operation should match the leading blocks of this
260 : : * following StartReadBuffers() call.
261 : : */
28 262 [ - + ]: 1441194 : Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
263 [ + + ]: 1442536 : for (int i = 0; i < stream->forwarded_buffers; ++i)
264 [ - + ]: 1342 : Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
265 : : stream->pending_read_blocknum + i);
266 : :
267 : : /*
268 : : * Check that we've cleared the queue/overflow entries corresponding to
269 : : * the rest of the blocks covered by this read, unless it's the first go
270 : : * around and we haven't even initialized them yet.
271 : : */
272 [ + + ]: 3346438 : for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
273 [ + + - + ]: 1905244 : Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
274 : : stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
275 : : #endif
276 : :
277 : : /* Do we need to issue read-ahead advice? */
160 andres@anarazel.de 278 : 1441194 : flags = stream->read_buffers_flags;
175 tmunro@postgresql.or 279 [ + + ]: 1441194 : if (stream->advice_enabled)
280 : : {
281 [ + + ]: 1669 : if (stream->pending_read_blocknum == stream->seq_blocknum)
282 : : {
283 : : /*
284 : : * Sequential: Issue advice until the preadv() calls have caught
285 : : * up with the first advice issued for this sequential region, and
286 : : * then stay out of the way of the kernel's own read-ahead.
287 : : */
288 [ + + ]: 23 : if (stream->seq_until_processed != InvalidBlockNumber)
160 andres@anarazel.de 289 : 1 : flags |= READ_BUFFERS_ISSUE_ADVICE;
290 : : }
291 : : else
292 : : {
293 : : /*
294 : : * Random jump: Note the starting location of a new potential
295 : : * sequential region and start issuing advice. Skip it this time
296 : : * if the preadv() follows immediately, eg first block in stream.
297 : : */
175 tmunro@postgresql.or 298 : 1646 : stream->seq_until_processed = stream->pending_read_blocknum;
299 [ + + ]: 1646 : if (stream->pinned_buffers > 0)
160 andres@anarazel.de 300 : 24 : flags |= READ_BUFFERS_ISSUE_ADVICE;
301 : : }
302 : : }
303 : :
304 : : /*
305 : : * How many more buffers is this backend allowed?
306 : : *
307 : : * Forwarded buffers are already pinned and map to the leading blocks of
308 : : * the pending read (the remaining portion of an earlier short read that
309 : : * we're about to continue). They are not counted in pinned_buffers, but
310 : : * they are counted as pins already held by this backend according to the
311 : : * buffer manager, so they must be added to the limit it grants us.
312 : : */
176 tmunro@postgresql.or 313 [ + + ]: 1441194 : if (stream->temporary)
314 [ + - ]: 11906 : buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
315 : : else
316 [ + - ]: 1429288 : buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
169 317 [ - + ]: 1441194 : Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
318 : :
319 : 1441194 : buffer_limit += stream->forwarded_buffers;
152 andres@anarazel.de 320 : 1441194 : buffer_limit = Min(buffer_limit, PG_INT16_MAX);
321 : :
176 tmunro@postgresql.or 322 [ + + + - ]: 1441194 : if (buffer_limit == 0 && stream->pinned_buffers == 0)
323 : 600143 : buffer_limit = 1; /* guarantee progress */
324 : :
325 : : /* Does the per-backend limit affect this read? */
326 : 1441194 : nblocks = stream->pending_read_nblocks;
327 [ + + ]: 1441194 : if (buffer_limit < nblocks)
328 : : {
329 : : int16 new_distance;
330 : :
331 : : /* Shrink distance: no more look-ahead until buffers are released. */
332 : 2276 : new_distance = stream->pinned_buffers + buffer_limit;
333 [ + + ]: 2276 : if (stream->distance > new_distance)
334 : 1695 : stream->distance = new_distance;
335 : :
336 : : /* Unless we have nothing to give the consumer, stop here. */
337 [ + + ]: 2276 : if (stream->pinned_buffers > 0)
338 : 1001 : return false;
339 : :
340 : : /* A short read is required to make progress. */
341 : 1275 : nblocks = buffer_limit;
342 : : }
343 : :
344 : : /*
345 : : * We say how many blocks we want to read, but it may be smaller on return
346 : : * if the buffer manager decides to shorten the read. Initialize buffers
347 : : * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
348 : : * and keep the original nblocks number so we can check for forwarded
349 : : * buffers as output, below.
350 : : */
521 351 : 1440193 : buffer_index = stream->next_buffer_index;
352 : 1440193 : io_index = stream->next_io_index;
169 353 [ + + ]: 2342099 : while (stream->initialized_buffers < buffer_index + nblocks)
354 : 901906 : stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
355 : 1440193 : requested_nblocks = nblocks;
521 356 : 1440193 : need_wait = StartReadBuffers(&stream->ios[io_index].op,
357 : 1440193 : &stream->buffers[buffer_index],
358 : : stream->pending_read_blocknum,
359 : : &nblocks,
360 : : flags);
361 : 1440187 : stream->pinned_buffers += nblocks;
362 : :
363 : : /* Remember whether we need to wait before returning this buffer. */
364 [ + + ]: 1440187 : if (!need_wait)
365 : : {
366 : : /* Look-ahead distance decays, no I/O necessary. */
367 [ + + ]: 896721 : if (stream->distance > 1)
368 : 17232 : stream->distance--;
369 : : }
370 : : else
371 : : {
372 : : /*
373 : : * Remember to call WaitReadBuffers() before returning head buffer.
374 : : * Look-ahead distance will be adjusted after waiting.
375 : : */
376 : 543466 : stream->ios[io_index].buffer_index = buffer_index;
377 [ + + ]: 543466 : if (++stream->next_io_index == stream->max_ios)
378 : 22308 : stream->next_io_index = 0;
379 [ - + ]: 543466 : Assert(stream->ios_in_progress < stream->max_ios);
380 : 543466 : stream->ios_in_progress++;
381 : 543466 : stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
382 : : }
383 : :
384 : : /*
385 : : * How many pins were acquired but forwarded to the next call? These need
386 : : * to be passed to the next StartReadBuffers() call by leaving them
387 : : * exactly where they are in the queue, or released if the stream ends
388 : : * early. We need the number for accounting purposes, since they are not
389 : : * counted in stream->pinned_buffers but we already hold them.
390 : : */
169 391 : 1440187 : forwarded = 0;
392 [ + + ]: 1441530 : while (nblocks + forwarded < requested_nblocks &&
393 [ + + ]: 52443 : stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
394 : 1343 : forwarded++;
395 : 1440187 : stream->forwarded_buffers = forwarded;
396 : :
397 : : /*
398 : : * We gave a contiguous range of buffer space to StartReadBuffers(), but
399 : : * we want it to wrap around at queue_size. Copy overflowing buffers to
400 : : * the front of the array where they'll be consumed, but also leave a copy
401 : : * in the overflow zone which the I/O operation has a pointer to (it needs
402 : : * a contiguous array). Both copies will be cleared when the buffers are
403 : : * handed to the consumer.
404 : : */
405 : 1440187 : overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
521 406 [ + + ]: 1440187 : if (overflow > 0)
407 : : {
169 408 [ - + ]: 306 : Assert(overflow < stream->queue_size); /* can't overlap */
409 : 306 : memcpy(&stream->buffers[0],
410 : 306 : &stream->buffers[stream->queue_size],
411 : : sizeof(stream->buffers[0]) * overflow);
412 : : }
413 : :
414 : : /* Compute location of start of next read, without using % operator. */
521 415 : 1440187 : buffer_index += nblocks;
416 [ + + ]: 1440187 : if (buffer_index >= stream->queue_size)
417 : 278815 : buffer_index -= stream->queue_size;
418 [ + - - + ]: 1440187 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
419 : 1440187 : stream->next_buffer_index = buffer_index;
420 : :
421 : : /* Adjust the pending read to cover the remaining portion, if any. */
422 : 1440187 : stream->pending_read_blocknum += nblocks;
423 : 1440187 : stream->pending_read_nblocks -= nblocks;
424 : :
176 425 : 1440187 : return true;
426 : : }
427 : :
428 : : static void
175 429 : 2566751 : read_stream_look_ahead(ReadStream *stream)
430 : : {
431 : : /*
432 : : * Allow amortizing the cost of submitting IO over multiple IOs. This
433 : : * requires that we don't do any operations that could lead to a deadlock
434 : : * with staged-but-unsubmitted IO. The callback needs to opt-in to being
435 : : * careful.
436 : : */
160 andres@anarazel.de 437 [ + + ]: 2566751 : if (stream->batch_mode)
438 : 2483048 : pgaio_enter_batchmode();
439 : :
521 tmunro@postgresql.or 440 [ + - ]: 4170359 : while (stream->ios_in_progress < stream->max_ios &&
441 [ + + ]: 4170359 : stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
442 : : {
443 : : BlockNumber blocknum;
444 : : int16 buffer_index;
445 : : void *per_buffer_data;
446 : :
177 447 [ + + ]: 2476918 : if (stream->pending_read_nblocks == stream->io_combine_limit)
448 : : {
175 449 : 3550 : read_stream_start_pending_read(stream);
521 450 : 3550 : continue;
451 : : }
452 : :
453 : : /*
454 : : * See which block the callback wants next in the stream. We need to
455 : : * compute the index of the Nth block of the pending read including
456 : : * wrap-around, but we don't want to use the expensive % operator.
457 : : */
458 : 2473368 : buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
459 [ + + ]: 2473368 : if (buffer_index >= stream->queue_size)
460 : 1694 : buffer_index -= stream->queue_size;
461 [ + - - + ]: 2473368 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
462 : 2473368 : per_buffer_data = get_per_buffer_data(stream, buffer_index);
463 : 2473368 : blocknum = read_stream_get_block(stream, per_buffer_data);
464 [ + + ]: 2473368 : if (blocknum == InvalidBlockNumber)
465 : : {
466 : : /* End of stream. */
467 : 873310 : stream->distance = 0;
468 : 873310 : break;
469 : : }
470 : :
471 : : /* Can we merge it with the pending read? */
472 [ + + ]: 1600058 : if (stream->pending_read_nblocks > 0 &&
473 [ + + ]: 212623 : stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
474 : : {
475 : 212579 : stream->pending_read_nblocks++;
476 : 212579 : continue;
477 : : }
478 : :
479 : : /* We have to start the pending read before we can build another. */
517 480 [ + + ]: 1387532 : while (stream->pending_read_nblocks > 0)
481 : : {
175 482 [ + - ]: 53 : if (!read_stream_start_pending_read(stream) ||
176 483 [ - + ]: 53 : stream->ios_in_progress == stream->max_ios)
484 : : {
485 : : /* We've hit the buffer or I/O limit. Rewind and stop here. */
521 tmunro@postgresql.or 486 :LBC (2) : read_stream_unget_block(stream, blocknum);
160 andres@anarazel.de 487 [ # # ]: (2) : if (stream->batch_mode)
488 : (2) : pgaio_exit_batchmode();
521 tmunro@postgresql.or 489 : (2) : return;
490 : : }
491 : : }
492 : :
493 : : /* This is the start of a new pending read. */
521 tmunro@postgresql.or 494 :CBC 1387479 : stream->pending_read_blocknum = blocknum;
495 : 1387479 : stream->pending_read_nblocks = 1;
496 : : }
497 : :
498 : : /*
499 : : * We don't start the pending read just because we've hit the distance
500 : : * limit, preferring to give it another chance to grow to full
501 : : * io_combine_limit size once more buffers have been consumed. However,
502 : : * if we've already reached io_combine_limit, or we've reached the
503 : : * distance limit and there isn't anything pinned yet, or the callback has
504 : : * signaled end-of-stream, we start the read immediately. Note that the
505 : : * pending read can exceed the distance goal, if the latter was reduced
506 : : * after hitting the per-backend buffer limit.
507 : : */
508 [ + + ]: 2566751 : if (stream->pending_read_nblocks > 0 &&
177 509 [ + + ]: 1474999 : (stream->pending_read_nblocks == stream->io_combine_limit ||
176 510 [ + + ]: 1467386 : (stream->pending_read_nblocks >= stream->distance &&
521 511 [ + + ]: 1429978 : stream->pinned_buffers == 0) ||
512 [ + + ]: 43164 : stream->distance == 0) &&
513 [ + - ]: 1437591 : stream->ios_in_progress < stream->max_ios)
175 514 : 1437591 : read_stream_start_pending_read(stream);
515 : :
516 : : /*
517 : : * There should always be something pinned when we leave this function,
518 : : * whether started by this call or not, unless we've hit the end of the
519 : : * stream. In the worst case we can always make progress one buffer at a
520 : : * time.
521 : : */
176 522 [ + + - + ]: 2566745 : Assert(stream->pinned_buffers > 0 || stream->distance == 0);
523 : :
160 andres@anarazel.de 524 [ + + ]: 2566745 : if (stream->batch_mode)
525 : 2483042 : pgaio_exit_batchmode();
526 : : }
527 : :
528 : : /*
529 : : * Create a new read stream object that can be used to perform the equivalent
530 : : * of a series of ReadBuffer() calls for one fork of one relation.
531 : : * Internally, it generates larger vectored reads where possible by looking
532 : : * ahead. The callback should return block numbers or InvalidBlockNumber to
533 : : * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
534 : : * write extra data for each block into the space provided to it. It will
535 : : * also receive callback_private_data for its own purposes.
536 : : */
537 : : static ReadStream *
413 noah@leadboat.com 538 : 437347 : read_stream_begin_impl(int flags,
539 : : BufferAccessStrategy strategy,
540 : : Relation rel,
541 : : SMgrRelation smgr,
542 : : char persistence,
543 : : ForkNumber forknum,
544 : : ReadStreamBlockNumberCB callback,
545 : : void *callback_private_data,
546 : : size_t per_buffer_data_size)
547 : : {
548 : : ReadStream *stream;
549 : : size_t size;
550 : : int16 queue_size;
551 : : int16 queue_overflow;
552 : : int max_ios;
553 : : int strategy_pin_limit;
554 : : uint32 max_pinned_buffers;
555 : : uint32 max_possible_buffer_limit;
556 : : Oid tablespace_id;
557 : :
558 : : /*
559 : : * Decide how many I/Os we will allow to run at the same time. That
560 : : * currently means advice to the kernel to tell it that we will soon read.
561 : : * This number also affects how far we look ahead for opportunities to
562 : : * start more I/Os.
563 : : */
521 tmunro@postgresql.or 564 : 437347 : tablespace_id = smgr->smgr_rlocator.locator.spcOid;
565 [ + + + + ]: 437347 : if (!OidIsValid(MyDatabaseId) ||
413 noah@leadboat.com 566 [ + + + + ]: 511819 : (rel && IsCatalogRelation(rel)) ||
521 tmunro@postgresql.or 567 : 141025 : IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
568 : : {
569 : : /*
570 : : * Avoid circularity while trying to look up tablespace settings or
571 : : * before spccache.c is ready.
572 : : */
573 : 351169 : max_ios = effective_io_concurrency;
574 : : }
575 [ + + ]: 86178 : else if (flags & READ_STREAM_MAINTENANCE)
576 : 6251 : max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
577 : : else
578 : 79927 : max_ios = get_tablespace_io_concurrency(tablespace_id);
579 : :
580 : : /* Cap to INT16_MAX to avoid overflowing below */
581 : 437347 : max_ios = Min(max_ios, PG_INT16_MAX);
582 : :
583 : : /*
584 : : * If starting a multi-block I/O near the end of the queue, we might
585 : : * temporarily need extra space for overflowing buffers before they are
586 : : * moved to regular circular position. This is the maximum extra space we
587 : : * could need.
588 : : */
177 589 : 437347 : queue_overflow = io_combine_limit - 1;
590 : :
591 : : /*
592 : : * Choose the maximum number of buffers we're prepared to pin. We try to
593 : : * pin fewer if we can, though. We add one so that we can make progress
594 : : * even if max_ios is set to 0 (see also further down). For max_ios > 0,
595 : : * this also allows an extra full I/O's worth of buffers: after an I/O
596 : : * finishes we don't want to have to wait for its buffers to be consumed
597 : : * before starting a new one.
598 : : *
599 : : * Be careful not to allow int16 to overflow. That is possible with the
600 : : * current GUC range limits, so this is an artificial limit of ~32k
601 : : * buffers and we'd need to adjust the types to exceed that. We also have
602 : : * to allow for the spare entry and the overflow space.
603 : : */
191 604 : 437347 : max_pinned_buffers = (max_ios + 1) * io_combine_limit;
521 605 : 437347 : max_pinned_buffers = Min(max_pinned_buffers,
606 : : PG_INT16_MAX - queue_overflow - 1);
607 : :
608 : : /* Give the strategy a chance to limit the number of buffers we pin. */
518 609 : 437347 : strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
610 : 437347 : max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
611 : :
612 : : /*
613 : : * Also limit our queue to the maximum number of pins we could ever be
614 : : * allowed to acquire according to the buffer manager. We may not really
615 : : * be able to use them all due to other pins held by this backend, but
616 : : * we'll check that later in read_stream_start_pending_read().
617 : : */
521 618 [ + + ]: 437347 : if (SmgrIsTemp(smgr))
176 619 : 6789 : max_possible_buffer_limit = GetLocalPinLimit();
620 : : else
621 : 430558 : max_possible_buffer_limit = GetPinLimit();
622 : 437347 : max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
623 : :
624 : : /*
625 : : * The limit might be zero on a system configured with too few buffers for
626 : : * the number of connections. We need at least one to make progress.
627 : : */
628 [ + + ]: 437347 : max_pinned_buffers = Max(1, max_pinned_buffers);
629 : :
630 : : /*
631 : : * We need one extra entry for buffers and per-buffer data, because users
632 : : * of per-buffer data have access to the object until the next call to
633 : : * read_stream_next_buffer(), so we need a gap between the head and tail
634 : : * of the queue so that we don't clobber it.
635 : : */
521 636 : 437347 : queue_size = max_pinned_buffers + 1;
637 : :
638 : : /*
639 : : * Allocate the object, the buffers, the ios and per_buffer_data space in
640 : : * one big chunk. Though we have queue_size buffers, we want to be able
641 : : * to assume that all the buffers for a single read are contiguous (i.e.
642 : : * don't wrap around halfway through), so we allow temporary overflows of
643 : : * up to the maximum possible overflow size.
644 : : */
645 : 437347 : size = offsetof(ReadStream, buffers);
177 646 : 437347 : size += sizeof(Buffer) * (queue_size + queue_overflow);
521 647 [ + + ]: 437347 : size += sizeof(InProgressIO) * Max(1, max_ios);
648 : 437347 : size += per_buffer_data_size * queue_size;
649 : 437347 : size += MAXIMUM_ALIGNOF * 2;
650 : 437347 : stream = (ReadStream *) palloc(size);
651 : 437347 : memset(stream, 0, offsetof(ReadStream, buffers));
652 : 437347 : stream->ios = (InProgressIO *)
177 653 : 437347 : MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
521 654 [ + + ]: 437347 : if (per_buffer_data_size > 0)
655 : 21694 : stream->per_buffer_data = (void *)
656 [ + + ]: 21694 : MAXALIGN(&stream->ios[Max(1, max_ios)]);
657 : :
160 andres@anarazel.de 658 : 437347 : stream->sync_mode = io_method == IOMETHOD_SYNC;
659 : 437347 : stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
660 : :
661 : : #ifdef USE_PREFETCH
662 : :
663 : : /*
664 : : * Read-ahead advice simulating asynchronous I/O with synchronous calls.
665 : : * Issue advice only if AIO is not used, direct I/O isn't enabled, the
666 : : * caller hasn't promised sequential access (overriding our detection
667 : : * heuristics), and max_ios hasn't been set to zero.
668 : : */
669 [ + + ]: 437347 : if (stream->sync_mode &&
670 [ + - ]: 2878 : (io_direct_flags & IO_DIRECT_DATA) == 0 &&
521 tmunro@postgresql.or 671 [ + + + - ]: 2878 : (flags & READ_STREAM_SEQUENTIAL) == 0 &&
672 : : max_ios > 0)
673 : 692 : stream->advice_enabled = true;
674 : : #endif
675 : :
676 : : /*
677 : : * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
678 : : * we still need to allocate space to combine and run one I/O. Bump it up
679 : : * to one, and remember to ask for synchronous I/O only.
680 : : */
681 [ + + ]: 437347 : if (max_ios == 0)
682 : : {
683 : 7 : max_ios = 1;
160 andres@anarazel.de 684 : 7 : stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
685 : : }
686 : :
687 : : /*
688 : : * Capture stable values for these two GUC-derived numbers for the
689 : : * lifetime of this stream, so we don't have to worry about the GUCs
690 : : * changing underneath us beyond this point.
691 : : */
521 tmunro@postgresql.or 692 : 437347 : stream->max_ios = max_ios;
177 693 : 437347 : stream->io_combine_limit = io_combine_limit;
694 : :
521 695 : 437347 : stream->per_buffer_data_size = per_buffer_data_size;
696 : 437347 : stream->max_pinned_buffers = max_pinned_buffers;
697 : 437347 : stream->queue_size = queue_size;
698 : 437347 : stream->callback = callback;
699 : 437347 : stream->callback_private_data = callback_private_data;
371 700 : 437347 : stream->buffered_blocknum = InvalidBlockNumber;
175 701 : 437347 : stream->seq_blocknum = InvalidBlockNumber;
702 : 437347 : stream->seq_until_processed = InvalidBlockNumber;
176 703 : 437347 : stream->temporary = SmgrIsTemp(smgr);
704 : :
705 : : /*
706 : : * Skip the initial ramp-up phase if the caller says we're going to be
707 : : * reading the whole relation. This way we start out assuming we'll be
708 : : * doing full io_combine_limit sized reads.
709 : : */
521 710 [ + + ]: 437347 : if (flags & READ_STREAM_FULL)
177 711 : 64494 : stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
712 : : else
521 713 : 372853 : stream->distance = 1;
714 : :
715 : : /*
716 : : * Since we always access the same relation, we can initialize parts of
717 : : * the ReadBuffersOperation objects and leave them that way, to avoid
718 : : * wasting CPU cycles writing to them for each read.
719 : : */
720 [ + + ]: 7456316 : for (int i = 0; i < max_ios; ++i)
721 : : {
722 : 7018969 : stream->ios[i].op.rel = rel;
413 noah@leadboat.com 723 : 7018969 : stream->ios[i].op.smgr = smgr;
724 : 7018969 : stream->ios[i].op.persistence = persistence;
521 tmunro@postgresql.or 725 : 7018969 : stream->ios[i].op.forknum = forknum;
726 : 7018969 : stream->ios[i].op.strategy = strategy;
727 : : }
728 : :
729 : 437347 : return stream;
730 : : }
731 : :
732 : : /*
733 : : * Create a new read stream for reading a relation.
734 : : * See read_stream_begin_impl() for the detailed explanation.
735 : : */
736 : : ReadStream *
413 noah@leadboat.com 737 : 378525 : read_stream_begin_relation(int flags,
738 : : BufferAccessStrategy strategy,
739 : : Relation rel,
740 : : ForkNumber forknum,
741 : : ReadStreamBlockNumberCB callback,
742 : : void *callback_private_data,
743 : : size_t per_buffer_data_size)
744 : : {
745 : 378525 : return read_stream_begin_impl(flags,
746 : : strategy,
747 : : rel,
748 : : RelationGetSmgr(rel),
749 : 378525 : rel->rd_rel->relpersistence,
750 : : forknum,
751 : : callback,
752 : : callback_private_data,
753 : : per_buffer_data_size);
754 : : }
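
/*
 * Illustrative sketch only, with hypothetical names throughout: a
 * caller-defined callback that streams a data-driven list of block numbers
 * and attaches a small per-buffer payload for the consumer.  Note that the
 * per-buffer pointer handed out by read_stream_next_buffer() is only valid
 * until the following call, so the consumer copies what it needs first.
 */
typedef struct ExampleListState
{
	BlockNumber *blocks;		/* caller-supplied block numbers */
	int			nblocks;
	int			next;
} ExampleListState;

static BlockNumber
example_list_read_stream_cb(ReadStream *stream,
							void *callback_private_data,
							void *per_buffer_data)
{
	ExampleListState *state = callback_private_data;

	if (state->next >= state->nblocks)
		return InvalidBlockNumber;	/* end of stream */

	/* Stash the list position for the consumer of this buffer. */
	*(int *) per_buffer_data = state->next;

	return state->blocks[state->next++];
}

static void
example_stream_block_list(Relation rel, BlockNumber *blocks, int nblocks)
{
	ExampleListState state = {blocks, nblocks, 0};
	ReadStream *stream;
	void	   *per_buffer_data;
	Buffer		buf;

	stream = read_stream_begin_relation(0, /* default flags */
										NULL,
										rel,
										MAIN_FORKNUM,
										example_list_read_stream_cb,
										&state,
										sizeof(int));

	while ((buf = read_stream_next_buffer(stream, &per_buffer_data)) != InvalidBuffer)
	{
		int			position = *(int *) per_buffer_data;

		/* ... use buf and position; copy per-buffer data before the next call ... */
		ReleaseBuffer(buf);
	}

	read_stream_end(stream);
}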
755 : :
756 : : /*
757 : : * Create a new read stream for reading a SMgr relation.
758 : : * See read_stream_begin_impl() for the detailed explanation.
759 : : */
760 : : ReadStream *
761 : 58822 : read_stream_begin_smgr_relation(int flags,
762 : : BufferAccessStrategy strategy,
763 : : SMgrRelation smgr,
764 : : char smgr_persistence,
765 : : ForkNumber forknum,
766 : : ReadStreamBlockNumberCB callback,
767 : : void *callback_private_data,
768 : : size_t per_buffer_data_size)
769 : : {
770 : 58822 : return read_stream_begin_impl(flags,
771 : : strategy,
772 : : NULL,
773 : : smgr,
774 : : smgr_persistence,
775 : : forknum,
776 : : callback,
777 : : callback_private_data,
778 : : per_buffer_data_size);
779 : : }
780 : :
781 : : /*
782 : : * Pull one pinned buffer out of a stream. Each call returns successive
783 : : * blocks in the order specified by the callback. If per_buffer_data_size was
784 : : * set to a non-zero size, *per_buffer_data receives a pointer to the extra
785 : : * per-buffer data that the callback had a chance to populate, which remains
786 : : * valid until the next call to read_stream_next_buffer(). When the stream
787 : : * runs out of data, InvalidBuffer is returned. The caller may decide to end
788 : : * the stream early at any time by calling read_stream_end().
789 : : */
790 : : Buffer
521 tmunro@postgresql.or 791 : 5306077 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
792 : : {
793 : : Buffer buffer;
794 : : int16 oldest_buffer_index;
795 : :
796 : : #ifndef READ_STREAM_DISABLE_FAST_PATH
797 : :
798 : : /*
799 : : * A fast path for all-cached scans. This is the same as the usual
800 : : * algorithm, but it is specialized for no I/O and no per-buffer data, so
801 : : * we can skip the queue management code, stay in the same buffer slot and
802 : : * use singular StartReadBuffer().
803 : : */
804 [ + + ]: 5306077 : if (likely(stream->fast_path))
805 : : {
806 : : BlockNumber next_blocknum;
807 : :
808 : : /* Fast path assumptions. */
809 [ - + ]: 1845176 : Assert(stream->ios_in_progress == 0);
169 810 [ - + ]: 1845176 : Assert(stream->forwarded_buffers == 0);
521 811 [ - + ]: 1845176 : Assert(stream->pinned_buffers == 1);
812 [ - + ]: 1845176 : Assert(stream->distance == 1);
518 813 [ - + ]: 1845176 : Assert(stream->pending_read_nblocks == 0);
521 814 [ - + ]: 1845176 : Assert(stream->per_buffer_data_size == 0);
169 815 [ - + ]: 1845176 : Assert(stream->initialized_buffers > stream->oldest_buffer_index);
816 : :
817 : : /* We're going to return the buffer we pinned last time. */
521 818 : 1845176 : oldest_buffer_index = stream->oldest_buffer_index;
819 [ - + ]: 1845176 : Assert((oldest_buffer_index + 1) % stream->queue_size ==
820 : : stream->next_buffer_index);
821 : 1845176 : buffer = stream->buffers[oldest_buffer_index];
822 [ - + ]: 1845176 : Assert(buffer != InvalidBuffer);
823 : :
824 : : /* Choose the next block to pin. */
371 825 : 1845176 : next_blocknum = read_stream_get_block(stream, NULL);
826 : :
518 827 [ + + ]: 1845176 : if (likely(next_blocknum != InvalidBlockNumber))
828 : : {
160 andres@anarazel.de 829 : 1764749 : int flags = stream->read_buffers_flags;
830 : :
831 [ + + ]: 1764749 : if (stream->advice_enabled)
832 : 554 : flags |= READ_BUFFERS_ISSUE_ADVICE;
833 : :
834 : : /*
835 : : * Pin a buffer for the next call. Same buffer entry, and
836 : : * arbitrary I/O entry (they're all free). We don't have to
837 : : * adjust pinned_buffers because we're transferring one to caller
838 : : * but pinning one more.
839 : : *
840 : : * In the fast path we don't need to check the pin limit. We're
841 : : * always allowed at least one pin so that progress can be made,
842 : : * and that's all we need here. Although two pins are momentarily
843 : : * held at the same time, the model used here is that the stream
844 : : * holds only one, and the other now belongs to the caller.
845 : : */
518 tmunro@postgresql.or 846 [ + + ]: 1764749 : if (likely(!StartReadBuffer(&stream->ios[0].op,
847 : : &stream->buffers[oldest_buffer_index],
848 : : next_blocknum,
849 : : flags)))
850 : : {
851 : : /* Fast return. */
852 : 1751047 : return buffer;
853 : : }
854 : :
855 : : /* Next call must wait for I/O for the newly pinned buffer. */
521 856 : 13702 : stream->oldest_io_index = 0;
857 : 13702 : stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
858 : 13702 : stream->ios_in_progress = 1;
859 : 13702 : stream->ios[0].buffer_index = oldest_buffer_index;
860 : 13702 : stream->seq_blocknum = next_blocknum + 1;
861 : : }
862 : : else
863 : : {
864 : : /* No more blocks, end of stream. */
518 865 : 80427 : stream->distance = 0;
866 : 80427 : stream->oldest_buffer_index = stream->next_buffer_index;
867 : 80427 : stream->pinned_buffers = 0;
169 868 : 80427 : stream->buffers[oldest_buffer_index] = InvalidBuffer;
869 : : }
870 : :
518 871 : 94129 : stream->fast_path = false;
521 872 : 94129 : return buffer;
873 : : }
874 : : #endif
875 : :
876 [ + + ]: 3460901 : if (unlikely(stream->pinned_buffers == 0))
877 : : {
878 [ - + ]: 2509805 : Assert(stream->oldest_buffer_index == stream->next_buffer_index);
879 : :
880 : : /* End of stream reached? */
881 [ + + ]: 2509805 : if (stream->distance == 0)
882 : 1461752 : return InvalidBuffer;
883 : :
884 : : /*
885 : : * The usual order of operations is that we look ahead at the bottom
886 : : * of this function after potentially finishing an I/O and making
887 : : * space for more, but if we're just starting up we'll need to crank
888 : : * the handle to get started.
889 : : */
175 890 : 1048053 : read_stream_look_ahead(stream);
891 : :
892 : : /* End of stream reached? */
521 893 [ + + ]: 1048053 : if (stream->pinned_buffers == 0)
894 : : {
895 [ - + ]: 480432 : Assert(stream->distance == 0);
896 : 480432 : return InvalidBuffer;
897 : : }
898 : : }
899 : :
900 : : /* Grab the oldest pinned buffer and associated per-buffer data. */
901 [ - + ]: 1518717 : Assert(stream->pinned_buffers > 0);
902 : 1518717 : oldest_buffer_index = stream->oldest_buffer_index;
903 [ + - - + ]: 1518717 : Assert(oldest_buffer_index >= 0 &&
904 : : oldest_buffer_index < stream->queue_size);
905 : 1518717 : buffer = stream->buffers[oldest_buffer_index];
906 [ + + ]: 1518717 : if (per_buffer_data)
907 : 283785 : *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
908 : :
909 [ - + ]: 1518717 : Assert(BufferIsValid(buffer));
910 : :
911 : : /* Do we have to wait for an associated I/O first? */
912 [ + + ]: 1518717 : if (stream->ios_in_progress > 0 &&
913 [ + + ]: 639156 : stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
914 : : {
915 : 556869 : int16 io_index = stream->oldest_io_index;
916 : : int32 distance; /* wider temporary value, clamped below */
917 : :
918 : : /* Sanity check that we still agree on the buffers. */
919 [ - + ]: 556869 : Assert(stream->ios[io_index].op.buffers ==
920 : : &stream->buffers[oldest_buffer_index]);
921 : :
922 : 556869 : WaitReadBuffers(&stream->ios[io_index].op);
923 : :
924 [ - + ]: 556850 : Assert(stream->ios_in_progress > 0);
925 : 556850 : stream->ios_in_progress--;
926 [ + + ]: 556850 : if (++stream->oldest_io_index == stream->max_ios)
927 : 22308 : stream->oldest_io_index = 0;
928 : :
929 : : /* Look-ahead distance ramps up rapidly after we do I/O. */
174 930 : 556850 : distance = stream->distance * 2;
931 : 556850 : distance = Min(distance, stream->max_pinned_buffers);
932 : 556850 : stream->distance = distance;
933 : :
934 : : /*
935 : : * If we've reached the first block of a sequential region we're
936 : : * issuing advice for, cancel that until the next jump. The kernel
937 : : * will see the sequential preadv() pattern starting here.
938 : : */
175 939 [ + + ]: 556850 : if (stream->advice_enabled &&
940 [ + + ]: 273 : stream->ios[io_index].op.blocknum == stream->seq_until_processed)
941 : 251 : stream->seq_until_processed = InvalidBlockNumber;
942 : : }
943 : :
944 : : /*
945 : : * We must zap this queue entry, or else it would appear as a forwarded
946 : : * buffer. If it's potentially in the overflow zone (ie from a
947 : : * multi-block I/O that wrapped around the queue), also zap the copy.
948 : : */
521 949 : 1518698 : stream->buffers[oldest_buffer_index] = InvalidBuffer;
169 950 [ + + ]: 1518698 : if (oldest_buffer_index < stream->io_combine_limit - 1)
951 : 1174314 : stream->buffers[stream->queue_size + oldest_buffer_index] =
952 : : InvalidBuffer;
953 : :
954 : : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
955 : :
956 : : /*
957 : : * The caller will get access to the per-buffer data, until the next call.
958 : : * We wipe the one before, which is never occupied because queue_size
959 : : * allowed one extra element. This will hopefully trip up client code
960 : : * that is holding a dangling pointer to it.
961 : : */
521 962 [ + + ]: 1518698 : if (stream->per_buffer_data)
963 : : {
964 : : void *per_buffer_data;
965 : :
203 966 [ + + ]: 567598 : per_buffer_data = get_per_buffer_data(stream,
967 : : oldest_buffer_index == 0 ?
968 : 71462 : stream->queue_size - 1 :
969 : 212337 : oldest_buffer_index - 1);
970 : :
971 : : #if defined(CLOBBER_FREED_MEMORY)
972 : : /* This also tells Valgrind the memory is "noaccess". */
973 : 283799 : wipe_mem(per_buffer_data, stream->per_buffer_data_size);
974 : : #elif defined(USE_VALGRIND)
975 : : /* Tell it ourselves. */
976 : : VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
977 : : stream->per_buffer_data_size);
978 : : #endif
979 : : }
980 : : #endif
981 : :
982 : : /* Pin transferred to caller. */
521 983 [ - + ]: 1518698 : Assert(stream->pinned_buffers > 0);
984 : 1518698 : stream->pinned_buffers--;
985 : :
986 : : /* Advance oldest buffer, with wrap-around. */
987 : 1518698 : stream->oldest_buffer_index++;
988 [ + + ]: 1518698 : if (stream->oldest_buffer_index == stream->queue_size)
989 : 271751 : stream->oldest_buffer_index = 0;
990 : :
991 : : /* Prepare for the next call. */
175 992 : 1518698 : read_stream_look_ahead(stream);
993 : :
994 : : #ifndef READ_STREAM_DISABLE_FAST_PATH
995 : : /* See if we can take the fast path for all-cached scans next time. */
521 996 [ + + ]: 1518692 : if (stream->ios_in_progress == 0 &&
169 997 [ + + ]: 962000 : stream->forwarded_buffers == 0 &&
521 998 [ + + ]: 961485 : stream->pinned_buffers == 1 &&
999 [ + + ]: 418462 : stream->distance == 1 &&
518 1000 [ + + ]: 350227 : stream->pending_read_nblocks == 0 &&
521 1001 [ + + ]: 348828 : stream->per_buffer_data_size == 0)
1002 : : {
1003 : : /*
1004 : : * The fast path spins on one buffer entry repeatedly instead of
1005 : : * rotating through the whole queue and clearing the entries behind
1006 : : * it. If the buffer it starts with happened to be forwarded between
1007 : : * StartReadBuffers() calls and also wrapped around the circular queue
1008 : : * partway through, then a copy also exists in the overflow zone, and
1009 : : * it won't clear it out as the regular path would. Do that now, so
1010 : : * it doesn't need code for that.
1011 : : */
28 1012 [ + + ]: 182315 : if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
1013 : 180908 : stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
1014 : : InvalidBuffer;
1015 : :
521 1016 : 182315 : stream->fast_path = true;
1017 : : }
1018 : : #endif
1019 : :
1020 : 1518692 : return buffer;
1021 : : }
1022 : :
1023 : : /*
1024 : : * Transitional support for code that would like to perform or skip reads
1025 : : * itself, without using the stream. Returns, and consumes, the next block
1026 : : * number that would be read by the stream's look-ahead algorithm, or
1027 : : * InvalidBlockNumber if the end of the stream is reached. Also reports the
1028 : : * strategy that would be used to read it.
1029 : : */
1030 : : BlockNumber
353 tmunro@postgresql.or 1031 :UBC 0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
1032 : : {
1033 : 0 : *strategy = stream->ios[0].op.strategy;
1034 : 0 : return read_stream_get_block(stream, NULL);
1035 : : }
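
/*
 * Illustrative sketch of the transitional pattern described above, using
 * hypothetical names ("example_can_skip_block" stands in for whatever test
 * the caller applies): the caller consumes block numbers from the stream's
 * callback but performs or skips the reads itself.
 */
static void
example_read_or_skip(Relation rel, ReadStream *stream)
{
	BufferAccessStrategy strategy;
	BlockNumber blkno;

	while ((blkno = read_stream_next_block(stream, &strategy)) != InvalidBlockNumber)
	{
		Buffer		buf;

		if (example_can_skip_block(blkno))	/* hypothetical filter */
			continue;

		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, strategy);
		/* ... use buf ... */
		ReleaseBuffer(buf);
	}
}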
1036 : :
1037 : : /*
1038 : : * Reset a read stream by releasing any queued up buffers, allowing the stream
1039 : : * to be used again for different blocks. This can be used to clear an
1040 : : * end-of-stream condition and start again, or to throw away blocks that were
1041 : : * speculatively read and read some different blocks instead.
1042 : : */
1043 : : void
521 tmunro@postgresql.or 1044 :CBC 1048012 : read_stream_reset(ReadStream *stream)
1045 : : {
1046 : : int16 index;
1047 : : Buffer buffer;
1048 : :
1049 : : /* Stop looking ahead. */
1050 : 1048012 : stream->distance = 0;
1051 : :
1052 : : /* Forget buffered block number and fast path state. */
371 1053 : 1048012 : stream->buffered_blocknum = InvalidBlockNumber;
518 1054 : 1048012 : stream->fast_path = false;
1055 : :
1056 : : /* Unpin anything that wasn't consumed. */
521 1057 [ + + ]: 1161859 : while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
1058 : 113847 : ReleaseBuffer(buffer);
1059 : :
1060 : : /* Unpin any unused forwarded buffers. */
169 1061 : 1048012 : index = stream->next_buffer_index;
1062 [ + + ]: 1048012 : while (index < stream->initialized_buffers &&
1063 [ - + ]: 227709 : (buffer = stream->buffers[index]) != InvalidBuffer)
1064 : : {
169 tmunro@postgresql.or 1065 [ # # ]:UBC 0 : Assert(stream->forwarded_buffers > 0);
1066 : 0 : stream->forwarded_buffers--;
1067 : 0 : ReleaseBuffer(buffer);
1068 : :
1069 : 0 : stream->buffers[index] = InvalidBuffer;
1070 [ # # ]: 0 : if (index < stream->io_combine_limit - 1)
1071 : 0 : stream->buffers[stream->queue_size + index] = InvalidBuffer;
1072 : :
1073 [ # # ]: 0 : if (++index == stream->queue_size)
1074 : 0 : index = 0;
1075 : : }
1076 : :
169 tmunro@postgresql.or 1077 [ - + ]:CBC 1048012 : Assert(stream->forwarded_buffers == 0);
521 1078 [ - + ]: 1048012 : Assert(stream->pinned_buffers == 0);
1079 [ - + ]: 1048012 : Assert(stream->ios_in_progress == 0);
1080 : :
1081 : : /* Start off assuming data is cached. */
1082 : 1048012 : stream->distance = 1;
1083 : 1048012 : }
1084 : :
1085 : : /*
1086 : : * Release and free a read stream.
1087 : : */
1088 : : void
1089 : 435069 : read_stream_end(ReadStream *stream)
1090 : : {
1091 : 435069 : read_stream_reset(stream);
1092 : 435069 : pfree(stream);
1093 : 435069 : }
|