Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * syncrep.c
4 : : *
5 : : * Synchronous replication is new as of PostgreSQL 9.1.
6 : : *
7 : : * If requested, transaction commits wait until their commit LSNs are
8 : : * acknowledged by the synchronous standbys.
9 : : *
10 : : * This module contains the code for waiting and release of backends.
11 : : * All code in this module executes on the primary. The core streaming
12 : : * replication transport remains within WALreceiver/WALsender modules.
13 : : *
14 : : * The essence of this design is that it isolates all logic about
15 : : * waiting/releasing onto the primary. The primary defines which standbys
16 : : * it wishes to wait for. The standbys are completely unaware of the
17 : : * durability requirements of transactions on the primary, reducing the
18 : : * complexity of the code and streamlining both standby operations and
19 : : * network bandwidth because there is no requirement to ship
20 : : * per-transaction state information.
21 : : *
22 : : * Replication is either synchronous or not synchronous (async). If it is
23 : : * async, we just fastpath out of here. If it is sync, then we wait for
24 : : * the write, flush or apply location on the standby before releasing
25 : : * the waiting backend. Further complexity in that interaction is
26 : : * expected in later releases.
27 : : *
28 : : * The best performing way to manage the waiting backends is to have a
29 : : * single ordered queue of waiting backends, so that we can avoid
30 : : * searching through all waiters each time we receive a reply.
31 : : *
32 : : * In 9.5 or before only a single standby could be considered as
33 : : * synchronous. In 9.6 we support priority-based multiple synchronous
34 : : * standbys. In 10.0 quorum-based multiple synchronous standbys are also
35 : : * supported. The number of synchronous standbys that transactions
36 : : * must wait for replies from is specified in synchronous_standby_names.
37 : : * This parameter also specifies a list of standby names and the method
38 : : * (FIRST or ANY) to choose synchronous standbys from the listed ones.
39 : : *
40 : : * The method FIRST specifies a priority-based synchronous replication
41 : : * and makes transaction commits wait until their WAL records are
42 : : * replicated to the requested number of synchronous standbys chosen based
43 : : * on their priorities. The standbys whose names appear earlier in the list
44 : : * are given higher priority and will be considered as synchronous.
45 : : * Other standby servers appearing later in this list represent potential
46 : : * synchronous standbys. If any of the current synchronous standbys
47 : : * disconnects for whatever reason, it will be replaced immediately with
48 : : * the next-highest-priority standby.
49 : : *
50 : : * The method ANY specifies a quorum-based synchronous replication
51 : : * and makes transaction commits wait until their WAL records are
52 : : * replicated to at least the requested number of synchronous standbys
53 : : * in the list. All the standbys appearing in the list are considered as
54 : : * candidates for quorum synchronous standbys.
55 : : *
56 : : * If neither FIRST nor ANY is specified, FIRST is used as the method.
57 : : * This is for backward compatibility with 9.6 or before where only a
58 : : * priority-based sync replication was supported.
59 : : *
60 : : * Before the standbys chosen from synchronous_standby_names can
61 : : * become the synchronous standbys they must have caught up with
62 : : * the primary; that may take some time. Once caught up,
63 : : * the standbys which are considered as synchronous at that moment
64 : : * will release waiters from the queue.
65 : : *
66 : : * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
67 : : *
68 : : * IDENTIFICATION
69 : : * src/backend/replication/syncrep.c
70 : : *
71 : : *-------------------------------------------------------------------------
72 : : */
73 : : #include "postgres.h"
74 : :
75 : : #include <unistd.h>
76 : :
77 : : #include "access/xact.h"
78 : : #include "common/int.h"
79 : : #include "miscadmin.h"
80 : : #include "pgstat.h"
81 : : #include "replication/syncrep.h"
82 : : #include "replication/walsender.h"
83 : : #include "replication/walsender_private.h"
84 : : #include "storage/proc.h"
85 : : #include "tcop/tcopprot.h"
86 : : #include "utils/guc_hooks.h"
87 : : #include "utils/ps_status.h"
88 : :
/* User-settable parameters for sync rep */
/* Raw string value of synchronous_standby_names; may be NULL or empty. */
char	   *SyncRepStandbyNames;

/*
 * True when a non-empty standby name list has been configured. Used as a
 * direct GUC check when the shared sync-standby status flags are not yet
 * initialized (see SyncRepWaitForLSN()).
 */
#define SyncStandbysDefined() \
	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')

/*
 * Walsender-local flag: when true, SyncRepReleaseWaiters() logs a message the
 * next time this walsender finds itself managing a sync standby.
 */
static bool announce_next_takeover = true;

/*
 * Parsed form of synchronous_standby_names. NULL means synchronous
 * replication is not configured.
 */
SyncRepConfigData *SyncRepConfig = NULL;

/*
 * Wait mode applied to commit records in SyncRepWaitForLSN(). Non-commit
 * records are capped at SYNC_REP_WAIT_FLUSH. NOTE(review): it is assigned
 * outside this chunk, presumably from synchronous_commit; confirm.
 */
static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;

/* Backend-side queue management */
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int	SyncRepWakeQueue(bool all, int mode);

/* Computation of the LSNs up to which waiters may be released */
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
								 XLogRecPtr *flushPtr,
								 XLogRecPtr *applyPtr,
								 bool *am_sync);
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
									   XLogRecPtr *flushPtr,
									   XLogRecPtr *applyPtr,
									   SyncRepStandbyData *sync_standbys,
									   int num_standbys);
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
										  XLogRecPtr *flushPtr,
										  XLogRecPtr *applyPtr,
										  SyncRepStandbyData *sync_standbys,
										  int num_standbys,
										  uint8 nth);
static int	SyncRepGetStandbyPriority(void);
static int	standby_priority_comparator(const void *a, const void *b);
static int	cmp_lsn(const void *a, const void *b);

