Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * xlogwait.c
4 : : * Implements waiting for WAL operations to reach specific LSNs.
5 : : *
6 : : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/access/transam/xlogwait.c
10 : : *
11 : : * NOTES
12 : : * This file implements waiting for WAL operations to reach specific LSNs
13 : : * on both physical standby and primary servers. The core idea is simple:
14 : : * every process that wants to wait publishes the LSN it needs to the
15 : : * shared memory, and the appropriate process (startup on standby,
16 : : * walreceiver on standby, or WAL writer/backend on primary) wakes it
17 : : * once that LSN has been reached.
18 : : *
19 : : * The shared memory used by this module comprises a procInfos
20 : : * per-backend array with the information of the awaited LSN for each
21 : : * of the backend processes. The elements of that array are organized
22 : : * into pairing heaps (waitersHeap), one for each WaitLSNType, which
23 : : * allows for very fast finding of the least awaited LSN for each type.
24 : : *
25 : : * In addition, the least-awaited LSN for each type is cached in the
26 : : * minWaitedLSN array. The waiter process publishes information about
27 : : * itself to the shared memory and waits on the latch until it is woken
28 : : * up by the appropriate process, standby is promoted, or the postmaster
29 : : * dies. Then, it cleans information about itself in the shared memory.
30 : : *
31 : : * On standby servers:
32 : : * - After replaying a WAL record, the startup process performs a fast
33 : : * path check minWaitedLSN[REPLAY] > replayLSN. If this check is
34 : : * negative, it checks waitersHeap[REPLAY] and wakes up the backends
35 : : * whose awaited LSNs are reached.
36 : : * - After receiving WAL, the walreceiver process performs similar checks
37 : : * against the flush and write LSNs, waking up waiters in the FLUSH
38 : : * and WRITE heaps, respectively.
39 : : *
40 : : * On primary servers: After flushing WAL, the WAL writer or backend
41 : : * process performs a similar check against the flush LSN and wakes up
42 : : * waiters whose target flush LSNs have been reached.
43 : : *
44 : : *-------------------------------------------------------------------------
45 : : */
46 : :
47 : : #include "postgres.h"
48 : :
49 : : #include <float.h>
50 : :
51 : : #include "access/xlog.h"
52 : : #include "access/xlogrecovery.h"
53 : : #include "access/xlogwait.h"
54 : : #include "miscadmin.h"
55 : : #include "pgstat.h"
56 : : #include "replication/walreceiver.h"
57 : : #include "storage/latch.h"
58 : : #include "storage/proc.h"
59 : : #include "storage/shmem.h"
60 : : #include "utils/fmgrprotos.h"
61 : : #include "utils/pg_lsn.h"
62 : : #include "utils/snapmgr.h"
63 : : #include "utils/wait_event.h"
64 : :
65 : :
66 : : static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
67 : : void *arg);
68 : :
69 : : struct WaitLSNState *waitLSNState = NULL;
70 : :
71 : : /*
72 : : * Wait event for each WaitLSNType, used with WaitLatch() to report
73 : : * the wait in pg_stat_activity.
74 : : */
75 : : static const uint32 WaitLSNWaitEvents[] = {
76 : : [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
77 : : [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
78 : : [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
79 : : [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
80 : : };
81 : :
82 : : StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
83 : : "WaitLSNWaitEvents must match WaitLSNType enum");
84 : :
85 : : /*
86 : : * Get the current LSN for the specified wait type.
87 : : */
88 : : XLogRecPtr
69 akorotkov@postgresql 89 :GNC 105 : GetCurrentLSNForWaitType(WaitLSNType lsnType)
90 : : {
91 [ - + ]: 105 : Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
92 : :
93 [ + + + + : 105 : switch (lsnType)
- ]
94 : : {
95 : 37 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
96 : 37 : return GetXLogReplayRecPtr(NULL);
97 : :
98 : 29 : case WAIT_LSN_TYPE_STANDBY_WRITE:
99 : 29 : return GetWalRcvWriteRecPtr();
100 : :
101 : 33 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
102 : 33 : return GetWalRcvFlushRecPtr(NULL, NULL);
103 : :
104 : 6 : case WAIT_LSN_TYPE_PRIMARY_FLUSH:
105 : 6 : return GetFlushRecPtr(NULL);
106 : : }
107 : :
69 akorotkov@postgresql 108 [ # # ]:UNC 0 : elog(ERROR, "invalid LSN wait type: %d", lsnType);
109 : : pg_unreachable();
110 : : }
111 : :
112 : : /* Report the amount of shared memory space needed for WaitLSNState. */
113 : : Size
132 akorotkov@postgresql 114 :GNC 3297 : WaitLSNShmemSize(void)
115 : : {
116 : : Size size;
117 : :
118 : 3297 : size = offsetof(WaitLSNState, procInfos);
119 : 3297 : size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
120 : 3297 : return size;
121 : : }
122 : :
123 : : /* Initialize the WaitLSNState in the shared memory. */
124 : : void
125 : 1150 : WaitLSNShmemInit(void)
126 : : {
127 : : bool found;
128 : :
129 : 1150 : waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
130 : : WaitLSNShmemSize(),
131 : : &found);
132 [ + - ]: 1150 : if (!found)
133 : : {
134 : : int i;
135 : :
136 : : /* Initialize heaps and tracking */
137 [ + + ]: 5750 : for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
138 : : {
139 : 4600 : pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
117 140 : 4600 : pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
141 : : }
142 : :
143 : : /* Initialize process info array */
132 144 : 1150 : memset(&waitLSNState->procInfos, 0,
145 : 1150 : (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
146 : : }
147 : 1150 : }
148 : :
149 : : /*
150 : : * Comparison function for LSN waiters heaps. Waiting processes are ordered by
151 : : * LSN, so that the waiter with smallest LSN is at the top.
152 : : */
153 : : static int
154 : 27 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
155 : : {
117 156 : 27 : const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
157 : 27 : const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
158 : :
132 159 [ + + ]: 27 : if (aproc->waitLSN < bproc->waitLSN)
160 : 15 : return 1;
161 [ + + ]: 12 : else if (aproc->waitLSN > bproc->waitLSN)
162 : 9 : return -1;
163 : : else
164 : 3 : return 0;
165 : : }
166 : :
167 : : /*
168 : : * Update minimum waited LSN for the specified LSN type
169 : : */
170 : : static void
171 : 2889 : updateMinWaitedLSN(WaitLSNType lsnType)
172 : : {
173 : 2889 : XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
174 : 2889 : int i = (int) lsnType;
175 : :
91 176 [ + - - + ]: 2889 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
177 : :
132 178 [ + + ]: 2889 : if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
179 : : {
180 : 47 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
117 181 : 47 : WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
182 : :
132 183 : 47 : minWaitedLSN = procInfo->waitLSN;
184 : : }
185 : 2889 : pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
186 : 2889 : }
187 : :
188 : : /*
189 : : * Add current process to appropriate waiters heap based on LSN type
190 : : */
191 : : static void
192 : 43 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
193 : : {
194 : 43 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
195 : 43 : int i = (int) lsnType;
196 : :
91 197 [ + - - + ]: 43 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
198 : :
132 199 : 43 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
200 : :
201 : 43 : procInfo->procno = MyProcNumber;
202 : 43 : procInfo->waitLSN = lsn;
117 203 : 43 : procInfo->lsnType = lsnType;
204 : :
205 [ - + ]: 43 : Assert(!procInfo->inHeap);
206 : 43 : pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
207 : 43 : procInfo->inHeap = true;
132 208 : 43 : updateMinWaitedLSN(lsnType);
209 : :
210 : 43 : LWLockRelease(WaitLSNLock);
211 : 43 : }
212 : :
213 : : /*
214 : : * Remove current process from appropriate waiters heap based on LSN type
215 : : */
216 : : static void
217 : 43 : deleteLSNWaiter(WaitLSNType lsnType)
218 : : {
219 : 43 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
220 : 43 : int i = (int) lsnType;
221 : :
91 222 [ + - - + ]: 43 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
223 : :
132 224 : 43 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
225 : :
117 226 [ - + ]: 43 : Assert(procInfo->lsnType == lsnType);
227 : :
228 [ + + ]: 43 : if (procInfo->inHeap)
229 : : {
230 : 17 : pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
231 : 17 : procInfo->inHeap = false;
132 232 : 17 : updateMinWaitedLSN(lsnType);
233 : : }
234 : :
235 : 43 : LWLockRelease(WaitLSNLock);
236 : 43 : }
237 : :
/*
 * Capacity of the on-stack array in which wakeupWaiters() collects the
 * processes to wake.  It is expected to hold every ready waiter in a single
 * pass in most cases. Larger batches are processed in several passes.
 */
#define WAKEUP_PROC_STATIC_ARRAY_SIZE 16
243 : :
/*
 * Remove waiters of type 'lsnType' whose awaited LSN has been reached
 * (waitLSN <= currentLSN) from the heap, and set their latches.  If
 * currentLSN is InvalidXLogRecPtr, every waiter of that type is removed and
 * woken regardless of its LSN.
 *
 * While holding WaitLSNLock, the waiters to wake are collected into an
 * on-stack array of WAKEUP_PROC_STATIC_ARRAY_SIZE entries. Their latches
 * are set only after the lock has been released. The array should be large
 * enough in the vast majority of cases. If it fills up, the outer loop runs
 * again and processes the remaining waiters in further chunks.
 */
static void
wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
{
	ProcNumber	wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
	int			numWakeUpProcs;
	int			i = (int) lsnType;

	Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);

	do
	{
		int			j;

		numWakeUpProcs = 0;
		LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);

		/*
		 * Pop waiters off the heap, smallest awaited LSN first. Stop when one
		 * has not been reached yet or when the array is full. Only process
		 * numbers are recorded here; wakeups are sent after releasing the lock.
		 */
		while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
		{
			pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
			WaitLSNProcInfo *procInfo;

			/* Get the WaitLSNProcInfo that embeds this heap node */
			procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);

			/*
			 * The heap is ordered by LSN (see waitlsn_cmp), so once the top
			 * waiter is not yet satisfied, no remaining waiter is either.
			 */
			if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
				break;

			Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
			wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
			(void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);

			/*
			 * Mark the entry as unlinked.  deleteLSNWaiter() checks this flag,
			 * so the waiter will not try to remove itself a second time.
			 */
			procInfo->inHeap = false;

			/* Array full: wake this chunk, then loop around for the rest */
			if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
				break;
		}

		/* Republish the cached minimum for whatever remains in the heap */
		updateMinWaitedLSN(lsnType);
		LWLockRelease(WaitLSNLock);

		/*
		 * Set latches for processes whose waited LSNs have been reached.
		 * Since SetLatch() is a time-consuming operation, we do this outside
		 * of WaitLSNLock. This is safe because procLatch is never freed, so
		 * at worst we may set a latch for the wrong process or for no process
		 * at all, which is harmless.
		 */
		for (j = 0; j < numWakeUpProcs; j++)
			SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);

	} while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
}
313 : :
314 : : /*
315 : : * Wake up processes waiting for LSN to reach currentLSN
316 : : */
317 : : void
318 : 2829 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
319 : : {
320 : 2829 : int i = (int) lsnType;
321 : :
91 322 [ + - - + ]: 2829 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
323 : :
324 : : /*
325 : : * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
326 : : * "wake all waiters" (e.g., during promotion when recovery ends).
327 : : */
120 328 [ + + - + ]: 2841 : if (XLogRecPtrIsValid(currentLSN) &&
329 : 12 : pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
132 akorotkov@postgresql 330 :UNC 0 : return;
331 : :
132 akorotkov@postgresql 332 :GNC 2829 : wakeupWaiters(lsnType, currentLSN);
333 : : }
334 : :
335 : : /*
336 : : * Clean up LSN waiters for exiting process
337 : : */
338 : : void
339 : 43569 : WaitLSNCleanup(void)
340 : : {
341 [ + - ]: 43569 : if (waitLSNState)
342 : : {
343 : : /*
344 : : * We do a fast-path check of the inHeap flag without the lock. This
345 : : * flag is set to true only by the process itself. So, it's only
346 : : * possible to get a false positive. But that will be eliminated by a
347 : : * recheck inside deleteLSNWaiter().
348 : : */
117 349 [ - + ]: 43569 : if (waitLSNState->procInfos[MyProcNumber].inHeap)
117 akorotkov@postgresql 350 :UNC 0 : deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
351 : : }
132 akorotkov@postgresql 352 :GNC 43569 : }
353 : :
354 : : /*
355 : : * Check if the given LSN type requires recovery to be in progress.
356 : : * Standby wait types (replay, write, flush) require recovery;
357 : : * primary wait types (flush) do not.
358 : : */
359 : : static inline bool
69 360 : 101 : WaitLSNTypeRequiresRecovery(WaitLSNType t)
361 : : {
362 [ + + ]: 66 : return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
363 [ + + + + ]: 167 : t == WAIT_LSN_TYPE_STANDBY_WRITE ||
364 : : t == WAIT_LSN_TYPE_STANDBY_FLUSH;
365 : : }
366 : :
/*
 * Wait using MyLatch until the current LSN of kind 'lsnType' (as reported by
 * GetCurrentLSNForWaitType()) reaches targetLSN.
 *
 * 'timeout' is in milliseconds; a value <= 0 means wait with no time limit.
 * For the standby wait types (replay, write, flush), the wait also ends when
 * recovery is no longer in progress, e.g. because the replica was promoted.
 * If the postmaster dies while we wait, this function raises FATAL.
 *
 * Returns:
 *	WAIT_LSN_RESULT_SUCCESS if targetLSN was reached.  This includes the case
 *	where recovery ended after promotion was triggered and the target had
 *	already been reached.
 *	WAIT_LSN_RESULT_NOT_IN_RECOVERY if a standby wait type was requested but
 *	recovery is not (or no longer) in progress and the above does not apply.
 *	WAIT_LSN_RESULT_TIMEOUT if the timeout expired before targetLSN was
 *	reached.
 */
WaitLSNResult
WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
{
	XLogRecPtr	currentLSN;
	TimestampTz endtime = 0;
	int			wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;

	/* Shouldn't be called when shmem isn't initialized */
	Assert(waitLSNState);

	/* Should have a valid proc number */
	Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);

	/* Turn the relative timeout into an absolute deadline */
	if (timeout > 0)
	{
		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
		wake_events |= WL_TIMEOUT;
	}

	/*
	 * Add our process to the waiters heap. The target LSN might be reached
	 * before we are registered. Reading the current LSN at the top of the
	 * loop, after registering, closes that race: either we see the new LSN
	 * ourselves, or the waker sees us in the heap.
	 */
	addLSNWaiter(targetLSN, lsnType);

	for (;;)
	{
		int			rc;
		long		delay_ms = -1;	/* -1: no timeout for WaitLatch() */

		/* Get current LSN for the wait type */
		currentLSN = GetCurrentLSNForWaitType(lsnType);

		/* Check that recovery is still in-progress */
		if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
		{
			/*
			 * Recovery has ended, but check if target LSN was already
			 * reached.
			 */
			deleteLSNWaiter(lsnType);

			if (PromoteIsTriggered() && targetLSN <= currentLSN)
				return WAIT_LSN_RESULT_SUCCESS;
			return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
		}
		else
		{
			/* Check if the waited LSN has been reached */
			if (targetLSN <= currentLSN)
				break;
		}

		/* Sleep at most until the deadline; leave the loop once it passed */
		if (timeout > 0)
		{
			delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
			if (delay_ms <= 0)
				break;
		}

		/*
		 * NOTE(review): if this throws, our entry is still in the waiters
		 * heap.  Removal then relies on WaitLSNCleanup() being reached on the
		 * error/exit path — confirm with callers.
		 */
		CHECK_FOR_INTERRUPTS();

		rc = WaitLatch(MyLatch, wake_events, delay_ms,
					   WaitLSNWaitEvents[lsnType]);

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (rc & WL_POSTMASTER_DEATH)
			ereport(FATAL,
					errcode(ERRCODE_ADMIN_SHUTDOWN),
					errmsg("terminating connection due to unexpected postmaster exit"),
					errcontext("while waiting for LSN"));

		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}

	/*
	 * Delete our process from the shared memory heap. We may already have
	 * been removed by the process that woke us up (see wakeupWaiters()). The
	 * 'inHeap' flag prevents a double deletion.
	 */
	deleteLSNWaiter(lsnType);

	/*
	 * The loop exits only when the target is reached or the deadline passes.
	 * If the target LSN was not reached, we must have timed out.
	 */
	if (targetLSN > currentLSN)
		return WAIT_LSN_RESULT_TIMEOUT;

	return WAIT_LSN_RESULT_SUCCESS;
}
|