Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * xlogprefetcher.c
4 : : * Prefetching support for recovery.
5 : : *
6 : : * Portions Copyright (c) 2022-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/access/transam/xlogprefetcher.c
12 : : *
13 : : * This module provides a drop-in replacement for an XLogReader that tries to
14 : : * minimize I/O stalls by looking ahead in the WAL. If blocks that will be
15 : : * accessed in the near future are not already in the buffer pool, it initiates
16 : : * I/Os that might complete before the caller eventually needs the data. When
17 : : * referenced blocks are found in the buffer pool already, the buffer is
18 : : * recorded in the decoded record so that XLogReadBufferForRedo() can try to
19 : : * avoid a second buffer mapping table lookup.
20 : : *
21 : : * Currently, only the main fork is considered for prefetching. Currently,
22 : : * prefetching is only effective on systems where PrefetchBuffer() does
23 : : * something useful (mainly Linux).
24 : : *
25 : : *-------------------------------------------------------------------------
26 : : */
27 : :
28 : : #include "postgres.h"
29 : :
30 : : #include "access/xlogprefetcher.h"
31 : : #include "access/xlogreader.h"
32 : : #include "catalog/pg_control.h"
33 : : #include "catalog/storage_xlog.h"
34 : : #include "commands/dbcommands_xlog.h"
35 : : #include "funcapi.h"
36 : : #include "miscadmin.h"
37 : : #include "port/atomics.h"
38 : : #include "storage/bufmgr.h"
39 : : #include "storage/fd.h"
40 : : #include "storage/shmem.h"
41 : : #include "storage/smgr.h"
42 : : #include "storage/subsystems.h"
43 : : #include "utils/fmgrprotos.h"
44 : : #include "utils/guc_hooks.h"
45 : : #include "utils/hsearch.h"
46 : : #include "utils/timestamp.h"
47 : : #include "utils/tuplestore.h"
48 : :
49 : : /*
50 : : * Every time we process this much WAL, we'll update the values in
51 : : * pg_stat_recovery_prefetch.
52 : : */
53 : : #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
54 : :
55 : : /*
56 : : * To detect repeated access to the same block and skip useless extra system
57 : : * calls, we remember a small window of recently prefetched blocks.
58 : : */
59 : : #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
60 : :
61 : : /*
62 : : * When maintenance_io_concurrency is not saturated, we're prepared to look
63 : : * ahead up to N times that number of block references.
64 : : */
65 : : #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
66 : :
67 : : /* Define to log internal debugging messages. */
68 : : /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
69 : :
70 : : /* GUCs */
71 : : int recovery_prefetch = RECOVERY_PREFETCH_TRY;
72 : :
73 : : #ifdef USE_PREFETCH
74 : : #define RecoveryPrefetchEnabled() \
75 : : (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
76 : : maintenance_io_concurrency > 0)
77 : : #else
78 : : #define RecoveryPrefetchEnabled() false
79 : : #endif
80 : :
81 : : static int XLogPrefetchReconfigureCount = 0;
82 : :
83 : : /*
84 : : * Enum used to report whether an IO should be started.
85 : : */
86 : : typedef enum
87 : : {
88 : : LRQ_NEXT_NO_IO,
89 : : LRQ_NEXT_IO,
90 : : LRQ_NEXT_AGAIN,
91 : : } LsnReadQueueNextStatus;
92 : :
93 : : /*
94 : : * Type of callback that can decide which block to prefetch next. For now
95 : : * there is only one.
96 : : */
97 : : typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
98 : : XLogRecPtr *lsn);
99 : :
100 : : /*
 101                 :                :  * A simple circular queue of LSNs, used to control the number of
102 : : * (potentially) inflight IOs. This stands in for a later more general IO
103 : : * control mechanism, which is why it has the apparently unnecessary
104 : : * indirection through a function pointer.
105 : : */
106 : : typedef struct LsnReadQueue
107 : : {
108 : : LsnReadQueueNextFun next;
109 : : uintptr_t lrq_private;
110 : : uint32 max_inflight;
111 : : uint32 inflight;
112 : : uint32 completed;
113 : : uint32 head;
114 : : uint32 tail;
115 : : uint32 size;
116 : : struct
117 : : {
118 : : bool io;
119 : : XLogRecPtr lsn;
120 : : } queue[FLEXIBLE_ARRAY_MEMBER];
121 : : } LsnReadQueue;
122 : :
123 : : /*
124 : : * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching
 125                 :                :  * blocks that will soon be referenced, to try to avoid IO stalls.
126 : : */
127 : : struct XLogPrefetcher
128 : : {
129 : : /* WAL reader and current reading state. */
130 : : XLogReaderState *reader;
131 : : DecodedXLogRecord *record;
132 : : int next_block_id;
133 : :
134 : : /* When to publish stats. */
135 : : XLogRecPtr next_stats_shm_lsn;
136 : :
137 : : /* Book-keeping to avoid accessing blocks that don't exist yet. */
138 : : HTAB *filter_table;
139 : : dlist_head filter_queue;
140 : :
141 : : /* Book-keeping to avoid repeat prefetches. */
142 : : RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
143 : : BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
144 : : int recent_idx;
145 : :
146 : : /* Book-keeping to disable prefetching temporarily. */
147 : : XLogRecPtr no_readahead_until;
148 : :
149 : : /* IO depth manager. */
150 : : LsnReadQueue *streaming_read;
151 : :
152 : : XLogRecPtr begin_ptr;
153 : :
154 : : int reconfigure_count;
155 : : };
156 : :
157 : : /*
158 : : * A temporary filter used to track block ranges that haven't been created
159 : : * yet, whole relations that haven't been created yet, and whole relations
160 : : * that (we assume) have already been dropped, or will be created by bulk WAL
161 : : * operators.
162 : : */
163 : : typedef struct XLogPrefetcherFilter
164 : : {
165 : : RelFileLocator rlocator;
166 : : XLogRecPtr filter_until_replayed;
167 : : BlockNumber filter_from_block;
168 : : dlist_node link;
169 : : } XLogPrefetcherFilter;
170 : :
171 : : /*
172 : : * Counters exposed in shared memory for pg_stat_recovery_prefetch.
173 : : */
174 : : typedef struct XLogPrefetchStats
175 : : {
176 : : pg_atomic_uint64 reset_time; /* Time of last reset. */
177 : : pg_atomic_uint64 prefetch; /* Prefetches initiated. */
178 : : pg_atomic_uint64 hit; /* Blocks already in cache. */
179 : : pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
180 : : pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
181 : : pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
182 : : pg_atomic_uint64 skip_rep; /* Repeat accesses skipped. */
183 : :
184 : : /* Dynamic values */
185 : : int wal_distance; /* Number of WAL bytes ahead. */
186 : : int block_distance; /* Number of block references ahead. */
187 : : int io_depth; /* Number of I/Os in progress. */
188 : : } XLogPrefetchStats;
189 : :
190 : : static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
191 : : RelFileLocator rlocator,
192 : : BlockNumber blockno,
193 : : XLogRecPtr lsn);
194 : : static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
195 : : RelFileLocator rlocator,
196 : : BlockNumber blockno);
197 : : static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
198 : : XLogRecPtr replaying_lsn);
199 : : static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
200 : : XLogRecPtr *lsn);
201 : :
202 : : static XLogPrefetchStats *SharedStats;
203 : :
204 : : static void XLogPrefetchShmemRequest(void *arg);
205 : : static void XLogPrefetchShmemInit(void *arg);
206 : :
207 : : const ShmemCallbacks XLogPrefetchShmemCallbacks = {
208 : : .request_fn = XLogPrefetchShmemRequest,
209 : : .init_fn = XLogPrefetchShmemInit,
210 : : };
211 : :
212 : : static inline LsnReadQueue *
1489 tmunro@postgresql.or 213 :CBC 2364 : lrq_alloc(uint32 max_distance,
214 : : uint32 max_inflight,
215 : : uintptr_t lrq_private,
216 : : LsnReadQueueNextFun next)
217 : : {
218 : : LsnReadQueue *lrq;
219 : : uint32 size;
220 : :
221 [ - + ]: 2364 : Assert(max_distance >= max_inflight);
222 : :
223 : 2364 : size = max_distance + 1; /* full ring buffer has a gap */
224 : 2364 : lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
225 : 2364 : lrq->lrq_private = lrq_private;
226 : 2364 : lrq->max_inflight = max_inflight;
227 : 2364 : lrq->size = size;
228 : 2364 : lrq->next = next;
229 : 2364 : lrq->head = 0;
230 : 2364 : lrq->tail = 0;
231 : 2364 : lrq->inflight = 0;
232 : 2364 : lrq->completed = 0;
233 : :
234 : 2364 : return lrq;
235 : : }
236 : :
237 : : static inline void
238 : 2293 : lrq_free(LsnReadQueue *lrq)
239 : : {
240 : 2293 : pfree(lrq);
241 : 2293 : }
242 : :
243 : : static inline uint32
244 : 1103002 : lrq_inflight(LsnReadQueue *lrq)
245 : : {
246 : 1103002 : return lrq->inflight;
247 : : }
248 : :
249 : : static inline uint32
250 : 1103002 : lrq_completed(LsnReadQueue *lrq)
251 : : {
252 : 1103002 : return lrq->completed;
253 : : }
254 : :
255 : : static inline void
256 : 2946558 : lrq_prefetch(LsnReadQueue *lrq)
257 : : {
258 : : /* Try to start as many IOs as we can within our limits. */
259 [ + + ]: 8922950 : while (lrq->inflight < lrq->max_inflight &&
260 [ + + ]: 5972046 : lrq->inflight + lrq->completed < lrq->size - 1)
261 : : {
262 [ - + ]: 3842463 : Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
263 [ + + + - ]: 3842463 : switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
264 : : {
265 : 812565 : case LRQ_NEXT_AGAIN:
266 : 812565 : return;
267 : 7123 : case LRQ_NEXT_IO:
268 : 7123 : lrq->queue[lrq->head].io = true;
269 : 7123 : lrq->inflight++;
270 : 7123 : break;
271 : 3022711 : case LRQ_NEXT_NO_IO:
272 : 3022711 : lrq->queue[lrq->head].io = false;
273 : 3022711 : lrq->completed++;
274 : 3022711 : break;
275 : : }
276 : 3029834 : lrq->head++;
277 [ + + ]: 3029834 : if (lrq->head == lrq->size)
278 : 46545 : lrq->head = 0;
279 : : }
280 : : }
281 : :
282 : : static inline void
283 : 2946534 : lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
284 : : {
285 : : /*
286 : : * We know that LSNs before 'lsn' have been replayed, so we can now assume
287 : : * that any IOs that were started before then have finished.
288 : : */
289 [ + + ]: 8922829 : while (lrq->tail != lrq->head &&
290 [ + + ]: 5876966 : lrq->queue[lrq->tail].lsn < lsn)
291 : : {
292 [ + + ]: 3029761 : if (lrq->queue[lrq->tail].io)
293 : 7123 : lrq->inflight--;
294 : : else
295 : 3022638 : lrq->completed--;
296 : 3029761 : lrq->tail++;
297 [ + + ]: 3029761 : if (lrq->tail == lrq->size)
298 : 46544 : lrq->tail = 0;
299 : : }
300 [ + - + - ]: 2946534 : if (RecoveryPrefetchEnabled())
301 : 2946534 : lrq_prefetch(lrq);
302 : 2946470 : }
303 : :
304 : : static void
29 heikki.linnakangas@i 305 :GNC 1244 : XLogPrefetchShmemRequest(void *arg)
306 : : {
307 : 1244 : ShmemRequestStruct(.name = "XLogPrefetchStats",
308 : : .size = sizeof(XLogPrefetchStats),
309 : : .ptr = (void **) &SharedStats,
310 : : );
311 : 1244 : }
312 : :
313 : : static void
314 : 1241 : XLogPrefetchShmemInit(void *arg)
315 : : {
316 : 1241 : pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
317 : 1241 : pg_atomic_init_u64(&SharedStats->prefetch, 0);
318 : 1241 : pg_atomic_init_u64(&SharedStats->hit, 0);
319 : 1241 : pg_atomic_init_u64(&SharedStats->skip_init, 0);
320 : 1241 : pg_atomic_init_u64(&SharedStats->skip_new, 0);
321 : 1241 : pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
322 : 1241 : pg_atomic_init_u64(&SharedStats->skip_rep, 0);
1489 tmunro@postgresql.or 323 :GIC 1241 : }
324 : :
325 : : /*
326 : : * Reset all counters to zero.
327 : : */
328 : : void
1489 tmunro@postgresql.or 329 :CBC 4 : XLogPrefetchResetStats(void)
330 : : {
331 : 4 : pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
332 : 4 : pg_atomic_write_u64(&SharedStats->prefetch, 0);
333 : 4 : pg_atomic_write_u64(&SharedStats->hit, 0);
334 : 4 : pg_atomic_write_u64(&SharedStats->skip_init, 0);
335 : 4 : pg_atomic_write_u64(&SharedStats->skip_new, 0);
336 : 4 : pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
337 : 4 : pg_atomic_write_u64(&SharedStats->skip_rep, 0);
338 : 4 : }
339 : :
340 : :
341 : : /*
342 : : * Called when any GUC is changed that affects prefetching.
343 : : */
344 : : void
345 : 13 : XLogPrefetchReconfigure(void)
346 : : {
347 : 13 : XLogPrefetchReconfigureCount++;
348 : 13 : }
349 : :
350 : : /*
351 : : * Increment a counter in shared memory. This is equivalent to *counter++ on a
352 : : * plain uint64 without any memory barrier or locking, except on platforms
353 : : * where readers can't read uint64 without possibly observing a torn value.
354 : : */
355 : : static inline void
356 : 3013354 : XLogPrefetchIncrement(pg_atomic_uint64 *counter)
357 : : {
358 [ + + - + ]: 3013354 : Assert(AmStartupProcess() || !IsUnderPostmaster);
359 : 3013354 : pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
360 : 3013354 : }
361 : :
362 : : /*
363 : : * Create a prefetcher that is ready to begin prefetching blocks referenced by
364 : : * WAL records.
365 : : */
366 : : XLogPrefetcher *
367 : 1081 : XLogPrefetcherAllocate(XLogReaderState *reader)
368 : : {
369 : : XLogPrefetcher *prefetcher;
370 : : HASHCTL ctl;
371 : :
146 michael@paquier.xyz 372 :GNC 1081 : prefetcher = palloc0_object(XLogPrefetcher);
1489 tmunro@postgresql.or 373 :CBC 1081 : prefetcher->reader = reader;
374 : :
632 heikki.linnakangas@i 375 : 1081 : ctl.keysize = sizeof(RelFileLocator);
376 : 1081 : ctl.entrysize = sizeof(XLogPrefetcherFilter);
1489 tmunro@postgresql.or 377 : 1081 : prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
378 : : &ctl, HASH_ELEM | HASH_BLOBS);
379 : 1081 : dlist_init(&prefetcher->filter_queue);
380 : :
381 : 1081 : SharedStats->wal_distance = 0;
382 : 1081 : SharedStats->block_distance = 0;
383 : 1081 : SharedStats->io_depth = 0;
384 : :
385 : : /* First usage will cause streaming_read to be allocated. */
386 : 1081 : prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
387 : :
388 : 1081 : return prefetcher;
389 : : }
390 : :
391 : : /*
392 : : * Destroy a prefetcher and release all resources.
393 : : */
394 : : void
395 : 1010 : XLogPrefetcherFree(XLogPrefetcher *prefetcher)
396 : : {
397 : 1010 : lrq_free(prefetcher->streaming_read);
398 : 1010 : hash_destroy(prefetcher->filter_table);
399 : 1010 : pfree(prefetcher);
400 : 1010 : }
401 : :
402 : : /*
403 : : * Provide access to the reader.
404 : : */
405 : : XLogReaderState *
406 : 2946394 : XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
407 : : {
408 : 2946394 : return prefetcher->reader;
409 : : }
410 : :
411 : : /*
412 : : * Update the statistics visible in the pg_stat_recovery_prefetch view.
413 : : */
414 : : void
415 : 1102978 : XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
416 : : {
417 : : uint32 io_depth;
418 : : uint32 completed;
419 : : int64 wal_distance;
420 : :
421 : :
422 : : /* How far ahead of replay are we now? */
423 [ + + ]: 1102978 : if (prefetcher->reader->decode_queue_tail)
424 : : {
425 : 1088410 : wal_distance =
426 : 1088410 : prefetcher->reader->decode_queue_tail->lsn -
427 : 1088410 : prefetcher->reader->decode_queue_head->lsn;
428 : : }
429 : : else
430 : : {
431 : 14568 : wal_distance = 0;
432 : : }
433 : :
434 : : /* How many IOs are currently in flight and completed? */
435 : 1102978 : io_depth = lrq_inflight(prefetcher->streaming_read);
436 : 1102978 : completed = lrq_completed(prefetcher->streaming_read);
437 : :
438 : : /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
439 : 1102978 : SharedStats->io_depth = io_depth;
440 : 1102978 : SharedStats->block_distance = io_depth + completed;
441 : 1102978 : SharedStats->wal_distance = wal_distance;
442 : :
443 : 1102978 : prefetcher->next_stats_shm_lsn =
444 : 1102978 : prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
445 : 1102978 : }
446 : :
447 : : /*
448 : : * A callback that examines the next block reference in the WAL, and possibly
449 : : * starts an IO so that a later read will be fast.
450 : : *
451 : : * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
452 : : *
453 : : * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
454 : : * that isn't in the buffer pool, and the kernel has been asked to start
455 : : * reading it to make a future read system call faster. An LSN is written to
456 : : * *lsn, and the I/O will be considered to have completed once that LSN is
457 : : * replayed.
458 : : *
459 : : * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
460 : : * that it was already in the buffer pool, or we decided for various reasons
461 : : * not to prefetch.
462 : : */
463 : : static LsnReadQueueNextStatus
464 : 3842463 : XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
465 : : {
466 : 3842463 : XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
467 : 3842463 : XLogReaderState *reader = prefetcher->reader;
468 : 3842463 : XLogRecPtr replaying_lsn = reader->ReadRecPtr;
469 : :
470 : : /*
471 : : * We keep track of the record and block we're up to between calls with
472 : : * prefetcher->record and prefetcher->next_block_id.
473 : : */
474 : : for (;;)
475 : 2943893 : {
476 : : DecodedXLogRecord *record;
477 : :
478 : : /* Try to read a new future record, if we don't already have one. */
479 [ + + ]: 6786356 : if (prefetcher->record == NULL)
480 : : {
481 : : bool nonblocking;
482 : :
483 : : /*
484 : : * If there are already records or an error queued up that could
485 : : * be replayed, we don't want to block here. Otherwise, it's OK
486 : : * to block waiting for more data: presumably the caller has
487 : : * nothing else to do.
488 : : */
489 : 3756522 : nonblocking = XLogReaderHasQueuedRecordOrError(reader);
490 : :
491 : : /* Readahead is disabled until we replay past a certain point. */
1479 492 [ + + + + ]: 3756522 : if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
1489 493 : 791754 : return LRQ_NEXT_AGAIN;
494 : :
495 : 2964768 : record = XLogReadAhead(prefetcher->reader, nonblocking);
496 [ + + ]: 2964704 : if (record == NULL)
497 : : {
498 : : /*
499 : : * We can't read any more, due to an error or lack of data in
500 : : * nonblocking mode. Don't try to read ahead again until
501 : : * we've replayed everything already decoded.
502 : : */
1479 503 [ + + + - ]: 18462 : if (nonblocking && prefetcher->reader->decode_queue_tail)
504 : 18248 : prefetcher->no_readahead_until =
505 : 18248 : prefetcher->reader->decode_queue_tail->lsn;
506 : :
1489 507 : 18462 : return LRQ_NEXT_AGAIN;
508 : : }
509 : :
510 : : /*
511 : : * If prefetching is disabled, we don't need to analyze the record
512 : : * or issue any prefetches. We just need to cause one record to
513 : : * be decoded.
514 : : */
515 [ + - - + ]: 2946242 : if (!RecoveryPrefetchEnabled())
516 : : {
1489 tmunro@postgresql.or 517 :UBC 0 : *lsn = InvalidXLogRecPtr;
518 : 0 : return LRQ_NEXT_NO_IO;
519 : : }
520 : :
521 : : /* We have a new record to process. */
1489 tmunro@postgresql.or 522 :CBC 2946242 : prefetcher->record = record;
523 : 2946242 : prefetcher->next_block_id = 0;
524 : : }
525 : : else
526 : : {
527 : : /* Continue to process from last call, or last loop. */
528 : 3029834 : record = prefetcher->record;
529 : : }
530 : :
531 : : /*
532 : : * Check for operations that require us to filter out block ranges, or
533 : : * pause readahead completely.
534 : : */
535 [ + - ]: 5976076 : if (replaying_lsn < record->lsn)
536 : : {
537 : 5976076 : uint8 rmid = record->header.xl_rmid;
538 : 5976076 : uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
539 : :
540 [ + + ]: 5976076 : if (rmid == RM_XLOG_ID)
541 : : {
542 [ + + + + ]: 167394 : if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
543 : : record_type == XLOG_END_OF_RECOVERY)
544 : : {
545 : : /*
546 : : * These records might change the TLI. Avoid potential
547 : : * bugs if we were to allow "read TLI" and "replay TLI" to
548 : : * differ without more analysis.
549 : : */
550 : 1871 : prefetcher->no_readahead_until = record->lsn;
551 : :
552 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
553 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
554 : : "suppressing all readahead until %X/%08X is replayed due to possible TLI change",
555 : : LSN_FORMAT_ARGS(record->lsn));
556 : : #endif
557 : :
558 : : /* Fall through so we move past this record. */
559 : : }
560 : : }
561 [ + + ]: 5808682 : else if (rmid == RM_DBASE_ID)
562 : : {
563 : : /*
564 : : * When databases are created with the file-copy strategy,
565 : : * there are no WAL records to tell us about the creation of
566 : : * individual relations.
567 : : */
568 [ + + ]: 42 : if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
569 : : {
570 : 5 : xl_dbase_create_file_copy_rec *xlrec =
571 : : (xl_dbase_create_file_copy_rec *) record->main_data;
1399 rhaas@postgresql.org 572 : 5 : RelFileLocator rlocator =
573 : 5 : {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
574 : :
575 : : /*
576 : : * Don't try to prefetch anything in this database until
577 : : * it has been created, or we might confuse the blocks of
578 : : * different generations, if a database OID or
579 : : * relfilenumber is reused. It's also more efficient than
580 : : * discovering that relations don't exist on disk yet with
581 : : * ENOENT errors.
582 : : */
583 : 5 : XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
584 : :
585 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
586 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
587 : : "suppressing prefetch in database %u until %X/%08X is replayed due to raw file copy",
588 : : rlocator.dbOid,
589 : : LSN_FORMAT_ARGS(record->lsn));
590 : : #endif
591 : : }
592 : : }
1489 tmunro@postgresql.or 593 [ + + ]: 5808640 : else if (rmid == RM_SMGR_ID)
594 : : {
595 [ + + ]: 17770 : if (record_type == XLOG_SMGR_CREATE)
596 : : {
597 : 17714 : xl_smgr_create *xlrec = (xl_smgr_create *)
598 : : record->main_data;
599 : :
600 [ + + ]: 17714 : if (xlrec->forkNum == MAIN_FORKNUM)
601 : : {
602 : : /*
603 : : * Don't prefetch anything for this whole relation
604 : : * until it has been created. Otherwise we might
605 : : * confuse the blocks of different generations, if a
606 : : * relfilenumber is reused. This also avoids the need
607 : : * to discover the problem via extra syscalls that
608 : : * report ENOENT.
609 : : */
1399 rhaas@postgresql.org 610 : 15916 : XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
611 : : record->lsn);
612 : :
613 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
614 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
615 : : "suppressing prefetch in relation %u/%u/%u until %X/%08X is replayed, which creates the relation",
616 : : xlrec->rlocator.spcOid,
617 : : xlrec->rlocator.dbOid,
618 : : xlrec->rlocator.relNumber,
619 : : LSN_FORMAT_ARGS(record->lsn));
620 : : #endif
621 : : }
622 : : }
1489 tmunro@postgresql.or 623 [ + - ]: 56 : else if (record_type == XLOG_SMGR_TRUNCATE)
624 : : {
625 : 56 : xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
626 : : record->main_data;
627 : :
628 : : /*
629 : : * Don't consider prefetching anything in the truncated
630 : : * range until the truncation has been performed.
631 : : */
1399 rhaas@postgresql.org 632 : 56 : XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
633 : : xlrec->blkno,
634 : : record->lsn);
635 : :
636 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
637 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
638 : : "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, which truncates the relation",
639 : : xlrec->rlocator.spcOid,
640 : : xlrec->rlocator.dbOid,
641 : : xlrec->rlocator.relNumber,
642 : : xlrec->blkno,
643 : : LSN_FORMAT_ARGS(record->lsn));
644 : : #endif
645 : : }
646 : : }
647 : : }
648 : :
649 : : /* Scan the block references, starting where we left off last time. */
1489 tmunro@postgresql.or 650 [ + + ]: 5978585 : while (prefetcher->next_block_id <= record->max_block_id)
651 : : {
652 : 3032343 : int block_id = prefetcher->next_block_id++;
653 : 3032343 : DecodedBkpBlock *block = &record->blocks[block_id];
654 : : SMgrRelation reln;
655 : : PrefetchBufferResult result;
656 : :
657 [ + + ]: 3032343 : if (!block->in_use)
658 : 2256 : continue;
659 : :
1351 john.naylor@postgres 660 [ - + ]: 3030087 : Assert(!BufferIsValid(block->prefetch_buffer));
661 : :
662 : : /*
663 : : * Record the LSN of this record. When it's replayed,
664 : : * LsnReadQueue will consider any IOs submitted for earlier LSNs
665 : : * to be finished.
666 : : */
1489 tmunro@postgresql.or 667 : 3030087 : *lsn = record->lsn;
668 : :
669 : : /* We don't try to prefetch anything but the main fork for now. */
670 [ + + ]: 3030087 : if (block->forknum != MAIN_FORKNUM)
671 : : {
672 : 3029834 : return LRQ_NEXT_NO_IO;
673 : : }
674 : :
675 : : /*
676 : : * If there is a full page image attached, we won't be reading the
677 : : * page, so don't bother trying to prefetch.
678 : : */
679 [ + + ]: 3013607 : if (block->has_image)
680 : : {
681 : 2475465 : XLogPrefetchIncrement(&SharedStats->skip_fpw);
682 : 2475465 : return LRQ_NEXT_NO_IO;
683 : : }
684 : :
685 : : /* There is no point in reading a page that will be zeroed. */
686 [ + + ]: 538142 : if (block->flags & BKPBLOCK_WILL_INIT)
687 : : {
688 : 6479 : XLogPrefetchIncrement(&SharedStats->skip_init);
689 : 6479 : return LRQ_NEXT_NO_IO;
690 : : }
691 : :
692 : : /* Should we skip prefetching this block due to a filter? */
1399 rhaas@postgresql.org 693 [ + + ]: 531663 : if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
694 : : {
1489 tmunro@postgresql.or 695 : 74617 : XLogPrefetchIncrement(&SharedStats->skip_new);
696 : 74617 : return LRQ_NEXT_NO_IO;
697 : : }
698 : :
699 : : /* There is no point in repeatedly prefetching the same block. */
700 [ + + ]: 1178400 : for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
701 : : {
702 [ + + ]: 1164825 : if (block->blkno == prefetcher->recent_block[i] &&
1399 rhaas@postgresql.org 703 [ + + + + : 461640 : RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
+ - ]
704 : : {
705 : : /*
706 : : * XXX If we also remembered where it was, we could set
707 : : * recent_buffer so that recovery could skip smgropen()
708 : : * and a buffer table lookup.
709 : : */
1489 tmunro@postgresql.or 710 : 443471 : XLogPrefetchIncrement(&SharedStats->skip_rep);
711 : 443471 : return LRQ_NEXT_NO_IO;
712 : : }
713 : : }
1399 rhaas@postgresql.org 714 : 13575 : prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
1489 tmunro@postgresql.or 715 : 13575 : prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
716 : 13575 : prefetcher->recent_idx =
717 : 13575 : (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
718 : :
719 : : /*
720 : : * We could try to have a fast path for repeated references to the
721 : : * same relation (with some scheme to handle invalidations
722 : : * safely), but for now we'll call smgropen() every time.
723 : : */
793 heikki.linnakangas@i 724 : 13575 : reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
725 : :
726 : : /*
727 : : * If the relation file doesn't exist on disk, for example because
728 : : * we're replaying after a crash and the file will be created and
729 : : * then unlinked by WAL that hasn't been replayed yet, suppress
730 : : * further prefetching in the relation until this record is
731 : : * replayed.
732 : : */
1489 tmunro@postgresql.or 733 [ + + ]: 13575 : if (!smgrexists(reln, MAIN_FORKNUM))
734 : : {
735 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
736 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
737 : : "suppressing all prefetch in relation %u/%u/%u until %X/%08X is replayed, because the relation does not exist on disk",
738 : : reln->smgr_rlocator.locator.spcOid,
739 : : reln->smgr_rlocator.locator.dbOid,
740 : : reln->smgr_rlocator.locator.relNumber,
741 : : LSN_FORMAT_ARGS(record->lsn));
742 : : #endif
1399 rhaas@postgresql.org 743 : 6 : XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
744 : : record->lsn);
1489 tmunro@postgresql.or 745 : 6 : XLogPrefetchIncrement(&SharedStats->skip_new);
746 : 6 : return LRQ_NEXT_NO_IO;
747 : : }
748 : :
749 : : /*
750 : : * If the relation isn't big enough to contain the referenced
751 : : * block yet, suppress prefetching of this block and higher until
752 : : * this record is replayed.
753 : : */
754 [ + + ]: 13569 : if (block->blkno >= smgrnblocks(reln, block->forknum))
755 : : {
756 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
757 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
758 : : "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, because the relation is too small",
759 : : reln->smgr_rlocator.locator.spcOid,
760 : : reln->smgr_rlocator.locator.dbOid,
761 : : reln->smgr_rlocator.locator.relNumber,
762 : : block->blkno,
763 : : LSN_FORMAT_ARGS(record->lsn));
764 : : #endif
1399 rhaas@postgresql.org 765 : 1466 : XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
766 : : record->lsn);
1489 tmunro@postgresql.or 767 : 1466 : XLogPrefetchIncrement(&SharedStats->skip_new);
768 : 1466 : return LRQ_NEXT_NO_IO;
769 : : }
770 : :
771 : : /* Try to initiate prefetching. */
772 : 12103 : result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
773 [ + + ]: 12103 : if (BufferIsValid(result.recent_buffer))
774 : : {
775 : : /* Cache hit, nothing to do. */
776 : 4727 : XLogPrefetchIncrement(&SharedStats->hit);
777 : 4727 : block->prefetch_buffer = result.recent_buffer;
778 : 4727 : return LRQ_NEXT_NO_IO;
779 : : }
780 [ + + ]: 7376 : else if (result.initiated_io)
781 : : {
782 : : /* Cache miss, I/O (presumably) started. */
783 : 7123 : XLogPrefetchIncrement(&SharedStats->prefetch);
784 : 7123 : block->prefetch_buffer = InvalidBuffer;
785 : 7123 : return LRQ_NEXT_IO;
786 : : }
1123 787 [ - + ]: 253 : else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
788 : : {
789 : : /*
790 : : * This shouldn't be possible, because we already determined
791 : : * that the relation exists on disk and is big enough.
792 : : * Something is wrong with the cache invalidation for
793 : : * smgrexists(), smgrnblocks(), or the file was unlinked or
794 : : * truncated beneath our feet?
795 : : */
1489 tmunro@postgresql.or 796 [ # # ]:UBC 0 : elog(ERROR,
797 : : "could not prefetch relation %u/%u/%u block %u",
798 : : reln->smgr_rlocator.locator.spcOid,
799 : : reln->smgr_rlocator.locator.dbOid,
800 : : reln->smgr_rlocator.locator.relNumber,
801 : : block->blkno);
802 : : }
803 : : }
804 : :
805 : : /*
806 : : * Several callsites need to be able to read exactly one record
807 : : * without any internal readahead. Examples: xlog.c reading
808 : : * checkpoint records with emode set to PANIC, which might otherwise
809 : : * cause XLogPageRead() to panic on some future page, and xlog.c
810 : : * determining where to start writing WAL next, which depends on the
811 : : * contents of the reader's internal buffer after reading one record.
812 : : * Therefore, don't even think about prefetching until the first
813 : : * record after XLogPrefetcherBeginRead() has been consumed.
814 : : */
1489 tmunro@postgresql.or 815 [ + - ]:CBC 2946242 : if (prefetcher->reader->decode_queue_tail &&
816 [ + + ]: 2946242 : prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
817 : 2349 : return LRQ_NEXT_AGAIN;
818 : :
819 : : /* Advance to the next record. */
820 : 2943893 : prefetcher->record = NULL;
821 : : }
822 : : pg_unreachable();
823 : : }
824 : :
825 : : /*
826 : : * Expose statistics about recovery prefetching.
827 : : */
828 : : Datum
829 : 8 : pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
830 : : {
831 : : #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
832 : 8 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
833 : : Datum values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
834 : : bool nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
835 : :
1295 michael@paquier.xyz 836 : 8 : InitMaterializedSRF(fcinfo, 0);
837 : :
1489 tmunro@postgresql.or 838 [ + + ]: 88 : for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
839 : 80 : nulls[i] = false;
840 : :
841 : 8 : values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
842 : 8 : values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
843 : 8 : values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
844 : 8 : values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
845 : 8 : values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
846 : 8 : values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
847 : 8 : values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
848 : 8 : values[7] = Int32GetDatum(SharedStats->wal_distance);
849 : 8 : values[8] = Int32GetDatum(SharedStats->block_distance);
850 : 8 : values[9] = Int32GetDatum(SharedStats->io_depth);
851 : 8 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
852 : :
853 : 8 : return (Datum) 0;
854 : : }
855 : :
856 : : /*
857 : : * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
858 : : * has been replayed.
859 : : */
860 : : static inline void
1399 rhaas@postgresql.org 861 : 17449 : XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
862 : : BlockNumber blockno, XLogRecPtr lsn)
863 : : {
864 : : XLogPrefetcherFilter *filter;
865 : : bool found;
866 : :
867 : 17449 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
1489 tmunro@postgresql.or 868 [ + + ]: 17449 : if (!found)
869 : : {
870 : : /*
871 : : * Don't allow any prefetching of this block or higher until replayed.
872 : : */
873 : 17440 : filter->filter_until_replayed = lsn;
874 : 17440 : filter->filter_from_block = blockno;
875 : 17440 : dlist_push_head(&prefetcher->filter_queue, &filter->link);
876 : : }
877 : : else
878 : : {
879 : : /*
880 : : * We were already filtering this rlocator. Extend the filter's
881 : : * lifetime to cover this WAL record, but leave the lower of the block
882 : : * numbers there because we don't want to have to track individual
883 : : * blocks.
884 : : */
885 : 9 : filter->filter_until_replayed = lsn;
886 : 9 : dlist_delete(&filter->link);
887 : 9 : dlist_push_head(&prefetcher->filter_queue, &filter->link);
888 : 9 : filter->filter_from_block = Min(filter->filter_from_block, blockno);
889 : : }
890 : 17449 : }
891 : :
892 : : /*
893 : : * Have we replayed any records that caused us to begin filtering a block
894 : : * range? That means that relations should have been created, extended or
895 : : * dropped as required, so we can stop filtering out accesses to a given
896 : : * relfilenumber.
897 : : */
898 : : static inline void
899 : 2946534 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
900 : : {
901 [ + + ]: 2963973 : while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
902 : : {
903 : 438158 : XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
904 : : link,
905 : : &prefetcher->filter_queue);
906 : :
907 [ + + ]: 438158 : if (filter->filter_until_replayed >= replaying_lsn)
908 : 420719 : break;
909 : :
910 : 17439 : dlist_delete(&filter->link);
911 : 17439 : hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
912 : : }
913 : 2946534 : }
914 : :
915 : : /*
916 : : * Check if a given block should be skipped due to a filter.
917 : : */
918 : : static inline bool
1399 rhaas@postgresql.org 919 : 531663 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
920 : : BlockNumber blockno)
921 : : {
922 : : /*
923 : : * Test for empty queue first, because we expect it to be empty most of
924 : : * the time and we can avoid the hash table lookup in that case.
925 : : */
1489 tmunro@postgresql.or 926 [ + + ]: 531663 : if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
927 : : {
928 : : XLogPrefetcherFilter *filter;
929 : :
930 : : /* See if the block range is filtered. */
1399 rhaas@postgresql.org 931 : 95647 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
1489 tmunro@postgresql.or 932 [ + + + + ]: 95647 : if (filter && filter->filter_from_block <= blockno)
933 : : {
934 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
935 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
936 : : "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (blocks >= %u filtered)",
937 : : rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
938 : : LSN_FORMAT_ARGS(filter->filter_until_replayed),
939 : : filter->filter_from_block);
940 : : #endif
941 : 74617 : return true;
942 : : }
943 : :
944 : : /* See if the whole database is filtered. */
1399 rhaas@postgresql.org 945 : 21030 : rlocator.relNumber = InvalidRelFileNumber;
946 : 21030 : rlocator.spcOid = InvalidOid;
947 : 21030 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
1489 tmunro@postgresql.or 948 [ - + ]: 21030 : if (filter)
949 : : {
950 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
951 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
952 : : "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (whole database)",
953 : : rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
954 : : LSN_FORMAT_ARGS(filter->filter_until_replayed));
955 : : #endif
1489 tmunro@postgresql.or 956 :UBC 0 : return true;
957 : : }
958 : : }
959 : :
1489 tmunro@postgresql.or 960 :CBC 457046 : return false;
961 : : }
962 : :
963 : : /*
964 : : * A wrapper for XLogBeginRead() that also resets the prefetcher.
965 : : */
966 : : void
967 : 2351 : XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
968 : : {
969 : : /* This will forget about any in-flight IO. */
970 : 2351 : prefetcher->reconfigure_count--;
971 : :
972 : : /* Book-keeping to avoid readahead on first read. */
973 : 2351 : prefetcher->begin_ptr = recPtr;
974 : :
96 alvherre@kurilemu.de 975 :GNC 2351 : prefetcher->no_readahead_until = InvalidXLogRecPtr;
976 : :
977 : : /* This will forget about any queued up records in the decoder. */
1489 tmunro@postgresql.or 978 :CBC 2351 : XLogBeginRead(prefetcher->reader, recPtr);
979 : 2351 : }
980 : :
/*
 * A wrapper for XLogReadRecord() that provides the same interface, but also
 * tries to initiate I/O for blocks referenced in future WAL records.
 *
 * Returns the next record, or NULL on failure, in which case *errmsg may be
 * set to a decoding error message (see XLogNextRecord()).
 *
 * NB: the steps below are order-dependent: the previous record must be
 * released (advancing replayed_up_to) before filters are expired and the
 * look-ahead queue is told that LSN has been replayed.
 */
