Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * xlogwait.c
4 : : * Implements waiting for WAL operations to reach specific LSNs.
5 : : *
6 : : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/access/transam/xlogwait.c
10 : : *
11 : : * NOTES
12 : : * This file implements waiting for WAL operations to reach specific LSNs
13 : : * on both physical standby and primary servers. The core idea is simple:
14 : : * every process that wants to wait publishes the LSN it needs to the
15 : : * shared memory, and the appropriate process (startup on standby,
16 : : * walreceiver on standby, or WAL writer/backend on primary) wakes it
17 : : * once that LSN has been reached.
18 : : *
19 : : * The shared memory used by this module comprises a procInfos
20 : : * per-backend array with the information of the awaited LSN for each
21 : : * of the backend processes. The elements of that array are organized
22 : : * into pairing heaps (waitersHeap), one for each WaitLSNType, which
23 : : * allows for very fast finding of the least awaited LSN for each type.
24 : : *
25 : : * In addition, the least-awaited LSN for each type is cached in the
26 : : * minWaitedLSN array. The waiter process publishes information about
27 : : * itself to the shared memory and waits on the latch until it is woken
28 : : * up by the appropriate process, standby is promoted, or the postmaster
29 : : * dies. Then, it cleans information about itself in the shared memory.
30 : : *
31 : : * On standby servers:
32 : : * - After replaying a WAL record, the startup process performs a fast
33 : : * path check minWaitedLSN[REPLAY] > replayLSN. If this check is
34 : : * negative, it checks waitersHeap[REPLAY] and wakes up the backends
35 : : * whose awaited LSNs are reached.
36 : : * - After receiving WAL, the walreceiver process performs similar checks
37 : : * against the flush and write LSNs, waking up waiters in the FLUSH
38 : : * and WRITE heaps, respectively.
39 : : *
40 : : * On primary servers: After flushing WAL, the WAL writer or backend
41 : : * process performs a similar check against the flush LSN and wakes up
42 : : * waiters whose target flush LSNs have been reached.
43 : : *
44 : : *-------------------------------------------------------------------------
45 : : */
46 : :
47 : : #include "postgres.h"
48 : :
49 : : #include <float.h>
50 : :
51 : : #include "access/xlog.h"
52 : : #include "access/xlogrecovery.h"
53 : : #include "access/xlogwait.h"
54 : : #include "miscadmin.h"
55 : : #include "pgstat.h"
56 : : #include "replication/walreceiver.h"
57 : : #include "storage/latch.h"
58 : : #include "storage/proc.h"
59 : : #include "storage/shmem.h"
60 : : #include "storage/subsystems.h"
61 : : #include "utils/fmgrprotos.h"
62 : : #include "utils/pg_lsn.h"
63 : : #include "utils/snapmgr.h"
64 : : #include "utils/wait_event.h"
65 : :
66 : :
67 : : static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
68 : : void *arg);
69 : :
70 : : struct WaitLSNState *waitLSNState = NULL;
71 : :
72 : : static void WaitLSNShmemRequest(void *arg);
73 : : static void WaitLSNShmemInit(void *arg);
74 : :
75 : : const ShmemCallbacks WaitLSNShmemCallbacks = {
76 : : .request_fn = WaitLSNShmemRequest,
77 : : .init_fn = WaitLSNShmemInit,
78 : : };
79 : :
80 : : /*
81 : : * Wait event for each WaitLSNType, used with WaitLatch() to report
82 : : * the wait in pg_stat_activity.
83 : : */
84 : : static const uint32 WaitLSNWaitEvents[] = {
85 : : [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
86 : : [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
87 : : [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
88 : : [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
89 : : };
90 : :
91 : : StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
92 : : "WaitLSNWaitEvents must match WaitLSNType enum");
93 : :
94 : : /*
95 : : * Get the current LSN for the specified wait type. Provide memory
96 : : * barrier semantics before getting the value.
97 : : */
98 : : XLogRecPtr
120 akorotkov@postgresql 99 :GNC 328 : GetCurrentLSNForWaitType(WaitLSNType lsnType)
100 : : {
101 [ - + ]: 328 : Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
102 : :
103 : : /*
104 : : * All of the cases below provide memory barrier semantics:
105 : : * GetWalRcvWriteRecPtr() and GetFlushRecPtr() have explicit barriers,
106 : : * while GetXLogReplayRecPtr() and GetWalRcvFlushRecPtr() use spinlocks.
107 : : */
108 [ + + + + : 328 : switch (lsnType)
- ]
109 : : {
110 : 212 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
111 : 212 : return GetXLogReplayRecPtr(NULL);
112 : :
113 : 47 : case WAIT_LSN_TYPE_STANDBY_WRITE:
114 : : {
2 115 : 47 : XLogRecPtr recptr = GetWalRcvWriteRecPtr();
116 : 47 : XLogRecPtr replay = GetXLogReplayRecPtr(NULL);
117 : :
118 : : /*
119 : : * Use the replay position as a floor. WAL up to the replay
120 : : * point is already on disk from a base backup, archive
121 : : * restore, or prior streaming, so there is no reason to wait
122 : : * for the walreceiver to re-receive it.
123 : : */
124 : 47 : return Max(recptr, replay);
125 : : }
126 : :
120 127 : 35 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
128 : : {
2 129 : 35 : XLogRecPtr recptr = GetWalRcvFlushRecPtr(NULL, NULL);
130 : 35 : XLogRecPtr replay = GetXLogReplayRecPtr(NULL);
131 : :
132 : : /* Same floor as standby_write; see comment above. */
133 : 35 : return Max(recptr, replay);
134 : : }
135 : :
120 136 : 34 : case WAIT_LSN_TYPE_PRIMARY_FLUSH:
137 : 34 : return GetFlushRecPtr(NULL);
138 : : }
139 : :
120 akorotkov@postgresql 140 [ # # ]:UNC 0 : elog(ERROR, "invalid LSN wait type: %d", lsnType);
141 : : pg_unreachable();
142 : : }
143 : :
144 : : /* Register the shared memory space needed for WaitLSNState. */
145 : : static void
29 heikki.linnakangas@i 146 :GNC 1244 : WaitLSNShmemRequest(void *arg)
147 : : {
148 : : Size size;
149 : :
183 akorotkov@postgresql 150 : 1244 : size = offsetof(WaitLSNState, procInfos);
151 : 1244 : size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
29 heikki.linnakangas@i 152 : 1244 : ShmemRequestStruct(.name = "WaitLSNState",
153 : : .size = size,
154 : : .ptr = (void **) &waitLSNState,
155 : : );
183 akorotkov@postgresql 156 : 1244 : }
157 : :
158 : : /* Initialize the WaitLSNState in the shared memory. */
159 : : static void
29 heikki.linnakangas@i 160 : 1241 : WaitLSNShmemInit(void *arg)
161 : : {
162 : : /* Initialize heaps and tracking */
163 [ + + ]: 6205 : for (int i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
164 : : {
165 : 4964 : pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
166 : 4964 : pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
167 : : }
168 : :
169 : : /* Initialize process info array */
170 : 1241 : memset(&waitLSNState->procInfos, 0,
171 : 1241 : (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
183 akorotkov@postgresql 172 : 1241 : }
173 : :
174 : : /*
175 : : * Comparison function for LSN waiters heaps. Waiting processes are ordered by
176 : : * LSN, so that the waiter with smallest LSN is at the top.
177 : : */
178 : : static int
179 : 28 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
180 : : {
168 181 : 28 : const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
182 : 28 : const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
183 : :
183 184 [ + + ]: 28 : if (aproc->waitLSN < bproc->waitLSN)
185 : 15 : return 1;
186 [ + + ]: 13 : else if (aproc->waitLSN > bproc->waitLSN)
187 : 10 : return -1;
188 : : else
189 : 3 : return 0;
190 : : }
191 : :
192 : : /*
193 : : * Update minimum waited LSN for the specified LSN type
194 : : */
195 : : static void
196 : 3501 : updateMinWaitedLSN(WaitLSNType lsnType)
197 : : {
198 : 3501 : XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
199 : 3501 : int i = (int) lsnType;
200 : :
142 201 [ + - - + ]: 3501 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
202 : :
183 203 [ + + ]: 3501 : if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
204 : : {
205 : 248 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
168 206 : 248 : WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
207 : :
183 208 : 248 : minWaitedLSN = procInfo->waitLSN;
209 : : }
210 : : /* Pairs with pg_atomic_read_membarrier_u64() in WaitLSNWakeup(). */
2 211 : 3501 : pg_atomic_write_membarrier_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
183 212 : 3501 : }
213 : :
214 : : /*
215 : : * Add current process to appropriate waiters heap based on LSN type
216 : : */
217 : : static void
218 : 243 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
219 : : {
220 : 243 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
221 : 243 : int i = (int) lsnType;
222 : :
142 223 [ + - - + ]: 243 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
224 : :
183 225 : 243 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
226 : :
227 : 243 : procInfo->procno = MyProcNumber;
228 : 243 : procInfo->waitLSN = lsn;
168 229 : 243 : procInfo->lsnType = lsnType;
230 : :
231 [ - + ]: 243 : Assert(!procInfo->inHeap);
232 : 243 : pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
233 : 243 : procInfo->inHeap = true;
183 234 : 243 : updateMinWaitedLSN(lsnType);
235 : :
236 : 243 : LWLockRelease(WaitLSNLock);
237 : 243 : }
238 : :
239 : : /*
240 : : * Remove current process from appropriate waiters heap based on LSN type
241 : : */
242 : : static void
243 : 243 : deleteLSNWaiter(WaitLSNType lsnType)
244 : : {
245 : 243 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
246 : 243 : int i = (int) lsnType;
247 : :
142 248 [ + - - + ]: 243 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
249 : :
183 250 : 243 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
251 : :
168 252 [ - + ]: 243 : Assert(procInfo->lsnType == lsnType);
253 : :
254 [ + + ]: 243 : if (procInfo->inHeap)
255 : : {
256 : 197 : pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
257 : 197 : procInfo->inHeap = false;
183 258 : 197 : updateMinWaitedLSN(lsnType);
259 : : }
260 : :
261 : 243 : LWLockRelease(WaitLSNLock);
262 : 243 : }
263 : :
264 : : /*
265 : : * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated
266 : : * on the stack. It should be enough to take single iteration for most cases.
267 : : */
268 : : #define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
269 : :
270 : : /*
271 : : * Remove waiters whose LSN has been reached from the heap and set their
272 : : * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
273 : : * and set latches for all waiters.
274 : : *
275 : : * This function first accumulates waiters to wake up into an array, then
276 : : * wakes them up without holding a WaitLSNLock. The array size is static and
277 : : * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
278 : : * to wake up all the waiters at once in the vast majority of cases. However,
279 : : * if there are more waiters, this function will loop to process them in
280 : : * multiple chunks.
281 : : */
282 : : static void
283 : 3061 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
284 : : {
285 : : ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
286 : : int numWakeUpProcs;
287 : 3061 : int i = (int) lsnType;
288 : :
142 289 [ + - - + ]: 3061 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
290 : :
291 : : do
292 : : {
293 : : int j;
294 : :
183 295 : 3061 : numWakeUpProcs = 0;
296 : 3061 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
297 : :
298 : : /*
299 : : * Iterate the waiters heap until we find LSN not yet reached. Record
300 : : * process numbers to wake up, but send wakeups after releasing lock.
301 : : */
302 [ + + ]: 3106 : while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
303 : : {
304 : 50 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
305 : : WaitLSNProcInfo *procInfo;
306 : :
307 : : /* Get procInfo using appropriate heap node */
168 308 : 50 : procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
309 : :
180 alvherre@kurilemu.de 310 [ + + + + ]: 50 : if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
183 akorotkov@postgresql 311 : 5 : break;
312 : :
313 [ - + ]: 45 : Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
314 : 45 : wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
315 : 45 : (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
316 : :
317 : : /* Update appropriate flag */
168 318 : 45 : procInfo->inHeap = false;
319 : :
183 320 [ - + ]: 45 : if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
183 akorotkov@postgresql 321 :UNC 0 : break;
322 : : }
323 : :
183 akorotkov@postgresql 324 :GNC 3061 : updateMinWaitedLSN(lsnType);
325 : 3061 : LWLockRelease(WaitLSNLock);
326 : :
327 : : /*
328 : : * Set latches for processes whose waited LSNs have been reached.
329 : : * Since SetLatch() is a time-consuming operation, we do this outside
330 : : * of WaitLSNLock. This is safe because procLatch is never freed, so
331 : : * at worst we may set a latch for the wrong process or for no process
332 : : * at all, which is harmless.
333 : : */
119 334 [ + + ]: 3106 : for (j = 0; j < numWakeUpProcs; j++)
335 : 45 : SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);
336 : :
183 337 [ - + ]: 3061 : } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
338 : 3061 : }
339 : :
340 : : /*
341 : : * Wake up processes waiting for LSN to reach currentLSN
342 : : */
343 : : void
344 : 9168176 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
345 : : {
346 : 9168176 : int i = (int) lsnType;
347 : :
142 348 [ + - - + ]: 9168176 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
349 : :
350 : : /*
351 : : * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
352 : : * "wake all waiters" (e.g., during promotion when recovery ends). Pairs
353 : : * with pg_atomic_write_membarrier_u64() in updateMinWaitedLSN().
354 : : */
171 355 [ + + + + ]: 18333322 : if (XLogRecPtrIsValid(currentLSN) &&
2 356 : 9165146 : pg_atomic_read_membarrier_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
183 357 : 9165115 : return;
358 : :
359 : 3061 : wakeupWaiters(lsnType, currentLSN);
360 : : }
361 : :
362 : : /*
363 : : * Clean up LSN waiters for exiting process
364 : : */
365 : : void
366 : 53757 : WaitLSNCleanup(void)
367 : : {
368 [ + - ]: 53757 : if (waitLSNState)
369 : : {
370 : : /*
371 : : * We do a fast-path check of the inHeap flag without the lock. This
372 : : * flag is set to true only by the process itself. So, it's only
373 : : * possible to get a false positive. But that will be eliminated by a
374 : : * recheck inside deleteLSNWaiter().
375 : : */
168 376 [ - + ]: 53757 : if (waitLSNState->procInfos[MyProcNumber].inHeap)
168 akorotkov@postgresql 377 :UNC 0 : deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
378 : : }
183 akorotkov@postgresql 379 :GNC 53757 : }
380 : :
381 : : /*
382 : : * Check if the given LSN type requires recovery to be in progress.
383 : : * Standby wait types (replay, write, flush) require recovery;
384 : : * primary wait types (flush) do not.
385 : : */
386 : : static inline bool
120 387 : 324 : WaitLSNTypeRequiresRecovery(WaitLSNType t)
388 : : {
389 [ + + ]: 114 : return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
390 [ + + + + ]: 438 : t == WAIT_LSN_TYPE_STANDBY_WRITE ||
391 : : t == WAIT_LSN_TYPE_STANDBY_FLUSH;
392 : : }
393 : :
394 : : /*
395 : : * Wait using MyLatch till the given LSN is reached, the replica gets
396 : : * promoted, or the postmaster dies.
397 : : *
398 : : * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
399 : : * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery,
400 : : * or replica got promoted before the target LSN reached.
401 : : */
402 : : WaitLSNResult
183 403 : 243 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
404 : : {
405 : : XLogRecPtr currentLSN;
406 : 243 : TimestampTz endtime = 0;
407 : 243 : int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
408 : :
409 : : /* Shouldn't be called when shmem isn't initialized */
410 [ - + ]: 243 : Assert(waitLSNState);
411 : :
412 : : /* Should have a valid proc number */
152 413 [ + - - + ]: 243 : Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
414 : :
183 415 [ + + ]: 243 : if (timeout > 0)
416 : : {
417 : 232 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
418 : 232 : wake_events |= WL_TIMEOUT;
419 : : }
420 : :
421 : : /*
422 : : * Add our process to the waiters heap. It might happen that target LSN
423 : : * gets reached before we do. The check at the beginning of the loop
424 : : * below prevents the race condition.
425 : : */
426 : 243 : addLSNWaiter(targetLSN, lsnType);
427 : :
428 : : for (;;)
429 : 81 : {
430 : : int rc;
431 : 324 : long delay_ms = -1;
432 : :
433 : : /* Get current LSN for the wait type */
120 434 : 324 : currentLSN = GetCurrentLSNForWaitType(lsnType);
435 : :
436 : : /* Check that recovery is still in-progress */
437 [ + + + + ]: 324 : if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
438 : : {
439 : : /*
440 : : * Recovery was ended, but check if target LSN was already
441 : : * reached.
442 : : */
183 443 : 6 : deleteLSNWaiter(lsnType);
444 : :
445 [ + + + + ]: 6 : if (PromoteIsTriggered() && targetLSN <= currentLSN)
446 : 1 : return WAIT_LSN_RESULT_SUCCESS;
447 : 5 : return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
448 : : }
449 : : else
450 : : {
451 : : /* Check if the waited LSN has been reached */
452 [ + + ]: 318 : if (targetLSN <= currentLSN)
453 : 230 : break;
454 : : }
455 : :
456 [ + + ]: 88 : if (timeout > 0)
457 : : {
458 : 75 : delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
459 [ + + ]: 75 : if (delay_ms <= 0)
460 : 7 : break;
461 : : }
462 : :
463 [ + + ]: 81 : CHECK_FOR_INTERRUPTS();
464 : :
465 : 81 : rc = WaitLatch(MyLatch, wake_events, delay_ms,
120 466 : 81 : WaitLSNWaitEvents[lsnType]);
467 : :
468 : : /*
469 : : * Emergency bailout if postmaster has died. This is to avoid the
470 : : * necessity for manual cleanup of all postmaster children.
471 : : */
183 472 [ - + ]: 81 : if (rc & WL_POSTMASTER_DEATH)
183 akorotkov@postgresql 473 [ # # ]:UNC 0 : ereport(FATAL,
474 : : errcode(ERRCODE_ADMIN_SHUTDOWN),
475 : : errmsg("terminating connection due to unexpected postmaster exit"),
476 : : errcontext("while waiting for LSN"));
477 : :
2 akorotkov@postgresql 478 :GNC 81 : ResetLatch(MyLatch);
479 : : }
480 : :
481 : : /*
482 : : * Delete our process from the shared memory heap. We might already be
483 : : * deleted by the startup process. The 'inHeap' flags prevents us from
484 : : * the double deletion.
485 : : */
183 486 : 237 : deleteLSNWaiter(lsnType);
487 : :
488 : : /*
489 : : * If we didn't reach the target LSN, we must be exited by timeout.
490 : : */
491 [ + + ]: 237 : if (targetLSN > currentLSN)
492 : 7 : return WAIT_LSN_RESULT_TIMEOUT;
493 : :
494 : 230 : return WAIT_LSN_RESULT_SUCCESS;
495 : : }
|