#ifdef USE_ASSERT_CHECKING
/* Assertion helper: verifies the wait queue for 'mode' is sorted by LSN. */
static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif
126 : :
127 : : /*
128 : : * ===========================================================
129 : : * Synchronous Replication functions for normal user backends
130 : : * ===========================================================
131 : : */
132 : :
133 : : /*
134 : : * Wait for synchronous replication, if requested by user.
135 : : *
136 : : * Initially backends start in state SYNC_REP_NOT_WAITING and then
137 : : * change that state to SYNC_REP_WAITING before adding ourselves
138 : : * to the wait queue. During SyncRepWakeQueue() a WALSender changes
139 : : * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
140 : : * This backend then resets its state to SYNC_REP_NOT_WAITING.
141 : : *
142 : : * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
143 : : * represents a commit record. If it doesn't, then we wait only for the WAL
144 : : * to be flushed if synchronous_commit is set to the higher level of
145 : : * remote_apply, because only commit records provide apply feedback.
146 : : */
147 : : void
3549 rhaas@postgresql.org 148 :CBC 125486 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
149 : : {
150 : : int mode;
151 : :
152 : : /*
153 : : * This should be called while holding interrupts during a transaction
154 : : * commit to prevent the follow-up shared memory queue cleanups to be
155 : : * influenced by external interruptions.
156 : : */
2237 michael@paquier.xyz 157 [ - + ]: 125486 : Assert(InterruptHoldoffCount > 0);
158 : :
159 : : /*
160 : : * Fast exit if user has not requested sync replication, or there are no
161 : : * sync replication standby names defined.
162 : : *
163 : : * Since this routine gets called every commit time, it's important to
164 : : * exit quickly if sync replication is not requested.
165 : : *
166 : : * We check WalSndCtl->sync_standbys_status flag without the lock and exit
167 : : * immediately if SYNC_STANDBY_INIT is set (the checkpointer has
168 : : * initialized this data) but SYNC_STANDBY_DEFINED is missing (no sync
169 : : * replication requested).
170 : : *
171 : : * If SYNC_STANDBY_DEFINED is set, we need to check the status again later
172 : : * while holding the lock, to check the flag and operate the sync rep
173 : : * queue atomically. This is necessary to avoid the race condition
174 : : * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
175 : : * SYNC_STANDBY_DEFINED is not set, the lock is not necessary because we
176 : : * don't touch the queue.
177 : : */
1931 fujii@postgresql.org 178 [ + + + + ]: 125486 : if (!SyncRepRequested() ||
249 michael@paquier.xyz 179 [ + + ]: 95306 : ((((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status) &
180 : : (SYNC_STANDBY_INIT | SYNC_STANDBY_DEFINED)) == SYNC_STANDBY_INIT)
1931 fujii@postgresql.org 181 : 88498 : return;
182 : :
183 : : /* Cap the level for anything other than commit to remote flush only. */
3549 rhaas@postgresql.org 184 [ + + ]: 36988 : if (commit)
185 : 36971 : mode = SyncRepWaitMode;
186 : : else
187 : 17 : mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
188 : :
1063 andres@anarazel.de 189 [ - + ]: 36988 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
5388 rhaas@postgresql.org 190 [ - + ]: 36988 : Assert(WalSndCtl != NULL);
191 : :
192 : 36988 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
193 [ - + ]: 36988 : Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
194 : :
195 : : /*
196 : : * We don't wait for sync rep if SYNC_STANDBY_DEFINED is not set. See
197 : : * SyncRepUpdateSyncStandbysDefined().
198 : : *
199 : : * Also check that the standby hasn't already replied. Unlikely race
200 : : * condition but we'll be fetching that cache line anyway so it's likely
201 : : * to be a low cost check.
202 : : *
203 : : * If the sync standby data has not been initialized yet
204 : : * (SYNC_STANDBY_INIT is not set), fall back to a check based on the LSN,
205 : : * then do a direct GUC check.
206 : : */
249 michael@paquier.xyz 207 [ + + ]: 36988 : if (WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT)
208 : : {
209 [ + - ]: 39 : if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) == 0 ||
210 [ + + ]: 39 : lsn <= WalSndCtl->lsn[mode])
211 : : {
212 : 1 : LWLockRelease(SyncRepLock);
213 : 1 : return;
214 : : }
215 : : }
216 [ - + ]: 36949 : else if (lsn <= WalSndCtl->lsn[mode])
217 : : {
218 : : /*
219 : : * The LSN is older than what we need to wait for. The sync standby
220 : : * data has not been initialized yet, but we are OK to not wait
221 : : * because we know that there is no point in doing so based on the
222 : : * LSN.
223 : : */
249 michael@paquier.xyz 224 :UBC 0 : LWLockRelease(SyncRepLock);
225 : 0 : return;
226 : : }
249 michael@paquier.xyz 227 [ + - + - ]:CBC 36949 : else if (!SyncStandbysDefined())
228 : : {
229 : : /*
230 : : * If we are here, the sync standby data has not been initialized yet,
231 : : * and the LSN is newer than what need to wait for, so we have fallen
232 : : * back to the best thing we could do in this case: a check on
233 : : * SyncStandbysDefined() to see if the GUC is set or not.
234 : : *
235 : : * When the GUC has a value, we wait until the checkpointer updates
236 : : * the status data because we cannot be sure yet if we should wait or
237 : : * not. Here, the GUC has *no* value, we are sure that there is no
238 : : * point to wait; this matters for example when initializing a
239 : : * cluster, where we should never wait, and no sync standbys is the
240 : : * default behavior.
241 : : */
5388 rhaas@postgresql.org 242 : 36949 : LWLockRelease(SyncRepLock);
243 : 36949 : return;
244 : : }
245 : :
246 : : /*
247 : : * Set our waitLSN so WALSender will know when to wake us, and add
248 : : * ourselves to the queue.
249 : : */
3549 250 : 38 : MyProc->waitLSN = lsn;
5388 251 : 38 : MyProc->syncRepState = SYNC_REP_WAITING;
5069 simon@2ndQuadrant.co 252 : 38 : SyncRepQueueInsert(mode);
253 [ - + ]: 38 : Assert(SyncRepQueueIsOrderedByLSN(mode));
5388 rhaas@postgresql.org 254 : 38 : LWLockRelease(SyncRepLock);
255 : :
256 : : /* Alter ps display to show waiting for sync rep. */
257 [ + - ]: 38 : if (update_process_title)
258 : : {
259 : : char buffer[32];
260 : :
162 alvherre@kurilemu.de 261 :GNC 38 : sprintf(buffer, "waiting for %X/%08X", LSN_FORMAT_ARGS(lsn));
1030 drowley@postgresql.o 262 :CBC 38 : set_ps_display_suffix(buffer);
263 : : }
264 : :
265 : : /*
266 : : * Wait for specified LSN to be confirmed.
267 : : *
268 : : * Each proc has its own wait latch, so we perform a normal latch
269 : : * check/wait loop here.
270 : : */
271 : : for (;;)
5399 simon@2ndQuadrant.co 272 : 38 : {
273 : : int rc;
274 : :
275 : : /* Must reset the latch before testing state. */
3989 andres@anarazel.de 276 : 76 : ResetLatch(MyLatch);
277 : :
278 : : /*
279 : : * Acquiring the lock is not needed, the latch ensures proper
280 : : * barriers. If it looks like we're done, we must really be done,
281 : : * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
282 : : * it will never update it again, so we can't be seeing a stale value
283 : : * in that case.
284 : : */
3413 simon@2ndQuadrant.co 285 [ + + ]: 76 : if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
5388 rhaas@postgresql.org 286 : 38 : break;
287 : :
288 : : /*
289 : : * If a wait for synchronous replication is pending, we can neither
290 : : * acknowledge the commit nor raise ERROR or FATAL. The latter would
291 : : * lead the client to believe that the transaction aborted, which is
292 : : * not true: it's already committed locally. The former is no good
293 : : * either: the client has requested synchronous replication, and is
294 : : * entitled to assume that an acknowledged commit is also replicated,
295 : : * which might not be true. So in this case we issue a WARNING (which
296 : : * some clients may be able to interpret) and shut off further output.
297 : : * We do NOT reset ProcDiePending, so that the process will die after
298 : : * the commit is cleaned up.
299 : : */
300 [ - + ]: 38 : if (ProcDiePending)
301 : : {
5388 rhaas@postgresql.org 302 [ # # ]:UBC 0 : ereport(WARNING,
303 : : (errcode(ERRCODE_ADMIN_SHUTDOWN),
304 : : errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
305 : : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
306 : 0 : whereToSendOutput = DestNone;
307 : 0 : SyncRepCancelWait();
308 : 0 : break;
309 : : }
310 : :
311 : : /*
312 : : * It's unclear what to do if a query cancel interrupt arrives. We
313 : : * can't actually abort at this point, but ignoring the interrupt
314 : : * altogether is not helpful, so we just terminate the wait with a
315 : : * suitable warning.
316 : : */
5388 rhaas@postgresql.org 317 [ - + ]:CBC 38 : if (QueryCancelPending)
318 : : {
5388 rhaas@postgresql.org 319 :UBC 0 : QueryCancelPending = false;
320 [ # # ]: 0 : ereport(WARNING,
321 : : (errmsg("canceling wait for synchronous replication due to user request"),
322 : : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
323 : 0 : SyncRepCancelWait();
324 : 0 : break;
325 : : }
326 : :
327 : : /*
328 : : * Wait on latch. Any condition that should wake us up will set the
329 : : * latch, so no need for timeout.
330 : : */
2580 tmunro@postgresql.or 331 :CBC 38 : rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
332 : : WAIT_EVENT_SYNC_REP);
333 : :
334 : : /*
335 : : * If the postmaster dies, we'll probably never get an acknowledgment,
336 : : * because all the wal sender processes will exit. So just bail out.
337 : : */
338 [ - + ]: 38 : if (rc & WL_POSTMASTER_DEATH)
339 : : {
5388 rhaas@postgresql.org 340 :UBC 0 : ProcDiePending = true;
341 : 0 : whereToSendOutput = DestNone;
342 : 0 : SyncRepCancelWait();
343 : 0 : break;
344 : : }
345 : : }
346 : :
347 : : /*
348 : : * WalSender has checked our LSN and has removed us from queue. Clean up
349 : : * state and leave. It's OK to reset these shared memory fields without
350 : : * holding SyncRepLock, because any walsenders will ignore us anyway when
351 : : * we're not on the queue. We need a read barrier to make sure we see the
352 : : * changes to the queue link (this might be unnecessary without
353 : : * assertions, but better safe than sorry).
354 : : */
3079 heikki.linnakangas@i 355 :CBC 38 : pg_read_barrier();
1063 andres@anarazel.de 356 [ - + ]: 38 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
5388 rhaas@postgresql.org 357 : 38 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
4923 heikki.linnakangas@i 358 : 38 : MyProc->waitLSN = 0;
359 : :
360 : : /* reset ps display to remove the suffix */
1030 drowley@postgresql.o 361 [ + - ]: 38 : if (update_process_title)
362 : 38 : set_ps_display_remove_suffix();
363 : : }
364 : :
365 : : /*
366 : : * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
367 : : *
368 : : * Usually we will go at tail of queue, though it's possible that we arrive
369 : : * here out of order, so start at tail and work back to insertion point.
370 : : */
371 : : static void
5075 simon@2ndQuadrant.co 372 : 38 : SyncRepQueueInsert(int mode)
373 : : {
374 : : dlist_head *queue;
375 : : dlist_iter iter;
376 : :
377 [ + - - + ]: 38 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
1063 andres@anarazel.de 378 : 38 : queue = &WalSndCtl->SyncRepQueue[mode];
379 : :
380 [ + - - + ]: 38 : dlist_reverse_foreach(iter, queue)
381 : : {
1063 andres@anarazel.de 382 :UBC 0 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
383 : :
384 : : /*
385 : : * Stop at the queue element that we should insert after to ensure the
386 : : * queue is ordered by LSN.
387 : : */
4736 alvherre@alvh.no-ip. 388 [ # # ]: 0 : if (proc->waitLSN < MyProc->waitLSN)
389 : : {
1063 andres@anarazel.de 390 : 0 : dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
391 : 0 : return;
392 : : }
393 : : }
394 : :
395 : : /*
396 : : * If we get here, the list was either empty, or this process needs to be
397 : : * at the head.
398 : : */
1063 andres@anarazel.de 399 :CBC 38 : dlist_push_head(queue, &MyProc->syncRepLinks);
400 : : }
401 : :
402 : : /*
403 : : * Acquire SyncRepLock and cancel any wait currently in progress.
404 : : */
405 : : static void
5388 rhaas@postgresql.org 406 :UBC 0 : SyncRepCancelWait(void)
407 : : {
408 : 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
1063 andres@anarazel.de 409 [ # # ]: 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
410 : 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
5388 rhaas@postgresql.org 411 : 0 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
412 : 0 : LWLockRelease(SyncRepLock);
413 : 0 : }
414 : :
415 : : void
5242 tgl@sss.pgh.pa.us 416 :CBC 15451 : SyncRepCleanupAtProcExit(void)
417 : : {
418 : : /*
419 : : * First check if we are removed from the queue without the lock to not
420 : : * slow down backend exit.
421 : : */
1063 andres@anarazel.de 422 [ - + ]: 15451 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
423 : : {
5399 simon@2ndQuadrant.co 424 :UBC 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
425 : :
426 : : /* maybe we have just been removed, so recheck */
1063 andres@anarazel.de 427 [ # # ]: 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
428 : 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
429 : :
5399 simon@2ndQuadrant.co 430 : 0 : LWLockRelease(SyncRepLock);
431 : : }
5399 simon@2ndQuadrant.co 432 :CBC 15451 : }
433 : :
434 : : /*
435 : : * ===========================================================
436 : : * Synchronous Replication functions for wal sender processes
437 : : * ===========================================================
438 : : */
439 : :
440 : : /*
441 : : * Take any action required to initialise sync rep state from config
442 : : * data. Called at WALSender startup and after each SIGHUP.
443 : : */
444 : : void
445 : 687 : SyncRepInitConfig(void)
446 : : {
447 : : int priority;
448 : :
449 : : /*
450 : : * Determine if we are a potential sync standby and remember the result
451 : : * for handling replies from standby.
452 : : */
453 : 687 : priority = SyncRepGetStandbyPriority();
454 [ + + ]: 687 : if (MyWalSnd->sync_standby_priority != priority)
455 : : {
2068 tgl@sss.pgh.pa.us 456 [ - + ]: 17 : SpinLockAcquire(&MyWalSnd->mutex);
5399 simon@2ndQuadrant.co 457 : 17 : MyWalSnd->sync_standby_priority = priority;
2068 tgl@sss.pgh.pa.us 458 : 17 : SpinLockRelease(&MyWalSnd->mutex);
459 : :
5399 simon@2ndQuadrant.co 460 [ - + ]: 17 : ereport(DEBUG1,
461 : : (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
462 : : application_name, priority)));
463 : : }
464 : 687 : }
465 : :
466 : : /*
467 : : * Update the LSNs on each queue based upon our latest state. This
468 : : * implements a simple policy of first-valid-sync-standby-releases-waiter.
469 : : *
470 : : * Other policies are possible, which would change what we do here and
471 : : * perhaps also which information we store as well.
472 : : */
473 : : void
474 : 101401 : SyncRepReleaseWaiters(void)
475 : : {
476 : 101401 : volatile WalSndCtlData *walsndctl = WalSndCtl;
477 : : XLogRecPtr writePtr;
478 : : XLogRecPtr flushPtr;
479 : : XLogRecPtr applyPtr;
480 : : bool got_recptr;
481 : : bool am_sync;
5075 482 : 101401 : int numwrite = 0;
483 : 101401 : int numflush = 0;
3549 rhaas@postgresql.org 484 : 101401 : int numapply = 0;
485 : :
486 : : /*
487 : : * If this WALSender is serving a standby that is not on the list of
488 : : * potential sync standbys then we have nothing to do. If we are still
489 : : * starting up, still running base backup or the current flush position is
490 : : * still invalid, then leave quickly also. Streaming or stopping WAL
491 : : * senders are allowed to release waiters.
492 : : */
5399 simon@2ndQuadrant.co 493 [ + + ]: 101401 : if (MyWalSnd->sync_standby_priority == 0 ||
2574 michael@paquier.xyz 494 [ + + ]: 132 : (MyWalSnd->state != WALSNDSTATE_STREAMING &&
495 [ + + ]: 30 : MyWalSnd->state != WALSNDSTATE_STOPPING) ||
40 alvherre@kurilemu.de 496 [ - + ]:GNC 122 : !XLogRecPtrIsValid(MyWalSnd->flush))
497 : : {
3541 fujii@postgresql.org 498 :CBC 101279 : announce_next_takeover = true;
5399 simon@2ndQuadrant.co 499 : 101280 : return;
500 : : }
501 : :
502 : : /*
503 : : * We're a potential sync standby. Release waiters if there are enough
504 : : * sync standbys and we are considered as sync.
505 : : */
506 : 122 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
507 : :
508 : : /*
509 : : * Check whether we are a sync standby or not, and calculate the synced
510 : : * positions among all sync standbys. (Note: although this step does not
511 : : * of itself require holding SyncRepLock, it seems like a good idea to do
512 : : * it after acquiring the lock. This ensures that the WAL pointers we use
513 : : * to release waiters are newer than any previous execution of this
514 : : * routine used.)
515 : : */
3284 fujii@postgresql.org 516 : 122 : got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
517 : :
518 : : /*
519 : : * If we are managing a sync standby, though we weren't prior to this,
520 : : * then announce we are now a sync standby.
521 : : */
3541 522 [ + + + + ]: 122 : if (announce_next_takeover && am_sync)
523 : : {
524 : 11 : announce_next_takeover = false;
525 : :
3284 526 [ + - ]: 11 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
527 [ + - ]: 11 : ereport(LOG,
528 : : (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
529 : : application_name, MyWalSnd->sync_standby_priority)));
530 : : else
3284 fujii@postgresql.org 531 [ # # ]:UBC 0 : ereport(LOG,
532 : : (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
533 : : application_name)));
534 : : }
535 : :
536 : : /*
537 : : * If the number of sync standbys is less than requested or we aren't
538 : : * managing a sync standby then just leave.
539 : : */
3284 fujii@postgresql.org 540 [ + + - + ]:CBC 122 : if (!got_recptr || !am_sync)
541 : : {
5399 simon@2ndQuadrant.co 542 : 1 : LWLockRelease(SyncRepLock);
3541 fujii@postgresql.org 543 : 1 : announce_next_takeover = !am_sync;
5399 simon@2ndQuadrant.co 544 : 1 : return;
545 : : }
546 : :
547 : : /*
548 : : * Set the lsn first so that when we wake backends they will release up to
549 : : * this location.
550 : : */
3541 fujii@postgresql.org 551 [ + + ]: 121 : if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
552 : : {
553 : 45 : walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
5075 simon@2ndQuadrant.co 554 : 45 : numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
555 : : }
3541 fujii@postgresql.org 556 [ + + ]: 121 : if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
557 : : {
558 : 49 : walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
5075 simon@2ndQuadrant.co 559 : 49 : numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
560 : : }
3541 fujii@postgresql.org 561 [ + + ]: 121 : if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
562 : : {
563 : 45 : walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
3549 rhaas@postgresql.org 564 : 45 : numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
565 : : }
566 : :
5399 simon@2ndQuadrant.co 567 : 121 : LWLockRelease(SyncRepLock);
568 : :
162 alvherre@kurilemu.de 569 [ - + ]:GNC 121 : elog(DEBUG3, "released %d procs up to write %X/%08X, %d procs up to flush %X/%08X, %d procs up to apply %X/%08X",
570 : : numwrite, LSN_FORMAT_ARGS(writePtr),
571 : : numflush, LSN_FORMAT_ARGS(flushPtr),
572 : : numapply, LSN_FORMAT_ARGS(applyPtr));
573 : : }
574 : :
575 : : /*
576 : : * Calculate the synced Write, Flush and Apply positions among sync standbys.
577 : : *
578 : : * Return false if the number of sync standbys is less than
579 : : * synchronous_standby_names specifies. Otherwise return true and
580 : : * store the positions into *writePtr, *flushPtr and *applyPtr.
581 : : *
582 : : * On return, *am_sync is set to true if this walsender is connecting to
583 : : * sync standby. Otherwise it's set to false.
584 : : */
585 : : static bool
3284 fujii@postgresql.org 586 :CBC 122 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
587 : : XLogRecPtr *applyPtr, bool *am_sync)
588 : : {
589 : : SyncRepStandbyData *sync_standbys;
590 : : int num_standbys;
591 : : int i;
592 : :
593 : : /* Initialize default results */
3541 594 : 122 : *writePtr = InvalidXLogRecPtr;
595 : 122 : *flushPtr = InvalidXLogRecPtr;
596 : 122 : *applyPtr = InvalidXLogRecPtr;
597 : 122 : *am_sync = false;
598 : :
599 : : /* Quick out if not even configured to be synchronous */
2068 tgl@sss.pgh.pa.us 600 [ - + ]: 122 : if (SyncRepConfig == NULL)
2068 tgl@sss.pgh.pa.us 601 :UBC 0 : return false;
602 : :
603 : : /* Get standbys that are considered as synchronous at this moment */
2068 tgl@sss.pgh.pa.us 604 :CBC 122 : num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
605 : :
606 : : /* Am I among the candidate sync standbys? */
607 [ + + ]: 123 : for (i = 0; i < num_standbys; i++)
608 : : {
609 [ + + ]: 122 : if (sync_standbys[i].is_me)
610 : : {
611 : 121 : *am_sync = true;
612 : 121 : break;
613 : : }
614 : : }
615 : :
616 : : /*
617 : : * Nothing more to do if we are not managing a sync standby or there are
618 : : * not enough synchronous standbys.
619 : : */
3520 620 [ + + ]: 122 : if (!(*am_sync) ||
2068 621 [ - + ]: 121 : num_standbys < SyncRepConfig->num_sync)
622 : : {
623 : 1 : pfree(sync_standbys);
3541 fujii@postgresql.org 624 : 1 : return false;
625 : : }
626 : :
627 : : /*
628 : : * In a priority-based sync replication, the synced positions are the
629 : : * oldest ones among sync standbys. In a quorum-based, they are the Nth
630 : : * latest ones.
631 : : *
632 : : * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
633 : : * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
634 : : * because it's a bit more efficient.
635 : : *
636 : : * XXX If the numbers of current and requested sync standbys are the same,
637 : : * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
638 : : * positions even in a quorum-based sync replication.
639 : : */
3284 640 [ + - ]: 121 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
641 : : {
642 : 121 : SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
643 : : sync_standbys, num_standbys);
644 : : }
645 : : else
646 : : {
3284 fujii@postgresql.org 647 :UBC 0 : SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
648 : : sync_standbys, num_standbys,
2068 tgl@sss.pgh.pa.us 649 : 0 : SyncRepConfig->num_sync);
650 : : }
651 : :
2068 tgl@sss.pgh.pa.us 652 :CBC 121 : pfree(sync_standbys);
3284 fujii@postgresql.org 653 : 121 : return true;
654 : : }
655 : :
656 : : /*
657 : : * Calculate the oldest Write, Flush and Apply positions among sync standbys.
658 : : */
659 : : static void
2068 tgl@sss.pgh.pa.us 660 : 121 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
661 : : XLogRecPtr *flushPtr,
662 : : XLogRecPtr *applyPtr,
663 : : SyncRepStandbyData *sync_standbys,
664 : : int num_standbys)
665 : : {
666 : : int i;
667 : :
668 : : /*
669 : : * Scan through all sync standbys and calculate the oldest Write, Flush
670 : : * and Apply positions. We assume *writePtr et al were initialized to
671 : : * InvalidXLogRecPtr.
672 : : */
673 [ + + ]: 242 : for (i = 0; i < num_standbys; i++)
674 : : {
675 : 121 : XLogRecPtr write = sync_standbys[i].write;
676 : 121 : XLogRecPtr flush = sync_standbys[i].flush;
677 : 121 : XLogRecPtr apply = sync_standbys[i].apply;
678 : :
40 alvherre@kurilemu.de 679 [ - + - - ]:GNC 121 : if (!XLogRecPtrIsValid(*writePtr) || *writePtr > write)
3541 fujii@postgresql.org 680 :CBC 121 : *writePtr = write;
40 alvherre@kurilemu.de 681 [ - + - - ]:GNC 121 : if (!XLogRecPtrIsValid(*flushPtr) || *flushPtr > flush)
3541 fujii@postgresql.org 682 :CBC 121 : *flushPtr = flush;
40 alvherre@kurilemu.de 683 [ - + - - ]:GNC 121 : if (!XLogRecPtrIsValid(*applyPtr) || *applyPtr > apply)
3541 fujii@postgresql.org 684 :CBC 121 : *applyPtr = apply;
685 : : }
3284 686 : 121 : }
687 : :
688 : : /*
689 : : * Calculate the Nth latest Write, Flush and Apply positions among sync
690 : : * standbys.
691 : : */
692 : : static void
2068 tgl@sss.pgh.pa.us 693 :UBC 0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
694 : : XLogRecPtr *flushPtr,
695 : : XLogRecPtr *applyPtr,
696 : : SyncRepStandbyData *sync_standbys,
697 : : int num_standbys,
698 : : uint8 nth)
699 : : {
700 : : XLogRecPtr *write_array;
701 : : XLogRecPtr *flush_array;
702 : : XLogRecPtr *apply_array;
703 : : int i;
704 : :
705 : : /* Should have enough candidates, or somebody messed up */
706 [ # # # # ]: 0 : Assert(nth > 0 && nth <= num_standbys);
707 : :
6 michael@paquier.xyz 708 :UNC 0 : write_array = palloc_array(XLogRecPtr, num_standbys);
709 : 0 : flush_array = palloc_array(XLogRecPtr, num_standbys);
710 : 0 : apply_array = palloc_array(XLogRecPtr, num_standbys);
711 : :
2068 tgl@sss.pgh.pa.us 712 [ # # ]:UBC 0 : for (i = 0; i < num_standbys; i++)
713 : : {
714 : 0 : write_array[i] = sync_standbys[i].write;
715 : 0 : flush_array[i] = sync_standbys[i].flush;
716 : 0 : apply_array[i] = sync_standbys[i].apply;
717 : : }
718 : :
719 : : /* Sort each array in descending order */
720 : 0 : qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
721 : 0 : qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
722 : 0 : qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
723 : :
724 : : /* Get Nth latest Write, Flush, Apply positions */
3284 fujii@postgresql.org 725 : 0 : *writePtr = write_array[nth - 1];
726 : 0 : *flushPtr = flush_array[nth - 1];
727 : 0 : *applyPtr = apply_array[nth - 1];
728 : :
729 : 0 : pfree(write_array);
730 : 0 : pfree(flush_array);
731 : 0 : pfree(apply_array);
732 : 0 : }
733 : :
734 : : /*
735 : : * Compare lsn in order to sort array in descending order.
736 : : */
737 : : static int
738 : 0 : cmp_lsn(const void *a, const void *b)
739 : : {
3135 bruce@momjian.us 740 : 0 : XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
741 : 0 : XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
742 : :
669 nathan@postgresql.or 743 : 0 : return pg_cmp_u64(lsn2, lsn1);
744 : : }
745 : :
746 : : /*
747 : : * Return data about walsenders that are candidates to be sync standbys.
748 : : *
749 : : * *standbys is set to a palloc'd array of structs of per-walsender data,
750 : : * and the number of valid entries (candidate sync senders) is returned.
751 : : * (This might be more or fewer than num_sync; caller must check.)
752 : : */
753 : : int
2068 tgl@sss.pgh.pa.us 754 :CBC 810 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
755 : : {
756 : : int i;
757 : : int n;
758 : :
759 : : /* Create result array */
6 michael@paquier.xyz 760 :GNC 810 : *standbys = palloc_array(SyncRepStandbyData, max_wal_senders);
761 : :
762 : : /* Quick exit if sync replication is not requested */
3284 fujii@postgresql.org 763 [ + + ]:CBC 810 : if (SyncRepConfig == NULL)
2068 tgl@sss.pgh.pa.us 764 : 671 : return 0;
765 : :
766 : : /* Collect raw data from shared memory */
767 : 139 : n = 0;
3284 fujii@postgresql.org 768 [ + + ]: 1529 : for (i = 0; i < max_wal_senders; i++)
769 : : {
770 : : volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
771 : : * rearrangement */
772 : : SyncRepStandbyData *stby;
773 : : WalSndState state; /* not included in SyncRepStandbyData */
774 : :
775 : 1390 : walsnd = &WalSndCtl->walsnds[i];
2068 tgl@sss.pgh.pa.us 776 : 1390 : stby = *standbys + n;
777 : :
3091 alvherre@alvh.no-ip. 778 [ - + ]: 1390 : SpinLockAcquire(&walsnd->mutex);
2068 tgl@sss.pgh.pa.us 779 : 1390 : stby->pid = walsnd->pid;
3091 alvherre@alvh.no-ip. 780 : 1390 : state = walsnd->state;
2068 tgl@sss.pgh.pa.us 781 : 1390 : stby->write = walsnd->write;
782 : 1390 : stby->flush = walsnd->flush;
783 : 1390 : stby->apply = walsnd->apply;
784 : 1390 : stby->sync_standby_priority = walsnd->sync_standby_priority;
3091 alvherre@alvh.no-ip. 785 : 1390 : SpinLockRelease(&walsnd->mutex);
786 : :
787 : : /* Must be active */
2068 tgl@sss.pgh.pa.us 788 [ + + ]: 1390 : if (stby->pid == 0)
3284 fujii@postgresql.org 789 : 1222 : continue;
790 : :
791 : : /* Must be streaming or stopping */
2574 michael@paquier.xyz 792 [ + + - + ]: 168 : if (state != WALSNDSTATE_STREAMING &&
793 : : state != WALSNDSTATE_STOPPING)
3284 fujii@postgresql.org 794 :UBC 0 : continue;
795 : :
796 : : /* Must be synchronous */
2068 tgl@sss.pgh.pa.us 797 [ + + ]:CBC 168 : if (stby->sync_standby_priority == 0)
3284 fujii@postgresql.org 798 : 7 : continue;
799 : :
800 : : /* Must have a valid flush position */
40 alvherre@kurilemu.de 801 [ - + ]:GNC 161 : if (!XLogRecPtrIsValid(stby->flush))
3284 fujii@postgresql.org 802 :UBC 0 : continue;
803 : :
804 : : /* OK, it's a candidate */
2068 tgl@sss.pgh.pa.us 805 :CBC 161 : stby->walsnd_index = i;
806 : 161 : stby->is_me = (walsnd == MyWalSnd);
807 : 161 : n++;
808 : : }
809 : :
810 : : /*
811 : : * In quorum mode, we return all the candidates. In priority mode, if we
812 : : * have too many candidates then return only the num_sync ones of highest
813 : : * priority.
814 : : */
815 [ + + ]: 139 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
816 [ + + ]: 138 : n > SyncRepConfig->num_sync)
817 : : {
818 : : /* Sort by priority ... */
819 : 9 : qsort(*standbys, n, sizeof(SyncRepStandbyData),
820 : : standby_priority_comparator);
821 : : /* ... then report just the first num_sync ones */
822 : 9 : n = SyncRepConfig->num_sync;
823 : : }
824 : :
825 : 139 : return n;
826 : : }
827 : :
828 : : /*
829 : : * qsort comparator to sort SyncRepStandbyData entries by priority
830 : : */
831 : : static int
832 : 20 : standby_priority_comparator(const void *a, const void *b)
833 : : {
834 : 20 : const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
835 : 20 : const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
836 : :
837 : : /* First, sort by increasing priority value */
838 [ + + ]: 20 : if (sa->sync_standby_priority != sb->sync_standby_priority)
839 : 9 : return sa->sync_standby_priority - sb->sync_standby_priority;
840 : :
841 : : /*
842 : : * We might have equal priority values; arbitrarily break ties by position
843 : : * in the WalSnd array. (This is utterly bogus, since that is arrival
844 : : * order dependent, but there are regression tests that rely on it.)
845 : : */
846 : 11 : return sa->walsnd_index - sb->walsnd_index;
847 : : }
848 : :
849 : :
850 : : /*
851 : : * Check if we are in the list of sync standbys, and if so, determine
852 : : * priority sequence. Return priority if set, or zero to indicate that
853 : : * we are not a potential sync standby.
854 : : *
855 : : * Compare the parameter SyncRepStandbyNames against the application_name
856 : : * for this WALSender, or allow any name if we find a wildcard "*".
857 : : */
858 : : static int
5399 simon@2ndQuadrant.co 859 : 687 : SyncRepGetStandbyPriority(void)
860 : : {
861 : : const char *standby_name;
862 : : int priority;
863 : 687 : bool found = false;
864 : :
865 : : /*
866 : : * Since synchronous cascade replication is not allowed, we always set the
867 : : * priority of cascading walsender to zero.
868 : : */
5264 869 [ + + ]: 687 : if (am_cascading_walsender)
870 : 13 : return 0;
871 : :
3520 tgl@sss.pgh.pa.us 872 [ + - + + : 674 : if (!SyncStandbysDefined() || SyncRepConfig == NULL)
- + ]
5399 simon@2ndQuadrant.co 873 : 650 : return 0;
874 : :
3520 tgl@sss.pgh.pa.us 875 : 24 : standby_name = SyncRepConfig->member_names;
876 [ + + ]: 32 : for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
877 : : {
5399 simon@2ndQuadrant.co 878 [ + + ]: 31 : if (pg_strcasecmp(standby_name, application_name) == 0 ||
3520 tgl@sss.pgh.pa.us 879 [ + + ]: 18 : strcmp(standby_name, "*") == 0)
880 : : {
5399 simon@2ndQuadrant.co 881 : 23 : found = true;
882 : 23 : break;
883 : : }
3520 tgl@sss.pgh.pa.us 884 : 8 : standby_name += strlen(standby_name) + 1;
885 : : }
886 : :
3156 fujii@postgresql.org 887 [ + + ]: 24 : if (!found)
888 : 1 : return 0;
889 : :
890 : : /*
891 : : * In quorum-based sync replication, all the standbys in the list have the
892 : : * same priority, one.
893 : : */
894 [ + - ]: 23 : return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
895 : : }
896 : :
897 : : /*
898 : : * Walk the specified queue from head. Set the state of any backends that
899 : : * need to be woken, remove them from the queue, and then wake them.
900 : : * Pass all = true to wake whole queue; otherwise, just wake up to
901 : : * the walsender's LSN.
902 : : *
903 : : * The caller must hold SyncRepLock in exclusive mode.
904 : : */
905 : : static int
5075 simon@2ndQuadrant.co 906 : 142 : SyncRepWakeQueue(bool all, int mode)
907 : : {
5399 908 : 142 : volatile WalSndCtlData *walsndctl = WalSndCtl;
5364 bruce@momjian.us 909 : 142 : int numprocs = 0;
910 : : dlist_mutable_iter iter;
911 : :
5075 simon@2ndQuadrant.co 912 [ + - - + ]: 142 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
2237 michael@paquier.xyz 913 [ - + ]: 142 : Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
5075 simon@2ndQuadrant.co 914 [ - + ]: 142 : Assert(SyncRepQueueIsOrderedByLSN(mode));
915 : :
1063 andres@anarazel.de 916 [ + - + + ]: 164 : dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
917 : : {
942 tgl@sss.pgh.pa.us 918 : 26 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
919 : :
920 : : /*
921 : : * Assume the queue is ordered by LSN
922 : : */
4736 alvherre@alvh.no-ip. 923 [ + - + + ]: 26 : if (!all && walsndctl->lsn[mode] < proc->waitLSN)
5399 simon@2ndQuadrant.co 924 : 4 : return numprocs;
925 : :
926 : : /*
927 : : * Remove from queue.
928 : : */
1063 andres@anarazel.de 929 : 22 : dlist_delete_thoroughly(&proc->syncRepLinks);
930 : :
931 : : /*
932 : : * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
933 : : * make sure that it sees the queue link being removed before the
934 : : * syncRepState change.
935 : : */
3079 heikki.linnakangas@i 936 : 22 : pg_write_barrier();
937 : :
938 : : /*
939 : : * Set state to complete; see SyncRepWaitForLSN() for discussion of
940 : : * the various states.
941 : : */
1063 andres@anarazel.de 942 : 22 : proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
943 : :
944 : : /*
945 : : * Wake only when we have set state and removed from queue.
946 : : */
947 : 22 : SetLatch(&(proc->procLatch));
948 : :
5399 simon@2ndQuadrant.co 949 : 22 : numprocs++;
950 : : }
951 : :
952 : 138 : return numprocs;
953 : : }
954 : :
955 : : /*
956 : : * The checkpointer calls this as needed to update the shared
957 : : * sync_standbys_status flag, so that backends don't remain permanently wedged
958 : : * if synchronous_standby_names is unset. It's safe to check the current value
959 : : * without the lock, because it's only ever updated by one process. But we
960 : : * must take the lock to change it.
961 : : */
962 : : void
5388 rhaas@postgresql.org 963 : 580 : SyncRepUpdateSyncStandbysDefined(void)
964 : : {
965 [ + - + + ]: 580 : bool sync_standbys_defined = SyncStandbysDefined();
966 : :
249 michael@paquier.xyz 967 : 580 : if (sync_standbys_defined !=
968 [ + + ]: 580 : ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) != 0))
969 : : {
5388 rhaas@postgresql.org 970 : 13 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
971 : :
972 : : /*
973 : : * If synchronous_standby_names has been reset to empty, it's futile
974 : : * for backends to continue waiting. Since the user no longer wants
975 : : * synchronous replication, we'd better wake them up.
976 : : */
977 [ + + ]: 13 : if (!sync_standbys_defined)
978 : : {
979 : : int i;
980 : :
5075 simon@2ndQuadrant.co 981 [ + + ]: 4 : for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
982 : 3 : SyncRepWakeQueue(true, i);
983 : : }
984 : :
985 : : /*
986 : : * Only allow people to join the queue when there are synchronous
987 : : * standbys defined. Without this interlock, there's a race
988 : : * condition: we might wake up all the current waiters; then, some
989 : : * backend that hasn't yet reloaded its config might go to sleep on
990 : : * the queue (and never wake up). This prevents that.
991 : : */
249 michael@paquier.xyz 992 [ + + ]: 13 : WalSndCtl->sync_standbys_status = SYNC_STANDBY_INIT |
993 : : (sync_standbys_defined ? SYNC_STANDBY_DEFINED : 0);
994 : :
995 : 13 : LWLockRelease(SyncRepLock);
996 : : }
997 [ + + ]: 567 : else if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT) == 0)
998 : : {
999 : 512 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
1000 : :
1001 : : /*
1002 : : * Note that there is no need to wake up the queues here. We would
1003 : : * reach this path only if SyncStandbysDefined() returns false, or it
1004 : : * would mean that some backends are waiting with the GUC set. See
1005 : : * SyncRepWaitForLSN().
1006 : : */
1007 [ + - - + ]: 512 : Assert(!SyncStandbysDefined());
1008 : :
1009 : : /*
1010 : : * Even if there is no sync standby defined, let the readers of this
1011 : : * information know that the sync standby data has been initialized.
1012 : : * This can just be done once, hence the previous check on
1013 : : * SYNC_STANDBY_INIT to avoid useless work.
1014 : : */
1015 : 512 : WalSndCtl->sync_standbys_status |= SYNC_STANDBY_INIT;
1016 : :
5388 rhaas@postgresql.org 1017 : 512 : LWLockRelease(SyncRepLock);
1018 : : }
1019 : 580 : }
1020 : :
#ifdef USE_ASSERT_CHECKING
/*
 * Assertion helper: verify that the wait queue for the given mode is ordered
 * by strictly increasing, nonzero waitLSN, so that no two waiters share an
 * LSN.
 *
 * Returns true for a well-ordered (including empty) queue, false otherwise.
 */
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
	XLogRecPtr	prev_lsn = 0;
	dlist_iter	it;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

	dlist_foreach(it, &WalSndCtl->SyncRepQueue[mode])
	{
		PGPROC	   *waiter = dlist_container(PGPROC, syncRepLinks, it.cur);

		/* Each waitLSN must be strictly greater than its predecessor's */
		if (!(waiter->waitLSN > prev_lsn))
			return false;

		prev_lsn = waiter->waitLSN;
	}

	return true;
}
#endif
1049 : :
1050 : : /*
1051 : : * ===========================================================
1052 : : * Synchronous Replication functions executed by any process
1053 : : * ===========================================================
1054 : : */
1055 : :
1056 : : bool
5367 tgl@sss.pgh.pa.us 1057 : 1194 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
1058 : : {
3541 fujii@postgresql.org 1059 [ + - + + ]: 1194 : if (*newval != NULL && (*newval)[0] != '\0')
5399 simon@2ndQuadrant.co 1060 : 75 : {
1061 : : yyscan_t scanner;
1062 : : int parse_rc;
1063 : : SyncRepConfigData *pconf;
1064 : :
1065 : : /* Result of parsing is returned in one of these two variables */
326 peter@eisentraut.org 1066 : 75 : SyncRepConfigData *syncrep_parse_result = NULL;
1067 : 75 : char *syncrep_parse_error_msg = NULL;
1068 : :
1069 : : /* Parse the synchronous_standby_names string */
379 1070 : 75 : syncrep_scanner_init(*newval, &scanner);
326 1071 : 75 : parse_rc = syncrep_yyparse(&syncrep_parse_result, &syncrep_parse_error_msg, scanner);
379 1072 : 75 : syncrep_scanner_finish(scanner);
1073 : :
3520 tgl@sss.pgh.pa.us 1074 [ + - - + ]: 75 : if (parse_rc != 0 || syncrep_parse_result == NULL)
1075 : : {
3541 fujii@postgresql.org 1076 :UBC 0 : GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
3520 tgl@sss.pgh.pa.us 1077 [ # # ]: 0 : if (syncrep_parse_error_msg)
1078 : 0 : GUC_check_errdetail("%s", syncrep_parse_error_msg);
1079 : : else
384 alvherre@alvh.no-ip. 1080 : 0 : GUC_check_errdetail("\"%s\" parser failed.",
1081 : : "synchronous_standby_names");
3541 fujii@postgresql.org 1082 : 0 : return false;
1083 : : }
1084 : :
3286 fujii@postgresql.org 1085 [ - + ]:CBC 75 : if (syncrep_parse_result->num_sync <= 0)
1086 : : {
3286 fujii@postgresql.org 1087 :UBC 0 : GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1088 : 0 : syncrep_parse_result->num_sync);
1089 : 0 : return false;
1090 : : }
1091 : :
1092 : : /* GUC extra value must be guc_malloc'd, not palloc'd */
1093 : : pconf = (SyncRepConfigData *)
1159 tgl@sss.pgh.pa.us 1094 :CBC 75 : guc_malloc(LOG, syncrep_parse_result->config_size);
3520 1095 [ - + ]: 75 : if (pconf == NULL)
3520 tgl@sss.pgh.pa.us 1096 :UBC 0 : return false;
3520 tgl@sss.pgh.pa.us 1097 :CBC 75 : memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
1098 : :
383 peter@eisentraut.org 1099 : 75 : *extra = pconf;
1100 : :
1101 : : /*
1102 : : * We need not explicitly clean up syncrep_parse_result. It, and any
1103 : : * other cruft generated during parsing, will be freed when the
1104 : : * current memory context is deleted. (This code is generally run in
1105 : : * a short-lived context used for config file processing, so that will
1106 : : * not be very long.)
1107 : : */
1108 : : }
1109 : : else
3520 tgl@sss.pgh.pa.us 1110 : 1119 : *extra = NULL;
1111 : :
5367 1112 : 1194 : return true;
1113 : : }
1114 : :
1115 : : void
3520 1116 : 1184 : assign_synchronous_standby_names(const char *newval, void *extra)
1117 : : {
1118 : 1184 : SyncRepConfig = (SyncRepConfigData *) extra;
1119 : 1184 : }
1120 : :
1121 : : void
5075 simon@2ndQuadrant.co 1122 : 1833 : assign_synchronous_commit(int newval, void *extra)
1123 : : {
1124 [ - + + + ]: 1833 : switch (newval)
1125 : : {
5075 simon@2ndQuadrant.co 1126 :UBC 0 : case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
1127 : 0 : SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
1128 : 0 : break;
5075 simon@2ndQuadrant.co 1129 :CBC 1242 : case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
1130 : 1242 : SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
1131 : 1242 : break;
3549 rhaas@postgresql.org 1132 : 2 : case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1133 : 2 : SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1134 : 2 : break;
5075 simon@2ndQuadrant.co 1135 : 589 : default:
1136 : 589 : SyncRepWaitMode = SYNC_REP_NO_WAIT;
1137 : 589 : break;
1138 : : }
1139 : 1833 : }
|