Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * walreceiverfuncs.c
4 : : *
5 : : * This file contains functions used by the startup process to communicate
6 : : * with the walreceiver process. Functions implementing walreceiver itself
7 : : * are in walreceiver.c.
8 : : *
9 : : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
10 : : *
11 : : *
12 : : * IDENTIFICATION
13 : : * src/backend/replication/walreceiverfuncs.c
14 : : *
15 : : *-------------------------------------------------------------------------
16 : : */
17 : : #include "postgres.h"
18 : :
19 : : #include <sys/stat.h>
20 : : #include <sys/time.h>
21 : : #include <time.h>
22 : : #include <unistd.h>
23 : : #include <signal.h>
24 : :
25 : : #include "access/xlog_internal.h"
26 : : #include "access/xlogrecovery.h"
27 : : #include "pgstat.h"
28 : : #include "replication/walreceiver.h"
29 : : #include "storage/pmsignal.h"
30 : : #include "storage/proc.h"
31 : : #include "storage/shmem.h"
32 : : #include "utils/timestamp.h"
33 : : #include "utils/wait_event.h"
34 : :
35 : : WalRcvData *WalRcv = NULL;
36 : :
37 : : /*
38 : : * How long to wait for walreceiver to start up after requesting
39 : : * postmaster to launch it. In seconds.
40 : : */
41 : : #define WALRCV_STARTUP_TIMEOUT 10
42 : :
43 : : /* Report shared memory space needed by WalRcvShmemInit */
44 : : Size
5903 heikki.linnakangas@i 45 :CBC 4447 : WalRcvShmemSize(void)
46 : : {
5861 bruce@momjian.us 47 : 4447 : Size size = 0;
48 : :
5903 heikki.linnakangas@i 49 : 4447 : size = add_size(size, sizeof(WalRcvData));
50 : :
51 : 4447 : return size;
52 : : }
53 : :
54 : : /* Allocate and initialize walreceiver-related shared memory */
55 : : void
56 : 1150 : WalRcvShmemInit(void)
57 : : {
58 : : bool found;
59 : :
60 : 1150 : WalRcv = (WalRcvData *)
61 : 1150 : ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
62 : :
5800 tgl@sss.pgh.pa.us 63 [ + - ]: 1150 : if (!found)
64 : : {
65 : : /* First time through, so initialize */
66 [ + - + - : 1150 : MemSet(WalRcv, 0, WalRcvShmemSize());
+ - - + -
- ]
67 : 1150 : WalRcv->walRcvState = WALRCV_STOPPED;
1829 tmunro@postgresql.or 68 : 1150 : ConditionVariableInit(&WalRcv->walRcvStoppedCV);
5800 tgl@sss.pgh.pa.us 69 : 1150 : SpinLockInit(&WalRcv->mutex);
1851 fujii@postgresql.org 70 : 1150 : pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
499 heikki.linnakangas@i 71 : 1150 : WalRcv->procno = INVALID_PROC_NUMBER;
72 : : }
5903 73 : 1150 : }
74 : :
75 : : /* Is walreceiver running (or starting up)? */
76 : : bool
4840 77 : 1047 : WalRcvRunning(void)
78 : : {
3813 rhaas@postgresql.org 79 : 1047 : WalRcvData *walrcv = WalRcv;
80 : : WalRcvState state;
81 : : pg_time_t startTime;
82 : :
5903 heikki.linnakangas@i 83 [ - + ]: 1047 : SpinLockAcquire(&walrcv->mutex);
84 : :
5891 85 : 1047 : state = walrcv->walRcvState;
86 : 1047 : startTime = walrcv->startTime;
87 : :
88 : 1047 : SpinLockRelease(&walrcv->mutex);
89 : :
90 : : /*
91 : : * If it has taken too long for walreceiver to start up, give up. Setting
92 : : * the state to STOPPED ensures that if walreceiver later does start up
93 : : * after all, it will see that it's not supposed to be running and die
94 : : * without doing anything.
95 : : */
96 [ + + ]: 1047 : if (state == WALRCV_STARTING)
97 : : {
5861 bruce@momjian.us 98 : 1 : pg_time_t now = (pg_time_t) time(NULL);
99 : :
5891 heikki.linnakangas@i 100 [ - + ]: 1 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
101 : : {
1829 tmunro@postgresql.or 102 :UBC 0 : bool stopped = false;
103 : :
104 [ # # ]: 0 : SpinLockAcquire(&walrcv->mutex);
5891 heikki.linnakangas@i 105 [ # # ]: 0 : if (walrcv->walRcvState == WALRCV_STARTING)
106 : : {
107 : 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
1829 tmunro@postgresql.or 108 : 0 : stopped = true;
109 : : }
5891 heikki.linnakangas@i 110 : 0 : SpinLockRelease(&walrcv->mutex);
111 : :
1829 tmunro@postgresql.or 112 [ # # ]: 0 : if (stopped)
113 : 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
114 : : }
115 : : }
116 : :
5891 heikki.linnakangas@i 117 [ + + ]:CBC 1047 : if (state != WALRCV_STOPPED)
118 : 42 : return true;
119 : : else
120 : 1005 : return false;
121 : : }
122 : :
123 : : /* Return the state of the walreceiver. */
124 : : WalRcvState
131 michael@paquier.xyz 125 :GNC 140 : WalRcvGetState(void)
126 : : {
127 : 140 : WalRcvData *walrcv = WalRcv;
128 : : WalRcvState state;
129 : :
130 : 140 : SpinLockAcquire(&walrcv->mutex);
131 : 140 : state = walrcv->walRcvState;
132 : 140 : SpinLockRelease(&walrcv->mutex);
133 : :
134 : 140 : return state;
135 : : }
136 : :
137 : : /*
138 : : * Is walreceiver running and streaming (or at least attempting to connect,
139 : : * or starting up)?
140 : : */
141 : : bool
4840 heikki.linnakangas@i 142 :CBC 28187 : WalRcvStreaming(void)
143 : : {
3813 rhaas@postgresql.org 144 : 28187 : WalRcvData *walrcv = WalRcv;
145 : : WalRcvState state;
146 : : pg_time_t startTime;
147 : :
4840 heikki.linnakangas@i 148 [ - + ]: 28187 : SpinLockAcquire(&walrcv->mutex);
149 : :
150 : 28187 : state = walrcv->walRcvState;
151 : 28187 : startTime = walrcv->startTime;
152 : :
153 : 28187 : SpinLockRelease(&walrcv->mutex);
154 : :
155 : : /*
156 : : * If it has taken too long for walreceiver to start up, give up. Setting
157 : : * the state to STOPPED ensures that if walreceiver later does start up
158 : : * after all, it will see that it's not supposed to be running and die
159 : : * without doing anything.
160 : : */
161 [ + + ]: 28187 : if (state == WALRCV_STARTING)
162 : : {
163 : 277 : pg_time_t now = (pg_time_t) time(NULL);
164 : :
165 [ - + ]: 277 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
166 : : {
1829 tmunro@postgresql.or 167 :UBC 0 : bool stopped = false;
168 : :
169 [ # # ]: 0 : SpinLockAcquire(&walrcv->mutex);
4840 heikki.linnakangas@i 170 [ # # ]: 0 : if (walrcv->walRcvState == WALRCV_STARTING)
171 : : {
172 : 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
1829 tmunro@postgresql.or 173 : 0 : stopped = true;
174 : : }
4840 heikki.linnakangas@i 175 : 0 : SpinLockRelease(&walrcv->mutex);
176 : :
1829 tmunro@postgresql.or 177 [ # # ]: 0 : if (stopped)
178 : 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
179 : : }
180 : : }
181 : :
4840 heikki.linnakangas@i 182 [ + + + + :CBC 28187 : if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
+ + ]
51 michael@paquier.xyz 183 [ + + ]:GNC 3058 : state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
4840 heikki.linnakangas@i 184 :CBC 25134 : return true;
185 : : else
186 : 3053 : return false;
187 : : }
188 : :
189 : : /*
190 : : * Stop walreceiver (if running) and wait for it to die.
191 : : * Executed by the Startup process.
192 : : */
193 : : void
5903 194 : 1001 : ShutdownWalRcv(void)
195 : : {
3813 rhaas@postgresql.org 196 : 1001 : WalRcvData *walrcv = WalRcv;
5861 bruce@momjian.us 197 : 1001 : pid_t walrcvpid = 0;
1829 tmunro@postgresql.or 198 : 1001 : bool stopped = false;
199 : :
200 : : /*
201 : : * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
202 : : * mode once it's finished, and will also request postmaster to not
203 : : * restart itself.
204 : : */
5903 heikki.linnakangas@i 205 [ - + ]: 1001 : SpinLockAcquire(&walrcv->mutex);
5861 bruce@momjian.us 206 [ + + + - : 1001 : switch (walrcv->walRcvState)
- ]
207 : : {
5891 heikki.linnakangas@i 208 : 959 : case WALRCV_STOPPED:
209 : 959 : break;
210 : 4 : case WALRCV_STARTING:
211 : 4 : walrcv->walRcvState = WALRCV_STOPPED;
1829 tmunro@postgresql.or 212 : 4 : stopped = true;
5891 heikki.linnakangas@i 213 : 4 : break;
214 : :
51 michael@paquier.xyz 215 :GNC 38 : case WALRCV_CONNECTING:
4840 heikki.linnakangas@i 216 :ECB (29) : case WALRCV_STREAMING:
217 : : case WALRCV_WAITING:
218 : : case WALRCV_RESTARTING:
5891 heikki.linnakangas@i 219 :CBC 38 : walrcv->walRcvState = WALRCV_STOPPING;
220 : : pg_fallthrough;
221 : 38 : case WALRCV_STOPPING:
222 : 38 : walrcvpid = walrcv->pid;
223 : 38 : break;
224 : : }
5903 225 : 1001 : SpinLockRelease(&walrcv->mutex);
226 : :
227 : : /* Unnecessary but consistent. */
1829 tmunro@postgresql.or 228 [ + + ]: 1001 : if (stopped)
229 : 4 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
230 : :
231 : : /*
232 : : * Signal walreceiver process if it was still running.
233 : : */
5903 heikki.linnakangas@i 234 [ + + ]: 1001 : if (walrcvpid != 0)
235 : 38 : kill(walrcvpid, SIGTERM);
236 : :
237 : : /*
238 : : * Wait for walreceiver to acknowledge its death by setting state to
239 : : * WALRCV_STOPPED.
240 : : */
1829 tmunro@postgresql.or 241 : 1001 : ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
4840 heikki.linnakangas@i 242 [ + + ]: 1036 : while (WalRcvRunning())
1829 tmunro@postgresql.or 243 : 35 : ConditionVariableSleep(&walrcv->walRcvStoppedCV,
244 : : WAIT_EVENT_WAL_RECEIVER_EXIT);
245 : 1001 : ConditionVariableCancelSleep();
5903 heikki.linnakangas@i 246 : 1001 : }
247 : :
248 : : /*
249 : : * Request postmaster to start walreceiver.
250 : : *
251 : : * "recptr" indicates the position where streaming should begin. "conninfo"
252 : : * is a libpq connection string to use. "slotname" is, optionally, the name
253 : : * of a replication slot to acquire. "create_temp_slot" indicates to create
254 : : * a temporary slot when no "slotname" is given.
255 : : *
256 : : * WAL receivers do not directly load GUC parameters used for the connection
257 : : * to the primary, and rely on the values passed down by the caller of this
258 : : * routine instead. Hence, the addition of any new parameters should happen
259 : : * through this code path.
260 : : */
261 : : void
4426 rhaas@postgresql.org 262 : 188 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
263 : : const char *slotname, bool create_temp_slot)
264 : : {
3813 265 : 188 : WalRcvData *walrcv = WalRcv;
4840 heikki.linnakangas@i 266 : 188 : bool launch = false;
5861 bruce@momjian.us 267 : 188 : pg_time_t now = (pg_time_t) time(NULL);
268 : : ProcNumber walrcv_proc;
269 : :
270 : : /*
271 : : * We always start at the beginning of the segment. That prevents a broken
272 : : * segment (i.e., with no records in the first half of a segment) from
273 : : * being created by XLOG streaming, which might cause trouble later on if
274 : : * the segment is e.g archived.
275 : : */
3099 andres@anarazel.de 276 [ + - ]: 188 : if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
277 : 188 : recptr -= XLogSegmentOffset(recptr, wal_segment_size);
278 : :
5734 tgl@sss.pgh.pa.us 279 [ - + ]: 188 : SpinLockAcquire(&walrcv->mutex);
280 : :
281 : : /* It better be stopped if we try to restart it */
4840 heikki.linnakangas@i 282 [ + + - + ]: 188 : Assert(walrcv->walRcvState == WALRCV_STOPPED ||
283 : : walrcv->walRcvState == WALRCV_WAITING);
284 : :
5903 285 [ + - ]: 188 : if (conninfo != NULL)
396 peter@eisentraut.org 286 : 188 : strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
287 : : else
5903 heikki.linnakangas@i 288 :UBC 0 : walrcv->conninfo[0] = '\0';
289 : :
290 : : /*
291 : : * Use configured replication slot if present, and ignore the value of
292 : : * create_temp_slot as the slot name should be persistent. Otherwise, use
293 : : * create_temp_slot to determine whether this WAL receiver should create a
294 : : * temporary slot by itself and use it, or not.
295 : : */
2179 alvherre@alvh.no-ip. 296 [ + - + + ]:CBC 188 : if (slotname != NULL && slotname[0] != '\0')
297 : : {
396 peter@eisentraut.org 298 : 53 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
2179 alvherre@alvh.no-ip. 299 : 53 : walrcv->is_temp_slot = false;
300 : : }
301 : : else
302 : : {
4426 rhaas@postgresql.org 303 : 135 : walrcv->slotname[0] = '\0';
2179 alvherre@alvh.no-ip. 304 : 135 : walrcv->is_temp_slot = create_temp_slot;
305 : : }
306 : :
4840 heikki.linnakangas@i 307 [ + + ]: 188 : if (walrcv->walRcvState == WALRCV_STOPPED)
308 : : {
309 : 181 : launch = true;
310 : 181 : walrcv->walRcvState = WALRCV_STARTING;
311 : : }
312 : : else
313 : 7 : walrcv->walRcvState = WALRCV_RESTARTING;
5891 314 : 188 : walrcv->startTime = now;
315 : :
316 : : /*
317 : : * If this is the first startup of walreceiver (on this timeline),
318 : : * initialize flushedUpto and latestChunkStart to the starting point.
319 : : */
129 alvherre@kurilemu.de 320 [ + + - + ]:GNC 188 : if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
321 : : {
2167 tmunro@postgresql.or 322 :CBC 107 : walrcv->flushedUpto = recptr;
4694 heikki.linnakangas@i 323 : 107 : walrcv->receivedTLI = tli;
5493 324 : 107 : walrcv->latestChunkStart = recptr;
325 : : }
326 : 188 : walrcv->receiveStart = recptr;
4840 327 : 188 : walrcv->receiveStartTLI = tli;
328 : :
499 329 : 188 : walrcv_proc = walrcv->procno;
330 : :
5903 331 : 188 : SpinLockRelease(&walrcv->mutex);
332 : :
4840 333 [ + + ]: 188 : if (launch)
334 : 181 : SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
499 335 [ + - ]: 7 : else if (walrcv_proc != INVALID_PROC_NUMBER)
336 : 7 : SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
5903 337 : 188 : }
338 : :
339 : : /*
340 : : * Returns the last+1 byte position that walreceiver has flushed.
341 : : *
342 : : * Optionally, returns the previous chunk start, that is the first byte
343 : : * written in the most recent walreceiver flush cycle. Callers not
344 : : * interested in that value may pass NULL for latestChunkStart. Same for
345 : : * receiveTLI.
346 : : */
347 : : XLogRecPtr
2167 tmunro@postgresql.or 348 : 24693 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
349 : : {
3813 rhaas@postgresql.org 350 : 24693 : WalRcvData *walrcv = WalRcv;
351 : : XLogRecPtr recptr;
352 : :
5903 heikki.linnakangas@i 353 [ - + ]: 24693 : SpinLockAcquire(&walrcv->mutex);
2167 tmunro@postgresql.or 354 : 24693 : recptr = walrcv->flushedUpto;
5734 tgl@sss.pgh.pa.us 355 [ + + ]: 24693 : if (latestChunkStart)
356 : 23479 : *latestChunkStart = walrcv->latestChunkStart;
4840 heikki.linnakangas@i 357 [ + + ]: 24693 : if (receiveTLI)
358 : 24451 : *receiveTLI = walrcv->receivedTLI;
5903 359 : 24693 : SpinLockRelease(&walrcv->mutex);
360 : :
361 : 24693 : return recptr;
362 : : }
363 : :
364 : : /*
365 : : * Returns the last+1 byte position that walreceiver has written.
366 : : * This returns a recently written value without taking a lock.
367 : : */
368 : : XLogRecPtr
2167 tmunro@postgresql.or 369 :GBC 29 : GetWalRcvWriteRecPtr(void)
370 : : {
371 : 29 : WalRcvData *walrcv = WalRcv;
372 : :
373 : 29 : return pg_atomic_read_u64(&walrcv->writtenUpto);
374 : : }
375 : :
376 : : /*
377 : : * Returns the replication apply delay in ms or -1
378 : : * if the apply delay info is not available
379 : : */
380 : : int
5188 simon@2ndQuadrant.co 381 :CBC 410 : GetReplicationApplyDelay(void)
382 : : {
3813 rhaas@postgresql.org 383 : 410 : WalRcvData *walrcv = WalRcv;
384 : : XLogRecPtr receivePtr;
385 : : XLogRecPtr replayPtr;
386 : : TimestampTz chunkReplayStartTime;
387 : :
5188 simon@2ndQuadrant.co 388 [ - + ]: 410 : SpinLockAcquire(&walrcv->mutex);
2167 tmunro@postgresql.or 389 : 410 : receivePtr = walrcv->flushedUpto;
5188 simon@2ndQuadrant.co 390 : 410 : SpinLockRelease(&walrcv->mutex);
391 : :
4833 heikki.linnakangas@i 392 : 410 : replayPtr = GetXLogReplayRecPtr(NULL);
393 : :
4825 alvherre@alvh.no-ip. 394 [ + + ]: 410 : if (receivePtr == replayPtr)
5188 simon@2ndQuadrant.co 395 : 120 : return 0;
396 : :
3288 peter_e@gmx.net 397 : 290 : chunkReplayStartTime = GetCurrentChunkReplayStartTime();
398 : :
399 [ + + ]: 290 : if (chunkReplayStartTime == 0)
4019 ishii@postgresql.org 400 : 6 : return -1;
401 : :
1951 tgl@sss.pgh.pa.us 402 : 284 : return TimestampDifferenceMilliseconds(chunkReplayStartTime,
403 : : GetCurrentTimestamp());
404 : : }
405 : :
406 : : /*
407 : : * Returns the network latency in ms, note that this includes any
408 : : * difference in clock settings between the servers, as well as timezone.
409 : : */
410 : : int
5188 simon@2ndQuadrant.co 411 : 410 : GetReplicationTransferLatency(void)
412 : : {
3813 rhaas@postgresql.org 413 : 410 : WalRcvData *walrcv = WalRcv;
414 : : TimestampTz lastMsgSendTime;
415 : : TimestampTz lastMsgReceiptTime;
416 : :
5188 simon@2ndQuadrant.co 417 [ - + ]: 410 : SpinLockAcquire(&walrcv->mutex);
418 : 410 : lastMsgSendTime = walrcv->lastMsgSendTime;
419 : 410 : lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
420 : 410 : SpinLockRelease(&walrcv->mutex);
421 : :
1951 tgl@sss.pgh.pa.us 422 : 410 : return TimestampDifferenceMilliseconds(lastMsgSendTime,
423 : : lastMsgReceiptTime);
424 : : }
|