Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * xlogwait.c
4 : : * Implements waiting for WAL operations to reach specific LSNs.
5 : : *
6 : : * Copyright (c) 2025, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/access/transam/xlogwait.c
10 : : *
11 : : * NOTES
12 : : * This file implements waiting for WAL operations to reach specific LSNs
13 : : * on both physical standby and primary servers. The core idea is simple:
14 : : * every process that wants to wait publishes the LSN it needs to the
15 : : * shared memory, and the appropriate process (startup on standby, or
16 : : * WAL writer/backend on primary) wakes it once that LSN has been reached.
17 : : *
18 : : * The shared memory used by this module comprises a procInfos
19 : : * per-backend array with the information of the awaited LSN for each
20 : : * of the backend processes. The elements of that array are organized
21 : : * into a pairing heap waitersHeap, which allows for very fast finding
22 : : * of the least awaited LSN.
23 : : *
24 : : * In addition, the least-awaited LSN is cached as minWaitedLSN. The
25 : : * waiter process publishes information about itself to the shared
26 : : * memory and waits on the latch until it is woken up by the appropriate
27 : : * process, standby is promoted, or the postmaster dies. Then, it cleans
28 : : * information about itself in the shared memory.
29 : : *
30 : : * On standby servers: After replaying a WAL record, the startup process
31 : : * first performs a fast path check minWaitedLSN > replayLSN. If this
32 : : * check is negative, it checks waitersHeap and wakes up the backend
33 : : * whose awaited LSNs are reached.
34 : : *
35 : : * On primary servers: After flushing WAL, the WAL writer or backend
36 : : * process performs a similar check against the flush LSN and wakes up
37 : : * waiters whose target flush LSNs have been reached.
38 : : *
39 : : *-------------------------------------------------------------------------
40 : : */
41 : :
42 : : #include "postgres.h"
43 : :
44 : : #include <float.h>
45 : : #include <math.h>
46 : :
47 : : #include "access/xlog.h"
48 : : #include "access/xlogrecovery.h"
49 : : #include "access/xlogwait.h"
50 : : #include "miscadmin.h"
51 : : #include "pgstat.h"
52 : : #include "storage/latch.h"
53 : : #include "storage/proc.h"
54 : : #include "storage/shmem.h"
55 : : #include "utils/fmgrprotos.h"
56 : : #include "utils/pg_lsn.h"
57 : : #include "utils/snapmgr.h"
58 : :
59 : :
60 : : static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
61 : : void *arg);
62 : :
63 : : struct WaitLSNState *waitLSNState = NULL;
64 : :
65 : : /* Report the amount of shared memory space needed for WaitLSNState. */
66 : : Size
45 akorotkov@postgresql 67 :GNC 3061 : WaitLSNShmemSize(void)
68 : : {
69 : : Size size;
70 : :
71 : 3061 : size = offsetof(WaitLSNState, procInfos);
72 : 3061 : size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
73 : 3061 : return size;
74 : : }
75 : :
76 : : /* Initialize the WaitLSNState in the shared memory. */
77 : : void
78 : 1071 : WaitLSNShmemInit(void)
79 : : {
80 : : bool found;
81 : :
82 : 1071 : waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
83 : : WaitLSNShmemSize(),
84 : : &found);
85 [ + - ]: 1071 : if (!found)
86 : : {
87 : : int i;
88 : :
89 : : /* Initialize heaps and tracking */
90 [ + + ]: 3213 : for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
91 : : {
92 : 2142 : pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
30 93 : 2142 : pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
94 : : }
95 : :
96 : : /* Initialize process info array */
45 97 : 1071 : memset(&waitLSNState->procInfos, 0,
98 : 1071 : (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
99 : : }
100 : 1071 : }
101 : :
102 : : /*
103 : : * Comparison function for LSN waiters heaps. Waiting processes are ordered by
104 : : * LSN, so that the waiter with smallest LSN is at the top.
105 : : */
106 : : static int
107 : 8 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
108 : : {
30 109 : 8 : const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
110 : 8 : const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
111 : :
45 112 [ + + ]: 8 : if (aproc->waitLSN < bproc->waitLSN)
113 : 5 : return 1;
114 [ + - ]: 3 : else if (aproc->waitLSN > bproc->waitLSN)
115 : 3 : return -1;
116 : : else
45 akorotkov@postgresql 117 :UNC 0 : return 0;
118 : : }
119 : :
120 : : /*
121 : : * Update minimum waited LSN for the specified LSN type
122 : : */
123 : : static void
45 akorotkov@postgresql 124 :GNC 905 : updateMinWaitedLSN(WaitLSNType lsnType)
125 : : {
126 : 905 : XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
127 : 905 : int i = (int) lsnType;
128 : :
4 129 [ + - - + ]: 905 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
130 : :
45 131 [ + + ]: 905 : if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
132 : : {
133 : 21 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
30 134 : 21 : WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
135 : :
45 136 : 21 : minWaitedLSN = procInfo->waitLSN;
137 : : }
138 : 905 : pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
139 : 905 : }
140 : :
141 : : /*
142 : : * Add current process to appropriate waiters heap based on LSN type
143 : : */
144 : : static void
145 : 17 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
146 : : {
147 : 17 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
148 : 17 : int i = (int) lsnType;
149 : :
4 150 [ + - - + ]: 17 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
151 : :
45 152 : 17 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
153 : :
154 : 17 : procInfo->procno = MyProcNumber;
155 : 17 : procInfo->waitLSN = lsn;
30 156 : 17 : procInfo->lsnType = lsnType;
157 : :
158 [ - + ]: 17 : Assert(!procInfo->inHeap);
159 : 17 : pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
160 : 17 : procInfo->inHeap = true;
45 161 : 17 : updateMinWaitedLSN(lsnType);
162 : :
163 : 17 : LWLockRelease(WaitLSNLock);
164 : 17 : }
165 : :
166 : : /*
167 : : * Remove current process from appropriate waiters heap based on LSN type
168 : : */
169 : : static void
170 : 17 : deleteLSNWaiter(WaitLSNType lsnType)
171 : : {
172 : 17 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
173 : 17 : int i = (int) lsnType;
174 : :
4 175 [ + - - + ]: 17 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
176 : :
45 177 : 17 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
178 : :
30 179 [ - + ]: 17 : Assert(procInfo->lsnType == lsnType);
180 : :
181 [ + + ]: 17 : if (procInfo->inHeap)
182 : : {
183 : 9 : pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
184 : 9 : procInfo->inHeap = false;
45 185 : 9 : updateMinWaitedLSN(lsnType);
186 : : }
187 : :
188 : 17 : LWLockRelease(WaitLSNLock);
189 : 17 : }
190 : :
191 : : /*
192 : : * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated
193 : : * on the stack. It should be enough to take single iteration for most cases.
194 : : */
195 : : #define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
196 : :
197 : : /*
198 : : * Remove waiters whose LSN has been reached from the heap and set their
199 : : * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
200 : : * and set latches for all waiters.
201 : : *
202 : : * This function first accumulates waiters to wake up into an array, then
203 : : * wakes them up without holding a WaitLSNLock. The array size is static and
204 : : * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
205 : : * to wake up all the waiters at once in the vast majority of cases. However,
206 : : * if there are more waiters, this function will loop to process them in
207 : : * multiple chunks.
208 : : */
209 : : static void
210 : 879 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
211 : : {
212 : : ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
213 : : int numWakeUpProcs;
214 : 879 : int i = (int) lsnType;
215 : :
4 216 [ + - - + ]: 879 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
217 : :
218 : : do
219 : : {
45 220 : 879 : numWakeUpProcs = 0;
221 : 879 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
222 : :
223 : : /*
224 : : * Iterate the waiters heap until we find LSN not yet reached. Record
225 : : * process numbers to wake up, but send wakeups after releasing lock.
226 : : */
227 [ + + ]: 887 : while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
228 : : {
229 : 12 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
230 : : WaitLSNProcInfo *procInfo;
231 : :
232 : : /* Get procInfo using appropriate heap node */
30 233 : 12 : procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
234 : :
42 alvherre@kurilemu.de 235 [ + + + + ]: 12 : if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
45 akorotkov@postgresql 236 : 4 : break;
237 : :
238 [ - + ]: 8 : Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
239 : 8 : wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
240 : 8 : (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
241 : :
242 : : /* Update appropriate flag */
30 243 : 8 : procInfo->inHeap = false;
244 : :
45 245 [ - + ]: 8 : if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
45 akorotkov@postgresql 246 :UNC 0 : break;
247 : : }
248 : :
45 akorotkov@postgresql 249 :GNC 879 : updateMinWaitedLSN(lsnType);
250 : 879 : LWLockRelease(WaitLSNLock);
251 : :
252 : : /*
253 : : * Set latches for processes whose waited LSNs have been reached.
254 : : * Since SetLatch() is a time-consuming operation, we do this outside
255 : : * of WaitLSNLock. This is safe because procLatch is never freed, so
256 : : * at worst we may set a latch for the wrong process or for no process
257 : : * at all, which is harmless.
258 : : */
259 [ + + ]: 887 : for (i = 0; i < numWakeUpProcs; i++)
260 : 8 : SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
261 : :
262 [ - + ]: 879 : } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
263 : 879 : }
264 : :
265 : : /*
266 : : * Wake up processes waiting for LSN to reach currentLSN
267 : : */
268 : : void
269 : 879 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
270 : : {
271 : 879 : int i = (int) lsnType;
272 : :
4 273 [ + - - + ]: 879 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
274 : :
275 : : /*
276 : : * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
277 : : * "wake all waiters" (e.g., during promotion when recovery ends).
278 : : */
33 279 [ + + - + ]: 886 : if (XLogRecPtrIsValid(currentLSN) &&
280 : 7 : pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
45 akorotkov@postgresql 281 :UNC 0 : return;
282 : :
45 akorotkov@postgresql 283 :GNC 879 : wakeupWaiters(lsnType, currentLSN);
284 : : }
285 : :
286 : : /*
287 : : * Clean up LSN waiters for exiting process
288 : : */
289 : : void
290 : 41409 : WaitLSNCleanup(void)
291 : : {
292 [ + - ]: 41409 : if (waitLSNState)
293 : : {
294 : : /*
295 : : * We do a fast-path check of the inHeap flag without the lock. This
296 : : * flag is set to true only by the process itself. So, it's only
297 : : * possible to get a false positive. But that will be eliminated by a
298 : : * recheck inside deleteLSNWaiter().
299 : : */
30 300 [ - + ]: 41409 : if (waitLSNState->procInfos[MyProcNumber].inHeap)
30 akorotkov@postgresql 301 :UNC 0 : deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
302 : : }
45 akorotkov@postgresql 303 :GNC 41409 : }
304 : :
305 : : /*
306 : : * Wait using MyLatch till the given LSN is reached, the replica gets
307 : : * promoted, or the postmaster dies.
308 : : *
309 : : * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
310 : : * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery,
311 : : * or replica got promoted before the target LSN reached.
312 : : */
313 : : WaitLSNResult
314 : 17 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
315 : : {
316 : : XLogRecPtr currentLSN;
317 : 17 : TimestampTz endtime = 0;
318 : 17 : int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
319 : :
320 : : /* Shouldn't be called when shmem isn't initialized */
321 [ - + ]: 17 : Assert(waitLSNState);
322 : :
323 : : /* Should have a valid proc number */
14 324 [ + - - + ]: 17 : Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
325 : :
45 326 [ + + ]: 17 : if (timeout > 0)
327 : : {
328 : 8 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
329 : 8 : wake_events |= WL_TIMEOUT;
330 : : }
331 : :
332 : : /*
333 : : * Add our process to the waiters heap. It might happen that target LSN
334 : : * gets reached before we do. The check at the beginning of the loop
335 : : * below prevents the race condition.
336 : : */
337 : 17 : addLSNWaiter(targetLSN, lsnType);
338 : :
339 : : for (;;)
340 : 12 : {
341 : : int rc;
342 : 29 : long delay_ms = -1;
343 : :
344 [ + - ]: 29 : if (lsnType == WAIT_LSN_TYPE_REPLAY)
345 : 29 : currentLSN = GetXLogReplayRecPtr(NULL);
346 : : else
45 akorotkov@postgresql 347 :UNC 0 : currentLSN = GetFlushRecPtr(NULL);
348 : :
349 : : /* Check that recovery is still in-progress */
41 akorotkov@postgresql 350 [ + - + + ]:GNC 29 : if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
351 : : {
352 : : /*
353 : : * Recovery was ended, but check if target LSN was already
354 : : * reached.
355 : : */
45 356 : 4 : deleteLSNWaiter(lsnType);
357 : :
358 [ + + + + ]: 4 : if (PromoteIsTriggered() && targetLSN <= currentLSN)
359 : 1 : return WAIT_LSN_RESULT_SUCCESS;
360 : 3 : return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
361 : : }
362 : : else
363 : : {
364 : : /* Check if the waited LSN has been reached */
365 [ + + ]: 25 : if (targetLSN <= currentLSN)
366 : 10 : break;
367 : : }
368 : :
369 [ + + ]: 15 : if (timeout > 0)
370 : : {
371 : 8 : delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
372 [ + + ]: 8 : if (delay_ms <= 0)
373 : 3 : break;
374 : : }
375 : :
376 [ - + ]: 12 : CHECK_FOR_INTERRUPTS();
377 : :
378 [ + - ]: 12 : rc = WaitLatch(MyLatch, wake_events, delay_ms,
379 : : (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
380 : :
381 : : /*
382 : : * Emergency bailout if postmaster has died. This is to avoid the
383 : : * necessity for manual cleanup of all postmaster children.
384 : : */
385 [ - + ]: 12 : if (rc & WL_POSTMASTER_DEATH)
45 akorotkov@postgresql 386 [ # # ]:UNC 0 : ereport(FATAL,
387 : : errcode(ERRCODE_ADMIN_SHUTDOWN),
388 : : errmsg("terminating connection due to unexpected postmaster exit"),
389 : : errcontext("while waiting for LSN"));
390 : :
45 akorotkov@postgresql 391 [ + + ]:GNC 12 : if (rc & WL_LATCH_SET)
392 : 9 : ResetLatch(MyLatch);
393 : : }
394 : :
395 : : /*
396 : : * Delete our process from the shared memory heap. We might already be
397 : : * deleted by the startup process. The 'inHeap' flags prevents us from
398 : : * the double deletion.
399 : : */
400 : 13 : deleteLSNWaiter(lsnType);
401 : :
402 : : /*
403 : : * If we didn't reach the target LSN, we must be exited by timeout.
404 : : */
405 [ + + ]: 13 : if (targetLSN > currentLSN)
406 : 3 : return WAIT_LSN_RESULT_TIMEOUT;
407 : :
408 : 10 : return WAIT_LSN_RESULT_SUCCESS;
409 : : }
|