Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * xlogprefetcher.c
4 : : * Prefetching support for recovery.
5 : : *
6 : : * Portions Copyright (c) 2022-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/access/transam/xlogprefetcher.c
12 : : *
13 : : * This module provides a drop-in replacement for an XLogReader that tries to
14 : : * minimize I/O stalls by looking ahead in the WAL. If blocks that will be
15 : : * accessed in the near future are not already in the buffer pool, it initiates
16 : : * I/Os that might complete before the caller eventually needs the data. When
17 : : * referenced blocks are found in the buffer pool already, the buffer is
18 : : * recorded in the decoded record so that XLogReadBufferForRedo() can try to
19 : : * avoid a second buffer mapping table lookup.
20 : : *
21 : : * Currently, only the main fork is considered for prefetching. Currently,
22 : : * prefetching is only effective on systems where PrefetchBuffer() does
23 : : * something useful (mainly Linux).
24 : : *
25 : : *-------------------------------------------------------------------------
26 : : */
27 : :
28 : : #include "postgres.h"
29 : :
30 : : #include "access/xlogprefetcher.h"
31 : : #include "access/xlogreader.h"
32 : : #include "catalog/pg_control.h"
33 : : #include "catalog/storage_xlog.h"
34 : : #include "commands/dbcommands_xlog.h"
35 : : #include "funcapi.h"
36 : : #include "miscadmin.h"
37 : : #include "port/atomics.h"
38 : : #include "storage/bufmgr.h"
39 : : #include "storage/shmem.h"
40 : : #include "storage/smgr.h"
41 : : #include "utils/fmgrprotos.h"
42 : : #include "utils/guc_hooks.h"
43 : : #include "utils/hsearch.h"
44 : : #include "utils/timestamp.h"
45 : :
46 : : /*
47 : : * Every time we process this much WAL, we'll update the values in
48 : : * pg_stat_recovery_prefetch.
49 : : */
50 : : #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
51 : :
52 : : /*
53 : : * To detect repeated access to the same block and skip useless extra system
54 : : * calls, we remember a small window of recently prefetched blocks.
55 : : */
56 : : #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
57 : :
58 : : /*
59 : : * When maintenance_io_concurrency is not saturated, we're prepared to look
60 : : * ahead up to N times that number of block references.
61 : : */
62 : : #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
63 : :
64 : : /* Define to log internal debugging messages. */
65 : : /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
66 : :
67 : : /* GUCs */
68 : : int recovery_prefetch = RECOVERY_PREFETCH_TRY;
69 : :
70 : : #ifdef USE_PREFETCH
71 : : #define RecoveryPrefetchEnabled() \
72 : : (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
73 : : maintenance_io_concurrency > 0)
74 : : #else
75 : : #define RecoveryPrefetchEnabled() false
76 : : #endif
77 : :
78 : : static int XLogPrefetchReconfigureCount = 0;
79 : :
80 : : /*
81 : : * Enum used to report whether an IO should be started.
82 : : */
83 : : typedef enum
84 : : {
85 : : LRQ_NEXT_NO_IO,
86 : : LRQ_NEXT_IO,
87 : : LRQ_NEXT_AGAIN,
88 : : } LsnReadQueueNextStatus;
89 : :
90 : : /*
91 : : * Type of callback that can decide which block to prefetch next. For now
92 : : * there is only one.
93 : : */
94 : : typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
95 : : XLogRecPtr *lsn);
96 : :
97 : : /*
98 : : * A simple circular queue of LSNs, using to control the number of
99 : : * (potentially) inflight IOs. This stands in for a later more general IO
100 : : * control mechanism, which is why it has the apparently unnecessary
101 : : * indirection through a function pointer.
102 : : */
103 : : typedef struct LsnReadQueue
104 : : {
105 : : LsnReadQueueNextFun next;
106 : : uintptr_t lrq_private;
107 : : uint32 max_inflight;
108 : : uint32 inflight;
109 : : uint32 completed;
110 : : uint32 head;
111 : : uint32 tail;
112 : : uint32 size;
113 : : struct
114 : : {
115 : : bool io;
116 : : XLogRecPtr lsn;
117 : : } queue[FLEXIBLE_ARRAY_MEMBER];
118 : : } LsnReadQueue;
119 : :
120 : : /*
121 : : * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching
122 : : * blocks that will be soon be referenced, to try to avoid IO stalls.
123 : : */
124 : : struct XLogPrefetcher
125 : : {
126 : : /* WAL reader and current reading state. */
127 : : XLogReaderState *reader;
128 : : DecodedXLogRecord *record;
129 : : int next_block_id;
130 : :
131 : : /* When to publish stats. */
132 : : XLogRecPtr next_stats_shm_lsn;
133 : :
134 : : /* Book-keeping to avoid accessing blocks that don't exist yet. */
135 : : HTAB *filter_table;
136 : : dlist_head filter_queue;
137 : :
138 : : /* Book-keeping to avoid repeat prefetches. */
139 : : RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
140 : : BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
141 : : int recent_idx;
142 : :
143 : : /* Book-keeping to disable prefetching temporarily. */
144 : : XLogRecPtr no_readahead_until;
145 : :
146 : : /* IO depth manager. */
147 : : LsnReadQueue *streaming_read;
148 : :
149 : : XLogRecPtr begin_ptr;
150 : :
151 : : int reconfigure_count;
152 : : };
153 : :
154 : : /*
155 : : * A temporary filter used to track block ranges that haven't been created
156 : : * yet, whole relations that haven't been created yet, and whole relations
157 : : * that (we assume) have already been dropped, or will be created by bulk WAL
158 : : * operators.
159 : : */
160 : : typedef struct XLogPrefetcherFilter
161 : : {
162 : : RelFileLocator rlocator;
163 : : XLogRecPtr filter_until_replayed;
164 : : BlockNumber filter_from_block;
165 : : dlist_node link;
166 : : } XLogPrefetcherFilter;
167 : :
168 : : /*
169 : : * Counters exposed in shared memory for pg_stat_recovery_prefetch.
170 : : */
171 : : typedef struct XLogPrefetchStats
172 : : {
173 : : pg_atomic_uint64 reset_time; /* Time of last reset. */
174 : : pg_atomic_uint64 prefetch; /* Prefetches initiated. */
175 : : pg_atomic_uint64 hit; /* Blocks already in cache. */
176 : : pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
177 : : pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
178 : : pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
179 : : pg_atomic_uint64 skip_rep; /* Repeat accesses skipped. */
180 : :
181 : : /* Dynamic values */
182 : : int wal_distance; /* Number of WAL bytes ahead. */
183 : : int block_distance; /* Number of block references ahead. */
184 : : int io_depth; /* Number of I/Os in progress. */
185 : : } XLogPrefetchStats;
186 : :
187 : : static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
188 : : RelFileLocator rlocator,
189 : : BlockNumber blockno,
190 : : XLogRecPtr lsn);
191 : : static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
192 : : RelFileLocator rlocator,
193 : : BlockNumber blockno);
194 : : static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
195 : : XLogRecPtr replaying_lsn);
196 : : static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
197 : : XLogRecPtr *lsn);
198 : :
199 : : static XLogPrefetchStats *SharedStats;
200 : :
201 : : static inline LsnReadQueue *
1248 tmunro@postgresql.or 202 :CBC 1902 : lrq_alloc(uint32 max_distance,
203 : : uint32 max_inflight,
204 : : uintptr_t lrq_private,
205 : : LsnReadQueueNextFun next)
206 : : {
207 : : LsnReadQueue *lrq;
208 : : uint32 size;
209 : :
210 [ - + ]: 1902 : Assert(max_distance >= max_inflight);
211 : :
212 : 1902 : size = max_distance + 1; /* full ring buffer has a gap */
213 : 1902 : lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
214 : 1902 : lrq->lrq_private = lrq_private;
215 : 1902 : lrq->max_inflight = max_inflight;
216 : 1902 : lrq->size = size;
217 : 1902 : lrq->next = next;
218 : 1902 : lrq->head = 0;
219 : 1902 : lrq->tail = 0;
220 : 1902 : lrq->inflight = 0;
221 : 1902 : lrq->completed = 0;
222 : :
223 : 1902 : return lrq;
224 : : }
225 : :
226 : : static inline void
227 : 1847 : lrq_free(LsnReadQueue *lrq)
228 : : {
229 : 1847 : pfree(lrq);
230 : 1847 : }
231 : :
232 : : static inline uint32
233 : 1046424 : lrq_inflight(LsnReadQueue *lrq)
234 : : {
235 : 1046424 : return lrq->inflight;
236 : : }
237 : :
238 : : static inline uint32
239 : 1046424 : lrq_completed(LsnReadQueue *lrq)
240 : : {
241 : 1046424 : return lrq->completed;
242 : : }
243 : :
244 : : static inline void
245 : 2737865 : lrq_prefetch(LsnReadQueue *lrq)
246 : : {
247 : : /* Try to start as many IOs as we can within our limits. */
248 [ + + ]: 8364119 : while (lrq->inflight < lrq->max_inflight &&
249 [ + + ]: 5623345 : lrq->inflight + lrq->completed < lrq->size - 1)
250 : : {
251 [ - + ]: 3503668 : Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
252 [ + + + - ]: 3503668 : switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
253 : : {
254 : 615227 : case LRQ_NEXT_AGAIN:
255 : 615227 : return;
256 : 6294 : case LRQ_NEXT_IO:
257 : 6294 : lrq->queue[lrq->head].io = true;
258 : 6294 : lrq->inflight++;
259 : 6294 : break;
260 : 2882095 : case LRQ_NEXT_NO_IO:
261 : 2882095 : lrq->queue[lrq->head].io = false;
262 : 2882095 : lrq->completed++;
263 : 2882095 : break;
264 : : }
265 : 2888389 : lrq->head++;
266 [ + + ]: 2888389 : if (lrq->head == lrq->size)
267 : 44378 : lrq->head = 0;
268 : : }
269 : : }
270 : :
271 : : static inline void
272 : 2737847 : lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
273 : : {
274 : : /*
275 : : * We know that LSNs before 'lsn' have been replayed, so we can now assume
276 : : * that any IOs that were started before then have finished.
277 : : */
278 [ + + ]: 8364002 : while (lrq->tail != lrq->head &&
279 [ + + ]: 5585503 : lrq->queue[lrq->tail].lsn < lsn)
280 : : {
281 [ + + ]: 2888308 : if (lrq->queue[lrq->tail].io)
282 : 6294 : lrq->inflight--;
283 : : else
284 : 2882014 : lrq->completed--;
285 : 2888308 : lrq->tail++;
286 [ + + ]: 2888308 : if (lrq->tail == lrq->size)
287 : 44377 : lrq->tail = 0;
288 : : }
289 [ + - + - ]: 2737847 : if (RecoveryPrefetchEnabled())
290 : 2737847 : lrq_prefetch(lrq);
291 : 2737795 : }
292 : :
293 : : size_t
294 : 1909 : XLogPrefetchShmemSize(void)
295 : : {
296 : 1909 : return sizeof(XLogPrefetchStats);
297 : : }
298 : :
299 : : /*
300 : : * Reset all counters to zero.
301 : : */
302 : : void
303 : 3 : XLogPrefetchResetStats(void)
304 : : {
305 : 3 : pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
306 : 3 : pg_atomic_write_u64(&SharedStats->prefetch, 0);
307 : 3 : pg_atomic_write_u64(&SharedStats->hit, 0);
308 : 3 : pg_atomic_write_u64(&SharedStats->skip_init, 0);
309 : 3 : pg_atomic_write_u64(&SharedStats->skip_new, 0);
310 : 3 : pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
311 : 3 : pg_atomic_write_u64(&SharedStats->skip_rep, 0);
312 : 3 : }
313 : :
314 : : void
315 : 1029 : XLogPrefetchShmemInit(void)
316 : : {
317 : : bool found;
318 : :
319 : 1029 : SharedStats = (XLogPrefetchStats *)
320 : 1029 : ShmemInitStruct("XLogPrefetchStats",
321 : : sizeof(XLogPrefetchStats),
322 : : &found);
323 : :
324 [ + - ]: 1029 : if (!found)
325 : : {
326 : 1029 : pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
327 : 1029 : pg_atomic_init_u64(&SharedStats->prefetch, 0);
328 : 1029 : pg_atomic_init_u64(&SharedStats->hit, 0);
329 : 1029 : pg_atomic_init_u64(&SharedStats->skip_init, 0);
330 : 1029 : pg_atomic_init_u64(&SharedStats->skip_new, 0);
331 : 1029 : pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
332 : 1029 : pg_atomic_init_u64(&SharedStats->skip_rep, 0);
333 : : }
334 : 1029 : }
335 : :
336 : : /*
337 : : * Called when any GUC is changed that affects prefetching.
338 : : */
339 : : void
340 : 10 : XLogPrefetchReconfigure(void)
341 : : {
342 : 10 : XLogPrefetchReconfigureCount++;
343 : 10 : }
344 : :
345 : : /*
346 : : * Increment a counter in shared memory. This is equivalent to *counter++ on a
347 : : * plain uint64 without any memory barrier or locking, except on platforms
348 : : * where readers can't read uint64 without possibly observing a torn value.
349 : : */
350 : : static inline void
351 : 2875232 : XLogPrefetchIncrement(pg_atomic_uint64 *counter)
352 : : {
353 [ + + - + ]: 2875232 : Assert(AmStartupProcess() || !IsUnderPostmaster);
354 : 2875232 : pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
355 : 2875232 : }
356 : :
357 : : /*
358 : : * Create a prefetcher that is ready to begin prefetching blocks referenced by
359 : : * WAL records.
360 : : */
361 : : XLogPrefetcher *
362 : 887 : XLogPrefetcherAllocate(XLogReaderState *reader)
363 : : {
364 : : XLogPrefetcher *prefetcher;
365 : : HASHCTL ctl;
366 : :
367 : 887 : prefetcher = palloc0(sizeof(XLogPrefetcher));
368 : 887 : prefetcher->reader = reader;
369 : :
391 heikki.linnakangas@i 370 : 887 : ctl.keysize = sizeof(RelFileLocator);
371 : 887 : ctl.entrysize = sizeof(XLogPrefetcherFilter);
1248 tmunro@postgresql.or 372 : 887 : prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
373 : : &ctl, HASH_ELEM | HASH_BLOBS);
374 : 887 : dlist_init(&prefetcher->filter_queue);
375 : :
376 : 887 : SharedStats->wal_distance = 0;
377 : 887 : SharedStats->block_distance = 0;
378 : 887 : SharedStats->io_depth = 0;
379 : :
380 : : /* First usage will cause streaming_read to be allocated. */
381 : 887 : prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
382 : :
383 : 887 : return prefetcher;
384 : : }
385 : :
386 : : /*
387 : : * Destroy a prefetcher and release all resources.
388 : : */
389 : : void
390 : 832 : XLogPrefetcherFree(XLogPrefetcher *prefetcher)
391 : : {
392 : 832 : lrq_free(prefetcher->streaming_read);
393 : 832 : hash_destroy(prefetcher->filter_table);
394 : 832 : pfree(prefetcher);
395 : 832 : }
396 : :
397 : : /*
398 : : * Provide access to the reader.
399 : : */
400 : : XLogReaderState *
401 : 2737736 : XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
402 : : {
403 : 2737736 : return prefetcher->reader;
404 : : }
405 : :
406 : : /*
407 : : * Update the statistics visible in the pg_stat_recovery_prefetch view.
408 : : */
409 : : void
410 : 1046406 : XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
411 : : {
412 : : uint32 io_depth;
413 : : uint32 completed;
414 : : int64 wal_distance;
415 : :
416 : :
417 : : /* How far ahead of replay are we now? */
418 [ + + ]: 1046406 : if (prefetcher->reader->decode_queue_tail)
419 : : {
420 : 1035447 : wal_distance =
421 : 1035447 : prefetcher->reader->decode_queue_tail->lsn -
422 : 1035447 : prefetcher->reader->decode_queue_head->lsn;
423 : : }
424 : : else
425 : : {
426 : 10959 : wal_distance = 0;
427 : : }
428 : :
429 : : /* How many IOs are currently in flight and completed? */
430 : 1046406 : io_depth = lrq_inflight(prefetcher->streaming_read);
431 : 1046406 : completed = lrq_completed(prefetcher->streaming_read);
432 : :
433 : : /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
434 : 1046406 : SharedStats->io_depth = io_depth;
435 : 1046406 : SharedStats->block_distance = io_depth + completed;
436 : 1046406 : SharedStats->wal_distance = wal_distance;
437 : :
438 : 1046406 : prefetcher->next_stats_shm_lsn =
439 : 1046406 : prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
440 : 1046406 : }
441 : :
442 : : /*
443 : : * A callback that examines the next block reference in the WAL, and possibly
444 : : * starts an IO so that a later read will be fast.
445 : : *
446 : : * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
447 : : *
448 : : * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
449 : : * that isn't in the buffer pool, and the kernel has been asked to start
450 : : * reading it to make a future read system call faster. An LSN is written to
451 : : * *lsn, and the I/O will be considered to have completed once that LSN is
452 : : * replayed.
453 : : *
454 : : * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
455 : : * that it was already in the buffer pool, or we decided for various reasons
456 : : * not to prefetch.
457 : : */
458 : : static LsnReadQueueNextStatus
459 : 3503668 : XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
460 : : {
461 : 3503668 : XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
462 : 3503668 : XLogReaderState *reader = prefetcher->reader;
463 : 3503668 : XLogRecPtr replaying_lsn = reader->ReadRecPtr;
464 : :
465 : : /*
466 : : * We keep track of the record and block we're up to between calls with
467 : : * prefetcher->record and prefetcher->next_block_id.
468 : : */
469 : : for (;;)
470 : 2735720 : {
471 : : DecodedXLogRecord *record;
472 : :
473 : : /* Try to read a new future record, if we don't already have one. */
474 [ + + ]: 6239388 : if (prefetcher->record == NULL)
475 : : {
476 : : bool nonblocking;
477 : :
478 : : /*
479 : : * If there are already records or an error queued up that could
480 : : * be replayed, we don't want to block here. Otherwise, it's OK
481 : : * to block waiting for more data: presumably the caller has
482 : : * nothing else to do.
483 : : */
484 : 3350999 : nonblocking = XLogReaderHasQueuedRecordOrError(reader);
485 : :
486 : : /* Readahead is disabled until we replay past a certain point. */
1238 487 [ + + + + ]: 3350999 : if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
1248 488 : 598320 : return LRQ_NEXT_AGAIN;
489 : :
490 : 2752679 : record = XLogReadAhead(prefetcher->reader, nonblocking);
491 [ + + ]: 2752627 : if (record == NULL)
492 : : {
493 : : /*
494 : : * We can't read any more, due to an error or lack of data in
495 : : * nonblocking mode. Don't try to read ahead again until
496 : : * we've replayed everything already decoded.
497 : : */
1238 498 [ + + + - ]: 15015 : if (nonblocking && prefetcher->reader->decode_queue_tail)
499 : 14849 : prefetcher->no_readahead_until =
500 : 14849 : prefetcher->reader->decode_queue_tail->lsn;
501 : :
1248 502 : 15015 : return LRQ_NEXT_AGAIN;
503 : : }
504 : :
505 : : /*
506 : : * If prefetching is disabled, we don't need to analyze the record
507 : : * or issue any prefetches. We just need to cause one record to
508 : : * be decoded.
509 : : */
510 [ + - - + ]: 2737612 : if (!RecoveryPrefetchEnabled())
511 : : {
1248 tmunro@postgresql.or 512 :UBC 0 : *lsn = InvalidXLogRecPtr;
513 : 0 : return LRQ_NEXT_NO_IO;
514 : : }
515 : :
516 : : /* We have a new record to process. */
1248 tmunro@postgresql.or 517 :CBC 2737612 : prefetcher->record = record;
518 : 2737612 : prefetcher->next_block_id = 0;
519 : : }
520 : : else
521 : : {
522 : : /* Continue to process from last call, or last loop. */
523 : 2888389 : record = prefetcher->record;
524 : : }
525 : :
526 : : /*
527 : : * Check for operations that require us to filter out block ranges, or
528 : : * pause readahead completely.
529 : : */
530 [ + - ]: 5626001 : if (replaying_lsn < record->lsn)
531 : : {
532 : 5626001 : uint8 rmid = record->header.xl_rmid;
533 : 5626001 : uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
534 : :
535 [ + + ]: 5626001 : if (rmid == RM_XLOG_ID)
536 : : {
537 [ + + + + ]: 87341 : if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
538 : : record_type == XLOG_END_OF_RECOVERY)
539 : : {
540 : : /*
541 : : * These records might change the TLI. Avoid potential
542 : : * bugs if we were to allow "read TLI" and "replay TLI" to
543 : : * differ without more analysis.
544 : : */
545 : 1520 : prefetcher->no_readahead_until = record->lsn;
546 : :
547 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
548 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
549 : : "suppressing all readahead until %X/%08X is replayed due to possible TLI change",
550 : : LSN_FORMAT_ARGS(record->lsn));
551 : : #endif
552 : :
553 : : /* Fall through so we move past this record. */
554 : : }
555 : : }
556 [ + + ]: 5538660 : else if (rmid == RM_DBASE_ID)
557 : : {
558 : : /*
559 : : * When databases are created with the file-copy strategy,
560 : : * there are no WAL records to tell us about the creation of
561 : : * individual relations.
562 : : */
563 [ + + ]: 36 : if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
564 : : {
565 : 4 : xl_dbase_create_file_copy_rec *xlrec =
566 : : (xl_dbase_create_file_copy_rec *) record->main_data;
1158 rhaas@postgresql.org 567 : 4 : RelFileLocator rlocator =
568 : 4 : {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
569 : :
570 : : /*
571 : : * Don't try to prefetch anything in this database until
572 : : * it has been created, or we might confuse the blocks of
573 : : * different generations, if a database OID or
574 : : * relfilenumber is reused. It's also more efficient than
575 : : * discovering that relations don't exist on disk yet with
576 : : * ENOENT errors.
577 : : */
578 : 4 : XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
579 : :
580 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
581 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
582 : : "suppressing prefetch in database %u until %X/%08X is replayed due to raw file copy",
583 : : rlocator.dbOid,
584 : : LSN_FORMAT_ARGS(record->lsn));
585 : : #endif
586 : : }
587 : : }
1248 tmunro@postgresql.or 588 [ + + ]: 5538624 : else if (rmid == RM_SMGR_ID)
589 : : {
590 [ + + ]: 15260 : if (record_type == XLOG_SMGR_CREATE)
591 : : {
592 : 15207 : xl_smgr_create *xlrec = (xl_smgr_create *)
593 : : record->main_data;
594 : :
595 [ + + ]: 15207 : if (xlrec->forkNum == MAIN_FORKNUM)
596 : : {
597 : : /*
598 : : * Don't prefetch anything for this whole relation
599 : : * until it has been created. Otherwise we might
600 : : * confuse the blocks of different generations, if a
601 : : * relfilenumber is reused. This also avoids the need
602 : : * to discover the problem via extra syscalls that
603 : : * report ENOENT.
604 : : */
1158 rhaas@postgresql.org 605 : 13639 : XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
606 : : record->lsn);
607 : :
608 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
609 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
610 : : "suppressing prefetch in relation %u/%u/%u until %X/%08X is replayed, which creates the relation",
611 : : xlrec->rlocator.spcOid,
612 : : xlrec->rlocator.dbOid,
613 : : xlrec->rlocator.relNumber,
614 : : LSN_FORMAT_ARGS(record->lsn));
615 : : #endif
616 : : }
617 : : }
1248 tmunro@postgresql.or 618 [ + - ]: 53 : else if (record_type == XLOG_SMGR_TRUNCATE)
619 : : {
620 : 53 : xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
621 : : record->main_data;
622 : :
623 : : /*
624 : : * Don't consider prefetching anything in the truncated
625 : : * range until the truncation has been performed.
626 : : */
1158 rhaas@postgresql.org 627 : 53 : XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
628 : : xlrec->blkno,
629 : : record->lsn);
630 : :
631 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
632 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
633 : : "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, which truncates the relation",
634 : : xlrec->rlocator.spcOid,
635 : : xlrec->rlocator.dbOid,
636 : : xlrec->rlocator.relNumber,
637 : : xlrec->blkno,
638 : : LSN_FORMAT_ARGS(record->lsn));
639 : : #endif
640 : : }
641 : : }
642 : : }
643 : :
644 : : /* Scan the block references, starting where we left off last time. */
1248 tmunro@postgresql.or 645 [ + + ]: 5628383 : while (prefetcher->next_block_id <= record->max_block_id)
646 : : {
647 : 2890771 : int block_id = prefetcher->next_block_id++;
648 : 2890771 : DecodedBkpBlock *block = &record->blocks[block_id];
649 : : SMgrRelation reln;
650 : : PrefetchBufferResult result;
651 : :
652 [ + + ]: 2890771 : if (!block->in_use)
653 : 2158 : continue;
654 : :
1110 john.naylor@postgres 655 [ - + ]: 2888613 : Assert(!BufferIsValid(block->prefetch_buffer));
656 : :
657 : : /*
658 : : * Record the LSN of this record. When it's replayed,
659 : : * LsnReadQueue will consider any IOs submitted for earlier LSNs
660 : : * to be finished.
661 : : */
1248 tmunro@postgresql.or 662 : 2888613 : *lsn = record->lsn;
663 : :
664 : : /* We don't try to prefetch anything but the main fork for now. */
665 [ + + ]: 2888613 : if (block->forknum != MAIN_FORKNUM)
666 : : {
667 : 2888389 : return LRQ_NEXT_NO_IO;
668 : : }
669 : :
670 : : /*
671 : : * If there is a full page image attached, we won't be reading the
672 : : * page, so don't bother trying to prefetch.
673 : : */
674 [ + + ]: 2875456 : if (block->has_image)
675 : : {
676 : 2362014 : XLogPrefetchIncrement(&SharedStats->skip_fpw);
677 : 2362014 : return LRQ_NEXT_NO_IO;
678 : : }
679 : :
680 : : /* There is no point in reading a page that will be zeroed. */
681 [ + + ]: 513442 : if (block->flags & BKPBLOCK_WILL_INIT)
682 : : {
683 : 6326 : XLogPrefetchIncrement(&SharedStats->skip_init);
684 : 6326 : return LRQ_NEXT_NO_IO;
685 : : }
686 : :
687 : : /* Should we skip prefetching this block due to a filter? */
1158 rhaas@postgresql.org 688 [ + + ]: 507116 : if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
689 : : {
1248 tmunro@postgresql.or 690 : 68959 : XLogPrefetchIncrement(&SharedStats->skip_new);
691 : 68959 : return LRQ_NEXT_NO_IO;
692 : : }
693 : :
694 : : /* There is no point in repeatedly prefetching the same block. */
695 [ + + ]: 1133181 : for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
696 : : {
697 [ + + ]: 1121357 : if (block->blkno == prefetcher->recent_block[i] &&
1158 rhaas@postgresql.org 698 [ + + + + : 443157 : RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
+ + ]
699 : : {
700 : : /*
701 : : * XXX If we also remembered where it was, we could set
702 : : * recent_buffer so that recovery could skip smgropen()
703 : : * and a buffer table lookup.
704 : : */
1248 tmunro@postgresql.or 705 : 426333 : XLogPrefetchIncrement(&SharedStats->skip_rep);
706 : 426333 : return LRQ_NEXT_NO_IO;
707 : : }
708 : : }
1158 rhaas@postgresql.org 709 : 11824 : prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
1248 tmunro@postgresql.or 710 : 11824 : prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
711 : 11824 : prefetcher->recent_idx =
712 : 11824 : (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
713 : :
714 : : /*
715 : : * We could try to have a fast path for repeated references to the
716 : : * same relation (with some scheme to handle invalidations
717 : : * safely), but for now we'll call smgropen() every time.
718 : : */
552 heikki.linnakangas@i 719 : 11824 : reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
720 : :
721 : : /*
722 : : * If the relation file doesn't exist on disk, for example because
723 : : * we're replaying after a crash and the file will be created and
724 : : * then unlinked by WAL that hasn't been replayed yet, suppress
725 : : * further prefetching in the relation until this record is
726 : : * replayed.
727 : : */
1248 tmunro@postgresql.or 728 [ + + ]: 11824 : if (!smgrexists(reln, MAIN_FORKNUM))
729 : : {
730 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
731 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
732 : : "suppressing all prefetch in relation %u/%u/%u until %X/%08X is replayed, because the relation does not exist on disk",
733 : : reln->smgr_rlocator.locator.spcOid,
734 : : reln->smgr_rlocator.locator.dbOid,
735 : : reln->smgr_rlocator.locator.relNumber,
736 : : LSN_FORMAT_ARGS(record->lsn));
737 : : #endif
1158 rhaas@postgresql.org 738 : 2 : XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
739 : : record->lsn);
1248 tmunro@postgresql.or 740 : 2 : XLogPrefetchIncrement(&SharedStats->skip_new);
741 : 2 : return LRQ_NEXT_NO_IO;
742 : : }
743 : :
744 : : /*
745 : : * If the relation isn't big enough to contain the referenced
746 : : * block yet, suppress prefetching of this block and higher until
747 : : * this record is replayed.
748 : : */
749 [ + + ]: 11822 : if (block->blkno >= smgrnblocks(reln, block->forknum))
750 : : {
751 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
752 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
753 : : "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, because the relation is too small",
754 : : reln->smgr_rlocator.locator.spcOid,
755 : : reln->smgr_rlocator.locator.dbOid,
756 : : reln->smgr_rlocator.locator.relNumber,
757 : : block->blkno,
758 : : LSN_FORMAT_ARGS(record->lsn));
759 : : #endif
1158 rhaas@postgresql.org 760 : 1354 : XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
761 : : record->lsn);
1248 tmunro@postgresql.or 762 : 1354 : XLogPrefetchIncrement(&SharedStats->skip_new);
763 : 1354 : return LRQ_NEXT_NO_IO;
764 : : }
765 : :
766 : : /* Try to initiate prefetching. */
767 : 10468 : result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
768 [ + + ]: 10468 : if (BufferIsValid(result.recent_buffer))
769 : : {
770 : : /* Cache hit, nothing to do. */
771 : 3950 : XLogPrefetchIncrement(&SharedStats->hit);
772 : 3950 : block->prefetch_buffer = result.recent_buffer;
773 : 3950 : return LRQ_NEXT_NO_IO;
774 : : }
775 [ + + ]: 6518 : else if (result.initiated_io)
776 : : {
777 : : /* Cache miss, I/O (presumably) started. */
778 : 6294 : XLogPrefetchIncrement(&SharedStats->prefetch);
779 : 6294 : block->prefetch_buffer = InvalidBuffer;
780 : 6294 : return LRQ_NEXT_IO;
781 : : }
882 782 [ - + ]: 224 : else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
783 : : {
784 : : /*
785 : : * This shouldn't be possible, because we already determined
786 : : * that the relation exists on disk and is big enough.
787 : : * Something is wrong with the cache invalidation for
788 : : * smgrexists(), smgrnblocks(), or the file was unlinked or
789 : : * truncated beneath our feet?
790 : : */
1248 tmunro@postgresql.or 791 [ # # ]:UBC 0 : elog(ERROR,
792 : : "could not prefetch relation %u/%u/%u block %u",
793 : : reln->smgr_rlocator.locator.spcOid,
794 : : reln->smgr_rlocator.locator.dbOid,
795 : : reln->smgr_rlocator.locator.relNumber,
796 : : block->blkno);
797 : : }
798 : : }
799 : :
800 : : /*
801 : : * Several callsites need to be able to read exactly one record
802 : : * without any internal readahead. Examples: xlog.c reading
803 : : * checkpoint records with emode set to PANIC, which might otherwise
804 : : * cause XLogPageRead() to panic on some future page, and xlog.c
805 : : * determining where to start writing WAL next, which depends on the
806 : : * contents of the reader's internal buffer after reading one record.
807 : : * Therefore, don't even think about prefetching until the first
808 : : * record after XLogPrefetcherBeginRead() has been consumed.
809 : : */
1248 tmunro@postgresql.or 810 [ + + ]:CBC 2737612 : if (prefetcher->reader->decode_queue_tail &&
811 [ + + ]: 2737611 : prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
812 : 1892 : return LRQ_NEXT_AGAIN;
813 : :
814 : : /* Advance to the next record. */
815 : 2735720 : prefetcher->record = NULL;
816 : : }
817 : : pg_unreachable();
818 : : }
819 : :
820 : : /*
821 : : * Expose statistics about recovery prefetching.
822 : : */
823 : : Datum
824 : 6 : pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
825 : : {
826 : : #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
827 : 6 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
828 : : Datum values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
829 : : bool nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
830 : :
1054 michael@paquier.xyz 831 : 6 : InitMaterializedSRF(fcinfo, 0);
832 : :
1248 tmunro@postgresql.or 833 [ + + ]: 66 : for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
834 : 60 : nulls[i] = false;
835 : :
836 : 6 : values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
837 : 6 : values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
838 : 6 : values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
839 : 6 : values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
840 : 6 : values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
841 : 6 : values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
842 : 6 : values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
843 : 6 : values[7] = Int32GetDatum(SharedStats->wal_distance);
844 : 6 : values[8] = Int32GetDatum(SharedStats->block_distance);
845 : 6 : values[9] = Int32GetDatum(SharedStats->io_depth);
846 : 6 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
847 : :
848 : 6 : return (Datum) 0;
849 : : }
850 : :
851 : : /*
852 : : * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
853 : : * has been replayed.
854 : : */
855 : : static inline void
1158 rhaas@postgresql.org 856 : 15052 : XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
857 : : BlockNumber blockno, XLogRecPtr lsn)
858 : : {
859 : : XLogPrefetcherFilter *filter;
860 : : bool found;
861 : :
862 : 15052 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
1248 tmunro@postgresql.or 863 [ + + ]: 15052 : if (!found)
864 : : {
865 : : /*
866 : : * Don't allow any prefetching of this block or higher until replayed.
867 : : */
868 : 15045 : filter->filter_until_replayed = lsn;
869 : 15045 : filter->filter_from_block = blockno;
870 : 15045 : dlist_push_head(&prefetcher->filter_queue, &filter->link);
871 : : }
872 : : else
873 : : {
874 : : /*
875 : : * We were already filtering this rlocator. Extend the filter's
876 : : * lifetime to cover this WAL record, but leave the lower of the block
877 : : * numbers there because we don't want to have to track individual
878 : : * blocks.
879 : : */
880 : 7 : filter->filter_until_replayed = lsn;
881 : 7 : dlist_delete(&filter->link);
882 : 7 : dlist_push_head(&prefetcher->filter_queue, &filter->link);
883 : 7 : filter->filter_from_block = Min(filter->filter_from_block, blockno);
884 : : }
885 : 15052 : }
886 : :
887 : : /*
888 : : * Have we replayed any records that caused us to begin filtering a block
889 : : * range? That means that relations should have been created, extended or
890 : : * dropped as required, so we can stop filtering out accesses to a given
891 : : * relfilenumber.
892 : : */
893 : : static inline void
894 : 2737847 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
895 : : {
896 [ + + ]: 2752891 : while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
897 : : {
898 : 402472 : XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
899 : : link,
900 : : &prefetcher->filter_queue);
901 : :
902 [ + + ]: 402472 : if (filter->filter_until_replayed >= replaying_lsn)
903 : 387428 : break;
904 : :
905 : 15044 : dlist_delete(&filter->link);
906 : 15044 : hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
907 : : }
908 : 2737847 : }
909 : :
910 : : /*
911 : : * Check if a given block should be skipped due to a filter.
912 : : */
913 : : static inline bool
1158 rhaas@postgresql.org 914 : 507116 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
915 : : BlockNumber blockno)
916 : : {
917 : : /*
918 : : * Test for empty queue first, because we expect it to be empty most of
919 : : * the time and we can avoid the hash table lookup in that case.
920 : : */
1248 tmunro@postgresql.or 921 [ + + ]: 507116 : if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
922 : : {
923 : : XLogPrefetcherFilter *filter;
924 : :
925 : : /* See if the block range is filtered. */
1158 rhaas@postgresql.org 926 : 88334 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
1248 tmunro@postgresql.or 927 [ + + + + ]: 88334 : if (filter && filter->filter_from_block <= blockno)
928 : : {
929 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
930 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
931 : : "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (blocks >= %u filtered)",
932 : : rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
933 : : LSN_FORMAT_ARGS(filter->filter_until_replayed),
934 : : filter->filter_from_block);
935 : : #endif
936 : 68959 : return true;
937 : : }
938 : :
939 : : /* See if the whole database is filtered. */
1158 rhaas@postgresql.org 940 : 19375 : rlocator.relNumber = InvalidRelFileNumber;
941 : 19375 : rlocator.spcOid = InvalidOid;
942 : 19375 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
1248 tmunro@postgresql.or 943 [ - + ]: 19375 : if (filter)
944 : : {
945 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
946 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
947 : : "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (whole database)",
948 : : rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
949 : : LSN_FORMAT_ARGS(filter->filter_until_replayed));
950 : : #endif
1248 tmunro@postgresql.or 951 :UBC 0 : return true;
952 : : }
953 : : }
954 : :
1248 tmunro@postgresql.or 955 :CBC 438157 : return false;
956 : : }
957 : :
958 : : /*
959 : : * A wrapper for XLogBeginRead() that also resets the prefetcher.
960 : : */
961 : : void
962 : 1892 : XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
963 : : {
964 : : /* This will forget about any in-flight IO. */
965 : 1892 : prefetcher->reconfigure_count--;
966 : :
967 : : /* Book-keeping to avoid readahead on first read. */
968 : 1892 : prefetcher->begin_ptr = recPtr;
969 : :
970 : 1892 : prefetcher->no_readahead_until = 0;
971 : :
972 : : /* This will forget about any queued up records in the decoder. */
973 : 1892 : XLogBeginRead(prefetcher->reader, recPtr);
974 : 1892 : }
975 : :
976 : : /*
977 : : * A wrapper for XLogReadRecord() that provides the same interface, but also
978 : : * tries to initiate I/O for blocks referenced in future WAL records.
979 : : */
980 : : XLogRecord *
981 : 2737847 : XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
982 : : {
983 : : DecodedXLogRecord *record;
984 : : XLogRecPtr replayed_up_to;
985 : :
986 : : /*
987 : : * See if it's time to reset the prefetching machinery, because a relevant
988 : : * GUC was changed.
989 : : */
990 [ + + ]: 2737847 : if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
991 : : {
992 : : uint32 max_distance;
993 : : uint32 max_inflight;
994 : :
995 [ + + ]: 1902 : if (prefetcher->streaming_read)
996 : 1015 : lrq_free(prefetcher->streaming_read);
997 : :
998 [ + - + - ]: 1902 : if (RecoveryPrefetchEnabled())
999 : : {
1094 1000 [ - + ]: 1902 : Assert(maintenance_io_concurrency > 0);
1001 : 1902 : max_inflight = maintenance_io_concurrency;
1248 1002 : 1902 : max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1003 : : }
1004 : : else
1005 : : {
1248 tmunro@postgresql.or 1006 :UBC 0 : max_inflight = 1;
1007 : 0 : max_distance = 1;
1008 : : }
1009 : :
1248 tmunro@postgresql.or 1010 :CBC 1902 : prefetcher->streaming_read = lrq_alloc(max_distance,
1011 : : max_inflight,
1012 : : (uintptr_t) prefetcher,
1013 : : XLogPrefetcherNextBlock);
1014 : :
1015 : 1902 : prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
1016 : : }
1017 : :
1018 : : /*
1019 : : * Release last returned record, if there is one, as it's now been
1020 : : * replayed.
1021 : : */
1094 1022 : 2737847 : replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1023 : :
1024 : : /*
1025 : : * Can we drop any filters yet? If we were waiting for a relation to be
1026 : : * created or extended, it is now OK to access blocks in the covered
1027 : : * range.
1028 : : */
1029 : 2737847 : XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1030 : :
1031 : : /*
1032 : : * All IO initiated by earlier WAL is now completed. This might trigger
1033 : : * further prefetching.
1034 : : */
1035 : 2737847 : lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1036 : :
1037 : : /*
1038 : : * If there's nothing queued yet, then start prefetching to cause at least
1039 : : * one record to be queued.
1040 : : */
1248 1041 [ + + ]: 2737795 : if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1042 : : {
1094 1043 [ - + ]: 18 : Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1044 [ - + ]: 18 : Assert(lrq_completed(prefetcher->streaming_read) == 0);
1248 1045 : 18 : lrq_prefetch(prefetcher->streaming_read);
1046 : : }
1047 : :
1048 : : /* Read the next record. */
1049 : 2737795 : record = XLogNextRecord(prefetcher->reader, errmsg);
1050 [ + + ]: 2737795 : if (!record)
1051 : 254 : return NULL;
1052 : :
1053 : : /*
1054 : : * The record we just got is the "current" one, for the benefit of the
1055 : : * XLogRecXXX() macros.
1056 : : */
1057 [ - + ]: 2737541 : Assert(record == prefetcher->reader->record);
1058 : :
1059 : : /*
1060 : : * If maintenance_io_concurrency is set very low, we might have started
1061 : : * prefetching some but not all of the blocks referenced in the record
1062 : : * we're about to return. Forget about the rest of the blocks in this
1063 : : * record by dropping the prefetcher's reference to it.
1064 : : */
1094 1065 [ + + ]: 2737541 : if (record == prefetcher->record)
1066 : 1892 : prefetcher->record = NULL;
1067 : :
1068 : : /*
1069 : : * See if it's time to compute some statistics, because enough WAL has
1070 : : * been processed.
1071 : : */
1248 1072 [ + + ]: 2737541 : if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1073 : 1034615 : XLogPrefetcherComputeStats(prefetcher);
1074 : :
1075 [ - + ]: 2737541 : Assert(record == prefetcher->reader->record);
1076 : :
1077 : 2737541 : return &record->header;
1078 : : }
1079 : :
1080 : : bool
1081 : 1067 : check_recovery_prefetch(int *new_value, void **extra, GucSource source)
1082 : : {
1083 : : #ifndef USE_PREFETCH
1084 : : if (*new_value == RECOVERY_PREFETCH_ON)
1085 : : {
1086 : : GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
1087 : : return false;
1088 : : }
1089 : : #endif
1090 : :
1091 : 1067 : return true;
1092 : : }
1093 : :
1094 : : void
1095 : 1067 : assign_recovery_prefetch(int new_value, void *extra)
1096 : : {
1097 : : /* Reconfigure prefetching, because a setting it depends on changed. */
1098 : 1067 : recovery_prefetch = new_value;
1099 [ - + ]: 1067 : if (AmStartupProcess())
1248 tmunro@postgresql.or 1100 :UBC 0 : XLogPrefetchReconfigure();
1248 tmunro@postgresql.or 1101 :CBC 1067 : }
|