Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * walreceiverfuncs.c
4 : : *
5 : : * This file contains functions used by the startup process to communicate
6 : : * with the walreceiver process. Functions implementing walreceiver itself
7 : : * are in walreceiver.c.
8 : : *
9 : : * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
10 : : *
11 : : *
12 : : * IDENTIFICATION
13 : : * src/backend/replication/walreceiverfuncs.c
14 : : *
15 : : *-------------------------------------------------------------------------
16 : : */
17 : : #include "postgres.h"
18 : :
19 : : #include <sys/stat.h>
20 : : #include <sys/time.h>
21 : : #include <time.h>
22 : : #include <unistd.h>
23 : : #include <signal.h>
24 : :
25 : : #include "access/xlog_internal.h"
26 : : #include "access/xlogrecovery.h"
27 : : #include "pgstat.h"
28 : : #include "replication/walreceiver.h"
29 : : #include "storage/pmsignal.h"
30 : : #include "storage/proc.h"
31 : : #include "storage/shmem.h"
32 : : #include "utils/timestamp.h"
33 : :
34 : : WalRcvData *WalRcv = NULL;
35 : :
36 : : /*
37 : : * How long to wait for walreceiver to start up after requesting
38 : : * postmaster to launch it. In seconds.
39 : : */
40 : : #define WALRCV_STARTUP_TIMEOUT 10
41 : :
42 : : /* Report shared memory space needed by WalRcvShmemInit */
43 : : Size
5713 heikki.linnakangas@i 44 :CBC 3967 : WalRcvShmemSize(void)
45 : : {
5671 bruce@momjian.us 46 : 3967 : Size size = 0;
47 : :
5713 heikki.linnakangas@i 48 : 3967 : size = add_size(size, sizeof(WalRcvData));
49 : :
50 : 3967 : return size;
51 : : }
52 : :
53 : : /* Allocate and initialize walreceiver-related shared memory */
54 : : void
55 : 1029 : WalRcvShmemInit(void)
56 : : {
57 : : bool found;
58 : :
59 : 1029 : WalRcv = (WalRcvData *)
60 : 1029 : ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
61 : :
5610 tgl@sss.pgh.pa.us 62 [ + - ]: 1029 : if (!found)
63 : : {
64 : : /* First time through, so initialize */
65 [ + - + - : 1029 : MemSet(WalRcv, 0, WalRcvShmemSize());
+ - - + -
- ]
66 : 1029 : WalRcv->walRcvState = WALRCV_STOPPED;
1639 tmunro@postgresql.or 67 : 1029 : ConditionVariableInit(&WalRcv->walRcvStoppedCV);
5610 tgl@sss.pgh.pa.us 68 : 1029 : SpinLockInit(&WalRcv->mutex);
1661 fujii@postgresql.org 69 : 1029 : pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
309 heikki.linnakangas@i 70 : 1029 : WalRcv->procno = INVALID_PROC_NUMBER;
71 : : }
5713 72 : 1029 : }
73 : :
74 : : /* Is walreceiver running (or starting up)? */
75 : : bool
4650 76 : 1034 : WalRcvRunning(void)
77 : : {
3623 rhaas@postgresql.org 78 : 1034 : WalRcvData *walrcv = WalRcv;
79 : : WalRcvState state;
80 : : pg_time_t startTime;
81 : :
5713 heikki.linnakangas@i 82 [ - + ]: 1034 : SpinLockAcquire(&walrcv->mutex);
83 : :
5701 84 : 1034 : state = walrcv->walRcvState;
85 : 1034 : startTime = walrcv->startTime;
86 : :
87 : 1034 : SpinLockRelease(&walrcv->mutex);
88 : :
89 : : /*
90 : : * If it has taken too long for walreceiver to start up, give up. Setting
91 : : * the state to STOPPED ensures that if walreceiver later does start up
92 : : * after all, it will see that it's not supposed to be running and die
93 : : * without doing anything.
94 : : */
95 [ + + ]: 1034 : if (state == WALRCV_STARTING)
96 : : {
5671 bruce@momjian.us 97 : 1 : pg_time_t now = (pg_time_t) time(NULL);
98 : :
5701 heikki.linnakangas@i 99 [ - + ]: 1 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
100 : : {
1639 tmunro@postgresql.or 101 :UBC 0 : bool stopped = false;
102 : :
103 [ # # ]: 0 : SpinLockAcquire(&walrcv->mutex);
5701 heikki.linnakangas@i 104 [ # # ]: 0 : if (walrcv->walRcvState == WALRCV_STARTING)
105 : : {
106 : 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
1639 tmunro@postgresql.or 107 : 0 : stopped = true;
108 : : }
5701 heikki.linnakangas@i 109 : 0 : SpinLockRelease(&walrcv->mutex);
110 : :
1639 tmunro@postgresql.or 111 [ # # ]: 0 : if (stopped)
112 : 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
113 : : }
114 : : }
115 : :
5701 heikki.linnakangas@i 116 [ + + ]:CBC 1034 : if (state != WALRCV_STOPPED)
117 : 35 : return true;
118 : : else
119 : 999 : return false;
120 : : }
121 : :
122 : : /*
123 : : * Is walreceiver running and streaming (or at least attempting to connect,
124 : : * or starting up)?
125 : : */
126 : : bool
4650 127 : 27870 : WalRcvStreaming(void)
128 : : {
3623 rhaas@postgresql.org 129 : 27870 : WalRcvData *walrcv = WalRcv;
130 : : WalRcvState state;
131 : : pg_time_t startTime;
132 : :
4650 heikki.linnakangas@i 133 [ + + ]: 27870 : SpinLockAcquire(&walrcv->mutex);
134 : :
135 : 27870 : state = walrcv->walRcvState;
136 : 27870 : startTime = walrcv->startTime;
137 : :
138 : 27870 : SpinLockRelease(&walrcv->mutex);
139 : :
140 : : /*
141 : : * If it has taken too long for walreceiver to start up, give up. Setting
142 : : * the state to STOPPED ensures that if walreceiver later does start up
143 : : * after all, it will see that it's not supposed to be running and die
144 : : * without doing anything.
145 : : */
146 [ + + ]: 27870 : if (state == WALRCV_STARTING)
147 : : {
148 : 253 : pg_time_t now = (pg_time_t) time(NULL);
149 : :
150 [ - + ]: 253 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
151 : : {
1639 tmunro@postgresql.or 152 :UBC 0 : bool stopped = false;
153 : :
154 [ # # ]: 0 : SpinLockAcquire(&walrcv->mutex);
4650 heikki.linnakangas@i 155 [ # # ]: 0 : if (walrcv->walRcvState == WALRCV_STARTING)
156 : : {
157 : 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
1639 tmunro@postgresql.or 158 : 0 : stopped = true;
159 : : }
4650 heikki.linnakangas@i 160 : 0 : SpinLockRelease(&walrcv->mutex);
161 : :
1639 tmunro@postgresql.or 162 [ # # ]: 0 : if (stopped)
163 : 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
164 : : }
165 : : }
166 : :
4650 heikki.linnakangas@i 167 [ + + + + :CBC 27870 : if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
- + ]
168 : : state == WALRCV_RESTARTING)
169 : 25277 : return true;
170 : : else
171 : 2593 : return false;
172 : : }
173 : :
174 : : /*
175 : : * Stop walreceiver (if running) and wait for it to die.
176 : : * Executed by the Startup process.
177 : : */
178 : : void
5713 179 : 998 : ShutdownWalRcv(void)
180 : : {
3623 rhaas@postgresql.org 181 : 998 : WalRcvData *walrcv = WalRcv;
5671 bruce@momjian.us 182 : 998 : pid_t walrcvpid = 0;
1639 tmunro@postgresql.or 183 : 998 : bool stopped = false;
184 : :
185 : : /*
186 : : * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
187 : : * mode once it's finished, and will also request postmaster to not
188 : : * restart itself.
189 : : */
5713 heikki.linnakangas@i 190 [ - + ]: 998 : SpinLockAcquire(&walrcv->mutex);
5671 bruce@momjian.us 191 [ + + + - : 998 : switch (walrcv->walRcvState)
- ]
192 : : {
5701 heikki.linnakangas@i 193 : 961 : case WALRCV_STOPPED:
194 : 961 : break;
195 : 3 : case WALRCV_STARTING:
196 : 3 : walrcv->walRcvState = WALRCV_STOPPED;
1639 tmunro@postgresql.or 197 : 3 : stopped = true;
5701 heikki.linnakangas@i 198 : 3 : break;
199 : :
4650 200 : 34 : case WALRCV_STREAMING:
201 : : case WALRCV_WAITING:
202 : : case WALRCV_RESTARTING:
5701 203 : 34 : walrcv->walRcvState = WALRCV_STOPPING;
204 : : /* fall through */
205 : 34 : case WALRCV_STOPPING:
206 : 34 : walrcvpid = walrcv->pid;
207 : 34 : break;
208 : : }
5713 209 : 998 : SpinLockRelease(&walrcv->mutex);
210 : :
211 : : /* Unnecessary but consistent. */
1639 tmunro@postgresql.or 212 [ + + ]: 998 : if (stopped)
213 : 3 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
214 : :
215 : : /*
216 : : * Signal walreceiver process if it was still running.
217 : : */
5713 heikki.linnakangas@i 218 [ + + ]: 998 : if (walrcvpid != 0)
219 : 34 : kill(walrcvpid, SIGTERM);
220 : :
221 : : /*
222 : : * Wait for walreceiver to acknowledge its death by setting state to
223 : : * WALRCV_STOPPED.
224 : : */
1639 tmunro@postgresql.or 225 : 998 : ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
4650 heikki.linnakangas@i 226 [ + + ]: 1029 : while (WalRcvRunning())
1639 tmunro@postgresql.or 227 : 31 : ConditionVariableSleep(&walrcv->walRcvStoppedCV,
228 : : WAIT_EVENT_WAL_RECEIVER_EXIT);
229 : 998 : ConditionVariableCancelSleep();
5713 heikki.linnakangas@i 230 : 998 : }
231 : :
232 : : /*
233 : : * Request postmaster to start walreceiver.
234 : : *
235 : : * "recptr" indicates the position where streaming should begin. "conninfo"
236 : : * is a libpq connection string to use. "slotname" is, optionally, the name
237 : : * of a replication slot to acquire. "create_temp_slot" indicates to create
238 : : * a temporary slot when no "slotname" is given.
239 : : *
240 : : * WAL receivers do not directly load GUC parameters used for the connection
241 : : * to the primary, and rely on the values passed down by the caller of this
242 : : * routine instead. Hence, the addition of any new parameters should happen
243 : : * through this code path.
244 : : */
245 : : void
4236 rhaas@postgresql.org 246 : 160 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
247 : : const char *slotname, bool create_temp_slot)
248 : : {
3623 249 : 160 : WalRcvData *walrcv = WalRcv;
4650 heikki.linnakangas@i 250 : 160 : bool launch = false;
5671 bruce@momjian.us 251 : 160 : pg_time_t now = (pg_time_t) time(NULL);
252 : : ProcNumber walrcv_proc;
253 : :
254 : : /*
255 : : * We always start at the beginning of the segment. That prevents a broken
256 : : * segment (i.e., with no records in the first half of a segment) from
257 : : * being created by XLOG streaming, which might cause trouble later on if
258 : : * the segment is e.g archived.
259 : : */
2909 andres@anarazel.de 260 [ + - ]: 160 : if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
261 : 160 : recptr -= XLogSegmentOffset(recptr, wal_segment_size);
262 : :
5544 tgl@sss.pgh.pa.us 263 [ - + ]: 160 : SpinLockAcquire(&walrcv->mutex);
264 : :
265 : : /* It better be stopped if we try to restart it */
4650 heikki.linnakangas@i 266 [ - + - - ]: 160 : Assert(walrcv->walRcvState == WALRCV_STOPPED ||
267 : : walrcv->walRcvState == WALRCV_WAITING);
268 : :
5713 269 [ + - ]: 160 : if (conninfo != NULL)
206 peter@eisentraut.org 270 : 160 : strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
271 : : else
5713 heikki.linnakangas@i 272 :UBC 0 : walrcv->conninfo[0] = '\0';
273 : :
274 : : /*
275 : : * Use configured replication slot if present, and ignore the value of
276 : : * create_temp_slot as the slot name should be persistent. Otherwise, use
277 : : * create_temp_slot to determine whether this WAL receiver should create a
278 : : * temporary slot by itself and use it, or not.
279 : : */
1989 alvherre@alvh.no-ip. 280 [ + - + + ]:CBC 160 : if (slotname != NULL && slotname[0] != '\0')
281 : : {
206 peter@eisentraut.org 282 : 44 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
1989 alvherre@alvh.no-ip. 283 : 44 : walrcv->is_temp_slot = false;
284 : : }
285 : : else
286 : : {
4236 rhaas@postgresql.org 287 : 116 : walrcv->slotname[0] = '\0';
1989 alvherre@alvh.no-ip. 288 : 116 : walrcv->is_temp_slot = create_temp_slot;
289 : : }
290 : :
4650 heikki.linnakangas@i 291 [ + - ]: 160 : if (walrcv->walRcvState == WALRCV_STOPPED)
292 : : {
293 : 160 : launch = true;
294 : 160 : walrcv->walRcvState = WALRCV_STARTING;
295 : : }
296 : : else
4650 heikki.linnakangas@i 297 :UBC 0 : walrcv->walRcvState = WALRCV_RESTARTING;
5701 heikki.linnakangas@i 298 :CBC 160 : walrcv->startTime = now;
299 : :
300 : : /*
301 : : * If this is the first startup of walreceiver (on this timeline),
302 : : * initialize flushedUpto and latestChunkStart to the starting point.
303 : : */
4504 304 [ + + - + ]: 160 : if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
305 : : {
1977 tmunro@postgresql.or 306 : 92 : walrcv->flushedUpto = recptr;
4504 heikki.linnakangas@i 307 : 92 : walrcv->receivedTLI = tli;
5303 308 : 92 : walrcv->latestChunkStart = recptr;
309 : : }
310 : 160 : walrcv->receiveStart = recptr;
4650 311 : 160 : walrcv->receiveStartTLI = tli;
312 : :
309 313 : 160 : walrcv_proc = walrcv->procno;
314 : :
5713 315 : 160 : SpinLockRelease(&walrcv->mutex);
316 : :
4650 317 [ + - ]: 160 : if (launch)
318 : 160 : SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
309 heikki.linnakangas@i 319 [ # # ]:UBC 0 : else if (walrcv_proc != INVALID_PROC_NUMBER)
320 : 0 : SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
5713 heikki.linnakangas@i 321 :CBC 160 : }
322 : :
323 : : /*
324 : : * Returns the last+1 byte position that walreceiver has flushed.
325 : : *
326 : : * Optionally, returns the previous chunk start, that is the first byte
327 : : * written in the most recent walreceiver flush cycle. Callers not
328 : : * interested in that value may pass NULL for latestChunkStart. Same for
329 : : * receiveTLI.
330 : : */
331 : : XLogRecPtr
1977 tmunro@postgresql.or 332 : 24620 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
333 : : {
3623 rhaas@postgresql.org 334 : 24620 : WalRcvData *walrcv = WalRcv;
335 : : XLogRecPtr recptr;
336 : :
5713 heikki.linnakangas@i 337 [ + + ]: 24620 : SpinLockAcquire(&walrcv->mutex);
1977 tmunro@postgresql.or 338 : 24620 : recptr = walrcv->flushedUpto;
5544 tgl@sss.pgh.pa.us 339 [ + + ]: 24620 : if (latestChunkStart)
340 : 23660 : *latestChunkStart = walrcv->latestChunkStart;
4650 heikki.linnakangas@i 341 [ + + ]: 24620 : if (receiveTLI)
342 : 24420 : *receiveTLI = walrcv->receivedTLI;
5713 343 : 24620 : SpinLockRelease(&walrcv->mutex);
344 : :
345 : 24620 : return recptr;
346 : : }
347 : :
348 : : /*
349 : : * Returns the last+1 byte position that walreceiver has written.
350 : : * This returns a recently written value without taking a lock.
351 : : */
352 : : XLogRecPtr
1977 tmunro@postgresql.or 353 :UBC 0 : GetWalRcvWriteRecPtr(void)
354 : : {
355 : 0 : WalRcvData *walrcv = WalRcv;
356 : :
357 : 0 : return pg_atomic_read_u64(&walrcv->writtenUpto);
358 : : }
359 : :
360 : : /*
361 : : * Returns the replication apply delay in ms or -1
362 : : * if the apply delay info is not available
363 : : */
364 : : int
4998 simon@2ndQuadrant.co 365 :CBC 401 : GetReplicationApplyDelay(void)
366 : : {
3623 rhaas@postgresql.org 367 : 401 : WalRcvData *walrcv = WalRcv;
368 : : XLogRecPtr receivePtr;
369 : : XLogRecPtr replayPtr;
370 : : TimestampTz chunkReplayStartTime;
371 : :
4998 simon@2ndQuadrant.co 372 [ - + ]: 401 : SpinLockAcquire(&walrcv->mutex);
1977 tmunro@postgresql.or 373 : 401 : receivePtr = walrcv->flushedUpto;
4998 simon@2ndQuadrant.co 374 : 401 : SpinLockRelease(&walrcv->mutex);
375 : :
4643 heikki.linnakangas@i 376 : 401 : replayPtr = GetXLogReplayRecPtr(NULL);
377 : :
4635 alvherre@alvh.no-ip. 378 [ + + ]: 401 : if (receivePtr == replayPtr)
4998 simon@2ndQuadrant.co 379 : 119 : return 0;
380 : :
3098 peter_e@gmx.net 381 : 282 : chunkReplayStartTime = GetCurrentChunkReplayStartTime();
382 : :
383 [ + + ]: 282 : if (chunkReplayStartTime == 0)
3829 ishii@postgresql.org 384 : 7 : return -1;
385 : :
1761 tgl@sss.pgh.pa.us 386 : 275 : return TimestampDifferenceMilliseconds(chunkReplayStartTime,
387 : : GetCurrentTimestamp());
388 : : }
389 : :
390 : : /*
391 : : * Returns the network latency in ms, note that this includes any
392 : : * difference in clock settings between the servers, as well as timezone.
393 : : */
394 : : int
4998 simon@2ndQuadrant.co 395 : 401 : GetReplicationTransferLatency(void)
396 : : {
3623 rhaas@postgresql.org 397 : 401 : WalRcvData *walrcv = WalRcv;
398 : : TimestampTz lastMsgSendTime;
399 : : TimestampTz lastMsgReceiptTime;
400 : :
4998 simon@2ndQuadrant.co 401 [ - + ]: 401 : SpinLockAcquire(&walrcv->mutex);
402 : 401 : lastMsgSendTime = walrcv->lastMsgSendTime;
403 : 401 : lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
404 : 401 : SpinLockRelease(&walrcv->mutex);
405 : :
1761 tgl@sss.pgh.pa.us 406 : 401 : return TimestampDifferenceMilliseconds(lastMsgSendTime,
407 : : lastMsgReceiptTime);
408 : : }
|