XLogRecord *
XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
{
	DecodedXLogRecord *record;
	XLogRecPtr	replayed_up_to;

	/*
	 * See if it's time to reset the prefetching machinery, because a relevant
	 * GUC was changed.
	 */
	if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
	{
		uint32		max_distance;
		uint32		max_inflight;

		/* Throw away the old queue, along with any in-flight I/O. */
		if (prefetcher->streaming_read)
			lrq_free(prefetcher->streaming_read);

		if (RecoveryPrefetchEnabled())
		{
			Assert(maintenance_io_concurrency > 0);
			max_inflight = maintenance_io_concurrency;
			max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
		}
		else
		{
			/* Minimal queue: look-ahead effectively disabled. */
			max_inflight = 1;
			max_distance = 1;
		}

		prefetcher->streaming_read = lrq_alloc(max_distance,
											   max_inflight,
											   (uintptr_t) prefetcher,
											   XLogPrefetcherNextBlock);

		prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
	}

	/*
	 * Release last returned record, if there is one, as it's now been
	 * replayed.
	 */
	replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);

	/*
	 * Can we drop any filters yet?  If we were waiting for a relation to be
	 * created or extended, it is now OK to access blocks in the covered
	 * range.
	 */
	XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);

	/*
	 * All IO initiated by earlier WAL is now completed.  This might trigger
	 * further prefetching.
	 */
	lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);

	/*
	 * If there's nothing queued yet, then start prefetching to cause at least
	 * one record to be queued.
	 */
	if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
	{
		Assert(lrq_inflight(prefetcher->streaming_read) == 0);
		Assert(lrq_completed(prefetcher->streaming_read) == 0);
		lrq_prefetch(prefetcher->streaming_read);
	}

	/* Read the next record. */
	record = XLogNextRecord(prefetcher->reader, errmsg);
	if (!record)
		return NULL;

	/*
	 * The record we just got is the "current" one, for the benefit of the
	 * XLogRecXXX() macros.
	 */
	Assert(record == prefetcher->reader->record);

	/*
	 * If maintenance_io_concurrency is set very low, we might have started
	 * prefetching some but not all of the blocks referenced in the record
	 * we're about to return.  Forget about the rest of the blocks in this
	 * record by dropping the prefetcher's reference to it.
	 */
	if (record == prefetcher->record)
		prefetcher->record = NULL;

	/*
	 * See if it's time to compute some statistics, because enough WAL has
	 * been processed.
	 */
	if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
		XLogPrefetcherComputeStats(prefetcher);

	Assert(record == prefetcher->reader->record);

	return &record->header;
}
1085 : : bool
1086 : 1286 : check_recovery_prefetch(int *new_value, void **extra, GucSource source)
1087 : : {
1088 : : #ifndef USE_PREFETCH
1089 : : if (*new_value == RECOVERY_PREFETCH_ON)
1090 : : {
1091 : : GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
1092 : : return false;
1093 : : }
1094 : : #endif
1095 : :
1096 : 1286 : return true;
1097 : : }
1098 : :
1099 : : void
1100 : 1286 : assign_recovery_prefetch(int new_value, void *extra)
1101 : : {
1102 : : /* Reconfigure prefetching, because a setting it depends on changed. */
1103 : 1286 : recovery_prefetch = new_value;
1104 [ - + ]: 1286 : if (AmStartupProcess())
1489 tmunro@postgresql.or 1105 :UBC 0 : XLogPrefetchReconfigure();
1489 tmunro@postgresql.or 1106 :CBC 1286 : }
|