Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * walreceiverfuncs.c
4 : : *
5 : : * This file contains functions used by the startup process to communicate
6 : : * with the walreceiver process. Functions implementing walreceiver itself
7 : : * are in walreceiver.c.
8 : : *
9 : : * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
10 : : *
11 : : *
12 : : * IDENTIFICATION
13 : : * src/backend/replication/walreceiverfuncs.c
14 : : *
15 : : *-------------------------------------------------------------------------
16 : : */
17 : : #include "postgres.h"
18 : :
19 : : #include <sys/stat.h>
20 : : #include <sys/time.h>
21 : : #include <time.h>
22 : : #include <unistd.h>
23 : : #include <signal.h>
24 : :
25 : : #include "access/xlog_internal.h"
26 : : #include "access/xlogrecovery.h"
27 : : #include "pgstat.h"
28 : : #include "replication/walreceiver.h"
29 : : #include "storage/pmsignal.h"
30 : : #include "storage/proc.h"
31 : : #include "storage/shmem.h"
32 : : #include "utils/timestamp.h"
33 : :
34 : : WalRcvData *WalRcv = NULL;
35 : :
36 : : /*
37 : : * How long to wait for walreceiver to start up after requesting
38 : : * postmaster to launch it. In seconds.
39 : : */
40 : : #define WALRCV_STARTUP_TIMEOUT 10
41 : :
42 : : /* Report shared memory space needed by WalRcvShmemInit */
43 : : Size
5814 heikki.linnakangas@i 44 :CBC 4124 : WalRcvShmemSize(void)
45 : : {
5772 bruce@momjian.us 46 : 4124 : Size size = 0;
47 : :
5814 heikki.linnakangas@i 48 : 4124 : size = add_size(size, sizeof(WalRcvData));
49 : :
50 : 4124 : return size;
51 : : }
52 : :
53 : : /* Allocate and initialize walreceiver-related shared memory */
54 : : void
55 : 1069 : WalRcvShmemInit(void)
56 : : {
57 : : bool found;
58 : :
59 : 1069 : WalRcv = (WalRcvData *)
60 : 1069 : ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
61 : :
5711 tgl@sss.pgh.pa.us 62 [ + - ]: 1069 : if (!found)
63 : : {
64 : : /* First time through, so initialize */
65 [ + - + - : 1069 : MemSet(WalRcv, 0, WalRcvShmemSize());
+ - - + -
- ]
66 : 1069 : WalRcv->walRcvState = WALRCV_STOPPED;
1740 tmunro@postgresql.or 67 : 1069 : ConditionVariableInit(&WalRcv->walRcvStoppedCV);
5711 tgl@sss.pgh.pa.us 68 : 1069 : SpinLockInit(&WalRcv->mutex);
1762 fujii@postgresql.org 69 : 1069 : pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
410 heikki.linnakangas@i 70 : 1069 : WalRcv->procno = INVALID_PROC_NUMBER;
71 : : }
5814 72 : 1069 : }
73 : :
74 : : /* Is walreceiver running (or starting up)? */
75 : : bool
4751 76 : 957 : WalRcvRunning(void)
77 : : {
3724 rhaas@postgresql.org 78 : 957 : WalRcvData *walrcv = WalRcv;
79 : : WalRcvState state;
80 : : pg_time_t startTime;
81 : :
5814 heikki.linnakangas@i 82 [ - + ]: 957 : SpinLockAcquire(&walrcv->mutex);
83 : :
5802 84 : 957 : state = walrcv->walRcvState;
85 : 957 : startTime = walrcv->startTime;
86 : :
87 : 957 : SpinLockRelease(&walrcv->mutex);
88 : :
89 : : /*
90 : : * If it has taken too long for walreceiver to start up, give up. Setting
91 : : * the state to STOPPED ensures that if walreceiver later does start up
92 : : * after all, it will see that it's not supposed to be running and die
93 : : * without doing anything.
94 : : */
95 [ + + ]: 957 : if (state == WALRCV_STARTING)
96 : : {
5772 bruce@momjian.us 97 : 1 : pg_time_t now = (pg_time_t) time(NULL);
98 : :
5802 heikki.linnakangas@i 99 [ - + ]: 1 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
100 : : {
1740 tmunro@postgresql.or 101 :UBC 0 : bool stopped = false;
102 : :
103 [ # # ]: 0 : SpinLockAcquire(&walrcv->mutex);
5802 heikki.linnakangas@i 104 [ # # ]: 0 : if (walrcv->walRcvState == WALRCV_STARTING)
105 : : {
106 : 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
1740 tmunro@postgresql.or 107 : 0 : stopped = true;
108 : : }
5802 heikki.linnakangas@i 109 : 0 : SpinLockRelease(&walrcv->mutex);
110 : :
1740 tmunro@postgresql.or 111 [ # # ]: 0 : if (stopped)
112 : 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
113 : : }
114 : : }
115 : :
5802 heikki.linnakangas@i 116 [ + + ]:CBC 957 : if (state != WALRCV_STOPPED)
117 : 34 : return true;
118 : : else
119 : 923 : return false;
120 : : }
121 : :
122 : : /* Return the state of the walreceiver. */
123 : : WalRcvState
42 michael@paquier.xyz 124 :GNC 110 : WalRcvGetState(void)
125 : : {
126 : 110 : WalRcvData *walrcv = WalRcv;
127 : : WalRcvState state;
128 : :
129 [ - + ]: 110 : SpinLockAcquire(&walrcv->mutex);
130 : 110 : state = walrcv->walRcvState;
131 : 110 : SpinLockRelease(&walrcv->mutex);
132 : :
133 : 110 : return state;
134 : : }
135 : :
136 : : /*
137 : : * Is walreceiver running and streaming (or at least attempting to connect,
138 : : * or starting up)?
139 : : */
140 : : bool
4751 heikki.linnakangas@i 141 :CBC 27146 : WalRcvStreaming(void)
142 : : {
3724 rhaas@postgresql.org 143 : 27146 : WalRcvData *walrcv = WalRcv;
144 : : WalRcvState state;
145 : : pg_time_t startTime;
146 : :
4751 heikki.linnakangas@i 147 [ - + ]: 27146 : SpinLockAcquire(&walrcv->mutex);
148 : :
149 : 27146 : state = walrcv->walRcvState;
150 : 27146 : startTime = walrcv->startTime;
151 : :
152 : 27146 : SpinLockRelease(&walrcv->mutex);
153 : :
154 : : /*
155 : : * If it has taken too long for walreceiver to start up, give up. Setting
156 : : * the state to STOPPED ensures that if walreceiver later does start up
157 : : * after all, it will see that it's not supposed to be running and die
158 : : * without doing anything.
159 : : */
160 [ + + ]: 27146 : if (state == WALRCV_STARTING)
161 : : {
162 : 236 : pg_time_t now = (pg_time_t) time(NULL);
163 : :
164 [ - + ]: 236 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
165 : : {
1740 tmunro@postgresql.or 166 :UBC 0 : bool stopped = false;
167 : :
168 [ # # ]: 0 : SpinLockAcquire(&walrcv->mutex);
4751 heikki.linnakangas@i 169 [ # # ]: 0 : if (walrcv->walRcvState == WALRCV_STARTING)
170 : : {
171 : 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
1740 tmunro@postgresql.or 172 : 0 : stopped = true;
173 : : }
4751 heikki.linnakangas@i 174 : 0 : SpinLockRelease(&walrcv->mutex);
175 : :
1740 tmunro@postgresql.or 176 [ # # ]: 0 : if (stopped)
177 : 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
178 : : }
179 : : }
180 : :
4751 heikki.linnakangas@i 181 [ + + + + :CBC 27146 : if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
+ + ]
182 : : state == WALRCV_RESTARTING)
183 : 24373 : return true;
184 : : else
185 : 2773 : return false;
186 : : }
187 : :
188 : : /*
189 : : * Stop walreceiver (if running) and wait for it to die.
190 : : * Executed by the Startup process.
191 : : */
192 : : void
5814 193 : 922 : ShutdownWalRcv(void)
194 : : {
3724 rhaas@postgresql.org 195 : 922 : WalRcvData *walrcv = WalRcv;
5772 bruce@momjian.us 196 : 922 : pid_t walrcvpid = 0;
1740 tmunro@postgresql.or 197 : 922 : bool stopped = false;
198 : :
199 : : /*
200 : : * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
201 : : * mode once it's finished, and will also request postmaster to not
202 : : * restart itself.
203 : : */
5814 heikki.linnakangas@i 204 [ - + ]: 922 : SpinLockAcquire(&walrcv->mutex);
5772 bruce@momjian.us 205 [ + + + - : 922 : switch (walrcv->walRcvState)
- ]
206 : : {
5802 heikki.linnakangas@i 207 : 888 : case WALRCV_STOPPED:
208 : 888 : break;
209 : 4 : case WALRCV_STARTING:
210 : 4 : walrcv->walRcvState = WALRCV_STOPPED;
1740 tmunro@postgresql.or 211 : 4 : stopped = true;
5802 heikki.linnakangas@i 212 : 4 : break;
213 : :
4751 214 : 30 : case WALRCV_STREAMING:
215 : : case WALRCV_WAITING:
216 : : case WALRCV_RESTARTING:
5802 217 : 30 : walrcv->walRcvState = WALRCV_STOPPING;
218 : : /* fall through */
219 : 30 : case WALRCV_STOPPING:
220 : 30 : walrcvpid = walrcv->pid;
221 : 30 : break;
222 : : }
5814 223 : 922 : SpinLockRelease(&walrcv->mutex);
224 : :
225 : : /* Unnecessary but consistent. */
1740 tmunro@postgresql.or 226 [ + + ]: 922 : if (stopped)
227 : 4 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
228 : :
229 : : /*
230 : : * Signal walreceiver process if it was still running.
231 : : */
5814 heikki.linnakangas@i 232 [ + + ]: 922 : if (walrcvpid != 0)
233 : 30 : kill(walrcvpid, SIGTERM);
234 : :
235 : : /*
236 : : * Wait for walreceiver to acknowledge its death by setting state to
237 : : * WALRCV_STOPPED.
238 : : */
1740 tmunro@postgresql.or 239 : 922 : ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
4751 heikki.linnakangas@i 240 [ + + ]: 952 : while (WalRcvRunning())
1740 tmunro@postgresql.or 241 : 30 : ConditionVariableSleep(&walrcv->walRcvStoppedCV,
242 : : WAIT_EVENT_WAL_RECEIVER_EXIT);
243 : 922 : ConditionVariableCancelSleep();
5814 heikki.linnakangas@i 244 : 922 : }
245 : :
246 : : /*
247 : : * Request postmaster to start walreceiver.
248 : : *
249 : : * "recptr" indicates the position where streaming should begin. "conninfo"
250 : : * is a libpq connection string to use. "slotname" is, optionally, the name
251 : : * of a replication slot to acquire. "create_temp_slot" indicates to create
252 : : * a temporary slot when no "slotname" is given.
253 : : *
254 : : * WAL receivers do not directly load GUC parameters used for the connection
255 : : * to the primary, and rely on the values passed down by the caller of this
256 : : * routine instead. Hence, the addition of any new parameters should happen
257 : : * through this code path.
258 : : */
259 : : void
4337 rhaas@postgresql.org 260 : 158 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
261 : : const char *slotname, bool create_temp_slot)
262 : : {
3724 263 : 158 : WalRcvData *walrcv = WalRcv;
4751 heikki.linnakangas@i 264 : 158 : bool launch = false;
5772 bruce@momjian.us 265 : 158 : pg_time_t now = (pg_time_t) time(NULL);
266 : : ProcNumber walrcv_proc;
267 : :
268 : : /*
269 : : * We always start at the beginning of the segment. That prevents a broken
270 : : * segment (i.e., with no records in the first half of a segment) from
271 : : * being created by XLOG streaming, which might cause trouble later on if
272 : : * the segment is e.g archived.
273 : : */
3010 andres@anarazel.de 274 [ + - ]: 158 : if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
275 : 158 : recptr -= XLogSegmentOffset(recptr, wal_segment_size);
276 : :
5645 tgl@sss.pgh.pa.us 277 [ - + ]: 158 : SpinLockAcquire(&walrcv->mutex);
278 : :
279 : : /* It better be stopped if we try to restart it */
4751 heikki.linnakangas@i 280 [ + + - + ]: 158 : Assert(walrcv->walRcvState == WALRCV_STOPPED ||
281 : : walrcv->walRcvState == WALRCV_WAITING);
282 : :
5814 283 [ + - ]: 158 : if (conninfo != NULL)
307 peter@eisentraut.org 284 : 158 : strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
285 : : else
5814 heikki.linnakangas@i 286 :UBC 0 : walrcv->conninfo[0] = '\0';
287 : :
288 : : /*
289 : : * Use configured replication slot if present, and ignore the value of
290 : : * create_temp_slot as the slot name should be persistent. Otherwise, use
291 : : * create_temp_slot to determine whether this WAL receiver should create a
292 : : * temporary slot by itself and use it, or not.
293 : : */
2090 alvherre@alvh.no-ip. 294 [ + - + + ]:CBC 158 : if (slotname != NULL && slotname[0] != '\0')
295 : : {
307 peter@eisentraut.org 296 : 45 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
2090 alvherre@alvh.no-ip. 297 : 45 : walrcv->is_temp_slot = false;
298 : : }
299 : : else
300 : : {
4337 rhaas@postgresql.org 301 : 113 : walrcv->slotname[0] = '\0';
2090 alvherre@alvh.no-ip. 302 : 113 : walrcv->is_temp_slot = create_temp_slot;
303 : : }
304 : :
4751 heikki.linnakangas@i 305 [ + + ]: 158 : if (walrcv->walRcvState == WALRCV_STOPPED)
306 : : {
307 : 152 : launch = true;
308 : 152 : walrcv->walRcvState = WALRCV_STARTING;
309 : : }
310 : : else
311 : 6 : walrcv->walRcvState = WALRCV_RESTARTING;
5802 312 : 158 : walrcv->startTime = now;
313 : :
314 : : /*
315 : : * If this is the first startup of walreceiver (on this timeline),
316 : : * initialize flushedUpto and latestChunkStart to the starting point.
317 : : */
40 alvherre@kurilemu.de 318 [ + + - + ]:GNC 158 : if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
319 : : {
2078 tmunro@postgresql.or 320 :CBC 94 : walrcv->flushedUpto = recptr;
4605 heikki.linnakangas@i 321 : 94 : walrcv->receivedTLI = tli;
5404 322 : 94 : walrcv->latestChunkStart = recptr;
323 : : }
324 : 158 : walrcv->receiveStart = recptr;
4751 325 : 158 : walrcv->receiveStartTLI = tli;
326 : :
410 327 : 158 : walrcv_proc = walrcv->procno;
328 : :
5814 329 : 158 : SpinLockRelease(&walrcv->mutex);
330 : :
4751 331 [ + + ]: 158 : if (launch)
332 : 152 : SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
410 333 [ + - ]: 6 : else if (walrcv_proc != INVALID_PROC_NUMBER)
334 : 6 : SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
5814 335 : 158 : }
336 : :
337 : : /*
338 : : * Returns the last+1 byte position that walreceiver has flushed.
339 : : *
340 : : * Optionally, returns the previous chunk start, that is the first byte
341 : : * written in the most recent walreceiver flush cycle. Callers not
342 : : * interested in that value may pass NULL for latestChunkStart. Same for
343 : : * receiveTLI.
344 : : */
345 : : XLogRecPtr
2078 tmunro@postgresql.or 346 : 23649 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
347 : : {
3724 rhaas@postgresql.org 348 : 23649 : WalRcvData *walrcv = WalRcv;
349 : : XLogRecPtr recptr;
350 : :
5814 heikki.linnakangas@i 351 [ - + ]: 23649 : SpinLockAcquire(&walrcv->mutex);
2078 tmunro@postgresql.or 352 : 23649 : recptr = walrcv->flushedUpto;
5645 tgl@sss.pgh.pa.us 353 [ + + ]: 23649 : if (latestChunkStart)
354 : 22688 : *latestChunkStart = walrcv->latestChunkStart;
4751 heikki.linnakangas@i 355 [ + + ]: 23649 : if (receiveTLI)
356 : 23447 : *receiveTLI = walrcv->receivedTLI;
5814 357 : 23649 : SpinLockRelease(&walrcv->mutex);
358 : :
359 : 23649 : return recptr;
360 : : }
361 : :
362 : : /*
363 : : * Returns the last+1 byte position that walreceiver has written.
364 : : * This returns a recently written value without taking a lock.
365 : : */
366 : : XLogRecPtr
2078 tmunro@postgresql.or 367 :UBC 0 : GetWalRcvWriteRecPtr(void)
368 : : {
369 : 0 : WalRcvData *walrcv = WalRcv;
370 : :
371 : 0 : return pg_atomic_read_u64(&walrcv->writtenUpto);
372 : : }
373 : :
374 : : /*
375 : : * Returns the replication apply delay in ms or -1
376 : : * if the apply delay info is not available
377 : : */
378 : : int
5099 simon@2ndQuadrant.co 379 :CBC 415 : GetReplicationApplyDelay(void)
380 : : {
3724 rhaas@postgresql.org 381 : 415 : WalRcvData *walrcv = WalRcv;
382 : : XLogRecPtr receivePtr;
383 : : XLogRecPtr replayPtr;
384 : : TimestampTz chunkReplayStartTime;
385 : :
5099 simon@2ndQuadrant.co 386 [ - + ]: 415 : SpinLockAcquire(&walrcv->mutex);
2078 tmunro@postgresql.or 387 : 415 : receivePtr = walrcv->flushedUpto;
5099 simon@2ndQuadrant.co 388 : 415 : SpinLockRelease(&walrcv->mutex);
389 : :
4744 heikki.linnakangas@i 390 : 415 : replayPtr = GetXLogReplayRecPtr(NULL);
391 : :
4736 alvherre@alvh.no-ip. 392 [ + + ]: 415 : if (receivePtr == replayPtr)
5099 simon@2ndQuadrant.co 393 : 148 : return 0;
394 : :
3199 peter_e@gmx.net 395 : 267 : chunkReplayStartTime = GetCurrentChunkReplayStartTime();
396 : :
397 [ + + ]: 267 : if (chunkReplayStartTime == 0)
3930 ishii@postgresql.org 398 : 1 : return -1;
399 : :
1862 tgl@sss.pgh.pa.us 400 : 266 : return TimestampDifferenceMilliseconds(chunkReplayStartTime,
401 : : GetCurrentTimestamp());
402 : : }
403 : :
404 : : /*
405 : : * Returns the network latency in ms, note that this includes any
406 : : * difference in clock settings between the servers, as well as timezone.
407 : : */
408 : : int
5099 simon@2ndQuadrant.co 409 : 415 : GetReplicationTransferLatency(void)
410 : : {
3724 rhaas@postgresql.org 411 : 415 : WalRcvData *walrcv = WalRcv;
412 : : TimestampTz lastMsgSendTime;
413 : : TimestampTz lastMsgReceiptTime;
414 : :
5099 simon@2ndQuadrant.co 415 [ - + ]: 415 : SpinLockAcquire(&walrcv->mutex);
416 : 415 : lastMsgSendTime = walrcv->lastMsgSendTime;
417 : 415 : lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
418 : 415 : SpinLockRelease(&walrcv->mutex);
419 : :
1862 tgl@sss.pgh.pa.us 420 : 415 : return TimestampDifferenceMilliseconds(lastMsgSendTime,
421 : : lastMsgReceiptTime);
422 : : }
|