Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * syncrep.c
4 : : *
5 : : * Synchronous replication is new as of PostgreSQL 9.1.
6 : : *
7 : : * If requested, transaction commits wait until their commit LSN are
8 : : * acknowledged by the synchronous standbys.
9 : : *
10 : : * This module contains the code for waiting and release of backends.
11 : : * All code in this module executes on the primary. The core streaming
12 : : * replication transport remains within WALreceiver/WALsender modules.
13 : : *
14 : : * The essence of this design is that it isolates all logic about
15 : : * waiting/releasing onto the primary. The primary defines which standbys
16 : : * it wishes to wait for. The standbys are completely unaware of the
17 : : * durability requirements of transactions on the primary, reducing the
18 : : * complexity of the code and streamlining both standby operations and
19 : : * network bandwidth because there is no requirement to ship
20 : : * per-transaction state information.
21 : : *
22 : : * Replication is either synchronous or not synchronous (async). If it is
23 : : * async, we just fastpath out of here. If it is sync, then we wait for
24 : : * the write, flush or apply location on the standby before releasing
25 : : * the waiting backend. Further complexity in that interaction is
26 : : * expected in later releases.
27 : : *
28 : : * The best performing way to manage the waiting backends is to have a
29 : : * single ordered queue of waiting backends, so that we can avoid
30 : : * searching the through all waiters each time we receive a reply.
31 : : *
32 : : * In 9.5 or before only a single standby could be considered as
33 : : * synchronous. In 9.6 we support a priority-based multiple synchronous
34 : : * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
35 : : * supported. The number of synchronous standbys that transactions
36 : : * must wait for replies from is specified in synchronous_standby_names.
37 : : * This parameter also specifies a list of standby names and the method
38 : : * (FIRST and ANY) to choose synchronous standbys from the listed ones.
39 : : *
40 : : * The method FIRST specifies a priority-based synchronous replication
41 : : * and makes transaction commits wait until their WAL records are
42 : : * replicated to the requested number of synchronous standbys chosen based
43 : : * on their priorities. The standbys whose names appear earlier in the list
44 : : * are given higher priority and will be considered as synchronous.
45 : : * Other standby servers appearing later in this list represent potential
46 : : * synchronous standbys. If any of the current synchronous standbys
47 : : * disconnects for whatever reason, it will be replaced immediately with
48 : : * the next-highest-priority standby.
49 : : *
50 : : * The method ANY specifies a quorum-based synchronous replication
51 : : * and makes transaction commits wait until their WAL records are
52 : : * replicated to at least the requested number of synchronous standbys
53 : : * in the list. All the standbys appearing in the list are considered as
54 : : * candidates for quorum synchronous standbys.
55 : : *
56 : : * If neither FIRST nor ANY is specified, FIRST is used as the method.
57 : : * This is for backward compatibility with 9.6 or before where only a
58 : : * priority-based sync replication was supported.
59 : : *
60 : : * Before the standbys chosen from synchronous_standby_names can
61 : : * become the synchronous standbys they must have caught up with
62 : : * the primary; that may take some time. Once caught up,
63 : : * the standbys which are considered as synchronous at that moment
64 : : * will release waiters from the queue.
65 : : *
66 : : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
67 : : *
68 : : * IDENTIFICATION
69 : : * src/backend/replication/syncrep.c
70 : : *
71 : : *-------------------------------------------------------------------------
72 : : */
73 : : #include "postgres.h"
74 : :
75 : : #include <unistd.h>
76 : :
77 : : #include "access/xact.h"
78 : : #include "common/int.h"
79 : : #include "miscadmin.h"
80 : : #include "pgstat.h"
81 : : #include "replication/syncrep.h"
82 : : #include "replication/walsender.h"
83 : : #include "replication/walsender_private.h"
84 : : #include "storage/proc.h"
85 : : #include "tcop/tcopprot.h"
86 : : #include "utils/guc_hooks.h"
87 : : #include "utils/ps_status.h"
88 : : #include "utils/wait_event.h"
89 : :
90 : : /* User-settable parameters for sync rep */
91 : : char *SyncRepStandbyNames;
92 : :
93 : : #define SyncStandbysDefined() \
94 : : (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
95 : :
96 : : static bool announce_next_takeover = true;
97 : :
98 : : SyncRepConfigData *SyncRepConfig = NULL;
99 : : static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
100 : :
101 : : static void SyncRepQueueInsert(int mode);
102 : : static void SyncRepCancelWait(void);
103 : : static int SyncRepWakeQueue(bool all, int mode);
104 : :
105 : : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
106 : : XLogRecPtr *flushPtr,
107 : : XLogRecPtr *applyPtr,
108 : : bool *am_sync);
109 : : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
110 : : XLogRecPtr *flushPtr,
111 : : XLogRecPtr *applyPtr,
112 : : SyncRepStandbyData *sync_standbys,
113 : : int num_standbys);
114 : : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
115 : : XLogRecPtr *flushPtr,
116 : : XLogRecPtr *applyPtr,
117 : : SyncRepStandbyData *sync_standbys,
118 : : int num_standbys,
119 : : uint8 nth);
120 : : static int SyncRepGetStandbyPriority(void);
121 : : static int standby_priority_comparator(const void *a, const void *b);
122 : : static int cmp_lsn(const void *a, const void *b);
123 : :
124 : : #ifdef USE_ASSERT_CHECKING
125 : : static bool SyncRepQueueIsOrderedByLSN(int mode);
126 : : #endif
127 : :
128 : : /*
129 : : * ===========================================================
130 : : * Synchronous Replication functions for normal user backends
131 : : * ===========================================================
132 : : */
133 : :
134 : : /*
135 : : * Wait for synchronous replication, if requested by user.
136 : : *
137 : : * Initially backends start in state SYNC_REP_NOT_WAITING and then
138 : : * change that state to SYNC_REP_WAITING before adding ourselves
139 : : * to the wait queue. During SyncRepWakeQueue() a WALSender changes
140 : : * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
141 : : * This backend then resets its state to SYNC_REP_NOT_WAITING.
142 : : *
143 : : * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
144 : : * represents a commit record. If it doesn't, then we wait only for the WAL
145 : : * to be flushed if synchronous_commit is set to the higher level of
146 : : * remote_apply, because only commit records provide apply feedback.
147 : : */
148 : : void
3689 rhaas@postgresql.org 149 :CBC 153127 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
150 : : {
151 : : int mode;
152 : :
153 : : /*
154 : : * This should be called while holding interrupts during a transaction
155 : : * commit to prevent the follow-up shared memory queue cleanups to be
156 : : * influenced by external interruptions.
157 : : */
2377 michael@paquier.xyz 158 [ - + ]: 153127 : Assert(InterruptHoldoffCount > 0);
159 : :
160 : : /*
161 : : * Fast exit if user has not requested sync replication, or there are no
162 : : * sync replication standby names defined.
163 : : *
164 : : * Since this routine gets called every commit time, it's important to
165 : : * exit quickly if sync replication is not requested.
166 : : *
167 : : * We check WalSndCtl->sync_standbys_status flag without the lock and exit
168 : : * immediately if SYNC_STANDBY_INIT is set (the checkpointer has
169 : : * initialized this data) but SYNC_STANDBY_DEFINED is missing (no sync
170 : : * replication requested).
171 : : *
172 : : * If SYNC_STANDBY_DEFINED is set, we need to check the status again later
173 : : * while holding the lock, to check the flag and operate the sync rep
174 : : * queue atomically. This is necessary to avoid the race condition
175 : : * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
176 : : * SYNC_STANDBY_DEFINED is not set, the lock is not necessary because we
177 : : * don't touch the queue.
178 : : */
2071 fujii@postgresql.org 179 [ + + + + ]: 153127 : if (!SyncRepRequested() ||
389 michael@paquier.xyz 180 [ + + ]: 99968 : ((((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status) &
181 : : (SYNC_STANDBY_INIT | SYNC_STANDBY_DEFINED)) == SYNC_STANDBY_INIT)
2071 fujii@postgresql.org 182 : 115134 : return;
183 : :
184 : : /* Cap the level for anything other than commit to remote flush only. */
3689 rhaas@postgresql.org 185 [ + + ]: 37993 : if (commit)
186 : 37975 : mode = SyncRepWaitMode;
187 : : else
188 : 18 : mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
189 : :
1203 andres@anarazel.de 190 [ - + ]: 37993 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
5528 rhaas@postgresql.org 191 [ - + ]: 37993 : Assert(WalSndCtl != NULL);
192 : :
193 : 37993 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
194 [ - + ]: 37993 : Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
195 : :
196 : : /*
197 : : * We don't wait for sync rep if SYNC_STANDBY_DEFINED is not set. See
198 : : * SyncRepUpdateSyncStandbysDefined().
199 : : *
200 : : * Also check that the standby hasn't already replied. Unlikely race
201 : : * condition but we'll be fetching that cache line anyway so it's likely
202 : : * to be a low cost check.
203 : : *
204 : : * If the sync standby data has not been initialized yet
205 : : * (SYNC_STANDBY_INIT is not set), fall back to a check based on the LSN,
206 : : * then do a direct GUC check.
207 : : */
389 michael@paquier.xyz 208 [ + + ]: 37993 : if (WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT)
209 : : {
210 [ + - ]: 40 : if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) == 0 ||
211 [ + + ]: 40 : lsn <= WalSndCtl->lsn[mode])
212 : : {
213 : 6 : LWLockRelease(SyncRepLock);
214 : 6 : return;
215 : : }
216 : : }
217 [ - + ]: 37953 : else if (lsn <= WalSndCtl->lsn[mode])
218 : : {
219 : : /*
220 : : * The LSN is older than what we need to wait for. The sync standby
221 : : * data has not been initialized yet, but we are OK to not wait
222 : : * because we know that there is no point in doing so based on the
223 : : * LSN.
224 : : */
389 michael@paquier.xyz 225 :UBC 0 : LWLockRelease(SyncRepLock);
226 : 0 : return;
227 : : }
389 michael@paquier.xyz 228 [ + - + - ]:CBC 37953 : else if (!SyncStandbysDefined())
229 : : {
230 : : /*
231 : : * If we are here, the sync standby data has not been initialized yet,
232 : : * and the LSN is newer than what need to wait for, so we have fallen
233 : : * back to the best thing we could do in this case: a check on
234 : : * SyncStandbysDefined() to see if the GUC is set or not.
235 : : *
236 : : * When the GUC has a value, we wait until the checkpointer updates
237 : : * the status data because we cannot be sure yet if we should wait or
238 : : * not. Here, the GUC has *no* value, we are sure that there is no
239 : : * point to wait; this matters for example when initializing a
240 : : * cluster, where we should never wait, and no sync standbys is the
241 : : * default behavior.
242 : : */
5528 rhaas@postgresql.org 243 : 37953 : LWLockRelease(SyncRepLock);
244 : 37953 : return;
245 : : }
246 : :
247 : : /*
248 : : * Set our waitLSN so WALSender will know when to wake us, and add
249 : : * ourselves to the queue.
250 : : */
3689 251 : 34 : MyProc->waitLSN = lsn;
5528 252 : 34 : MyProc->syncRepState = SYNC_REP_WAITING;
5209 simon@2ndQuadrant.co 253 : 34 : SyncRepQueueInsert(mode);
254 [ - + ]: 34 : Assert(SyncRepQueueIsOrderedByLSN(mode));
5528 rhaas@postgresql.org 255 : 34 : LWLockRelease(SyncRepLock);
256 : :
257 : : /* Alter ps display to show waiting for sync rep. */
258 [ + - ]: 34 : if (update_process_title)
259 : : {
260 : : char buffer[32];
261 : :
302 alvherre@kurilemu.de 262 :GNC 34 : sprintf(buffer, "waiting for %X/%08X", LSN_FORMAT_ARGS(lsn));
1170 drowley@postgresql.o 263 :CBC 34 : set_ps_display_suffix(buffer);
264 : : }
265 : :
266 : : /*
267 : : * Wait for specified LSN to be confirmed.
268 : : *
269 : : * Each proc has its own wait latch, so we perform a normal latch
270 : : * check/wait loop here.
271 : : */
272 : : for (;;)
5539 simon@2ndQuadrant.co 273 : 34 : {
274 : : int rc;
275 : :
276 : : /* Must reset the latch before testing state. */
4129 andres@anarazel.de 277 : 68 : ResetLatch(MyLatch);
278 : :
279 : : /*
280 : : * Acquiring the lock is not needed, the latch ensures proper
281 : : * barriers. If it looks like we're done, we must really be done,
282 : : * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
283 : : * it will never update it again, so we can't be seeing a stale value
284 : : * in that case.
285 : : */
3553 simon@2ndQuadrant.co 286 [ + + ]: 68 : if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
5528 rhaas@postgresql.org 287 : 34 : break;
288 : :
289 : : /*
290 : : * If a wait for synchronous replication is pending, we can neither
291 : : * acknowledge the commit nor raise ERROR or FATAL. The latter would
292 : : * lead the client to believe that the transaction aborted, which is
293 : : * not true: it's already committed locally. The former is no good
294 : : * either: the client has requested synchronous replication, and is
295 : : * entitled to assume that an acknowledged commit is also replicated,
296 : : * which might not be true. So in this case we issue a WARNING (which
297 : : * some clients may be able to interpret) and shut off further output.
298 : : * We do NOT reset ProcDiePending, so that the process will die after
299 : : * the commit is cleaned up.
300 : : */
301 [ - + ]: 34 : if (ProcDiePending)
302 : : {
21 andrew@dunslane.net 303 [ # # ]:UNC 0 : if (ProcDieSenderPid != 0)
304 [ # # ]: 0 : ereport(WARNING,
305 : : (errcode(ERRCODE_ADMIN_SHUTDOWN),
306 : : errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
307 : : errdetail("The transaction has already committed locally, but might not have been replicated to the standby."),
308 : : errdetail_log("The transaction has already committed locally, but might not have been replicated to the standby. Signal sent by PID %d, UID %d.",
309 : : (int) ProcDieSenderPid,
310 : : (int) ProcDieSenderUid)));
311 : : else
312 [ # # ]: 0 : ereport(WARNING,
313 : : (errcode(ERRCODE_ADMIN_SHUTDOWN),
314 : : errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
315 : : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
5528 rhaas@postgresql.org 316 :UBC 0 : whereToSendOutput = DestNone;
317 : 0 : SyncRepCancelWait();
318 : 0 : break;
319 : : }
320 : :
321 : : /*
322 : : * It's unclear what to do if a query cancel interrupt arrives. We
323 : : * can't actually abort at this point, but ignoring the interrupt
324 : : * altogether is not helpful, so we just terminate the wait with a
325 : : * suitable warning.
326 : : */
5528 rhaas@postgresql.org 327 [ - + ]:CBC 34 : if (QueryCancelPending)
328 : : {
5528 rhaas@postgresql.org 329 :UBC 0 : QueryCancelPending = false;
330 [ # # ]: 0 : ereport(WARNING,
331 : : (errmsg("canceling wait for synchronous replication due to user request"),
332 : : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
333 : 0 : SyncRepCancelWait();
334 : 0 : break;
335 : : }
336 : :
337 : : /*
338 : : * Wait on latch. Any condition that should wake us up will set the
339 : : * latch, so no need for timeout.
340 : : */
2720 tmunro@postgresql.or 341 :CBC 34 : rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
342 : : WAIT_EVENT_SYNC_REP);
343 : :
344 : : /*
345 : : * If the postmaster dies, we'll probably never get an acknowledgment,
346 : : * because all the wal sender processes will exit. So just bail out.
347 : : */
348 [ - + ]: 34 : if (rc & WL_POSTMASTER_DEATH)
349 : : {
5528 rhaas@postgresql.org 350 :UBC 0 : ProcDiePending = true;
351 : 0 : whereToSendOutput = DestNone;
352 : 0 : SyncRepCancelWait();
353 : 0 : break;
354 : : }
355 : : }
356 : :
357 : : /*
358 : : * WalSender has checked our LSN and has removed us from queue. Clean up
359 : : * state and leave. It's OK to reset these shared memory fields without
360 : : * holding SyncRepLock, because any walsenders will ignore us anyway when
361 : : * we're not on the queue. We need a read barrier to make sure we see the
362 : : * changes to the queue link (this might be unnecessary without
363 : : * assertions, but better safe than sorry).
364 : : */
3219 heikki.linnakangas@i 365 :CBC 34 : pg_read_barrier();
1203 andres@anarazel.de 366 [ - + ]: 34 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
5528 rhaas@postgresql.org 367 : 34 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
96 alvherre@kurilemu.de 368 :GNC 34 : MyProc->waitLSN = InvalidXLogRecPtr;
369 : :
370 : : /* reset ps display to remove the suffix */
1170 drowley@postgresql.o 371 [ + - ]:CBC 34 : if (update_process_title)
372 : 34 : set_ps_display_remove_suffix();
373 : : }
374 : :
375 : : /*
376 : : * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
377 : : *
378 : : * Usually we will go at tail of queue, though it's possible that we arrive
379 : : * here out of order, so start at tail and work back to insertion point.
380 : : */
381 : : static void
5215 simon@2ndQuadrant.co 382 : 34 : SyncRepQueueInsert(int mode)
383 : : {
384 : : dlist_head *queue;
385 : : dlist_iter iter;
386 : :
387 [ + - - + ]: 34 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
1203 andres@anarazel.de 388 : 34 : queue = &WalSndCtl->SyncRepQueue[mode];
389 : :
390 [ + - - + ]: 34 : dlist_reverse_foreach(iter, queue)
391 : : {
1203 andres@anarazel.de 392 :UBC 0 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
393 : :
394 : : /*
395 : : * Stop at the queue element that we should insert after to ensure the
396 : : * queue is ordered by LSN.
397 : : */
4876 alvherre@alvh.no-ip. 398 [ # # ]: 0 : if (proc->waitLSN < MyProc->waitLSN)
399 : : {
1203 andres@anarazel.de 400 : 0 : dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
401 : 0 : return;
402 : : }
403 : : }
404 : :
405 : : /*
406 : : * If we get here, the list was either empty, or this process needs to be
407 : : * at the head.
408 : : */
1203 andres@anarazel.de 409 :CBC 34 : dlist_push_head(queue, &MyProc->syncRepLinks);
410 : : }
411 : :
412 : : /*
413 : : * Acquire SyncRepLock and cancel any wait currently in progress.
414 : : */
415 : : static void
5528 rhaas@postgresql.org 416 :UBC 0 : SyncRepCancelWait(void)
417 : : {
418 : 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
1203 andres@anarazel.de 419 [ # # ]: 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
420 : 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
5528 rhaas@postgresql.org 421 : 0 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
422 : 0 : LWLockRelease(SyncRepLock);
423 : 0 : }
424 : :
425 : : void
5382 tgl@sss.pgh.pa.us 426 :CBC 18582 : SyncRepCleanupAtProcExit(void)
427 : : {
428 : : /*
429 : : * First check if we are removed from the queue without the lock to not
430 : : * slow down backend exit.
431 : : */
1203 andres@anarazel.de 432 [ - + ]: 18582 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
433 : : {
5539 simon@2ndQuadrant.co 434 :UBC 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
435 : :
436 : : /* maybe we have just been removed, so recheck */
1203 andres@anarazel.de 437 [ # # ]: 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
438 : 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
439 : :
5539 simon@2ndQuadrant.co 440 : 0 : LWLockRelease(SyncRepLock);
441 : : }
5539 simon@2ndQuadrant.co 442 :CBC 18582 : }
443 : :
444 : : /*
445 : : * ===========================================================
446 : : * Synchronous Replication functions for wal sender processes
447 : : * ===========================================================
448 : : */
449 : :
450 : : /*
451 : : * Take any action required to initialise sync rep state from config
452 : : * data. Called at WALSender startup and after each SIGHUP.
453 : : */
454 : : void
455 : 783 : SyncRepInitConfig(void)
456 : : {
457 : : int priority;
458 : :
459 : : /*
460 : : * Determine if we are a potential sync standby and remember the result
461 : : * for handling replies from standby.
462 : : */
463 : 783 : priority = SyncRepGetStandbyPriority();
464 [ + + ]: 783 : if (MyWalSnd->sync_standby_priority != priority)
465 : : {
2208 tgl@sss.pgh.pa.us 466 [ - + ]: 18 : SpinLockAcquire(&MyWalSnd->mutex);
5539 simon@2ndQuadrant.co 467 : 18 : MyWalSnd->sync_standby_priority = priority;
2208 tgl@sss.pgh.pa.us 468 : 18 : SpinLockRelease(&MyWalSnd->mutex);
469 : :
5539 simon@2ndQuadrant.co 470 [ - + ]: 18 : ereport(DEBUG1,
471 : : (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
472 : : application_name, priority)));
473 : : }
474 : 783 : }
475 : :
476 : : /*
477 : : * Update the LSNs on each queue based upon our latest state. This
478 : : * implements a simple policy of first-valid-sync-standby-releases-waiter.
479 : : *
480 : : * Other policies are possible, which would change what we do here and
481 : : * perhaps also which information we store as well.
482 : : */
483 : : void
484 : 107118 : SyncRepReleaseWaiters(void)
485 : : {
486 : 107118 : volatile WalSndCtlData *walsndctl = WalSndCtl;
487 : : XLogRecPtr writePtr;
488 : : XLogRecPtr flushPtr;
489 : : XLogRecPtr applyPtr;
490 : : bool got_recptr;
491 : : bool am_sync;
5215 492 : 107118 : int numwrite = 0;
493 : 107118 : int numflush = 0;
3689 rhaas@postgresql.org 494 : 107118 : int numapply = 0;
495 : :
496 : : /*
497 : : * If this WALSender is serving a standby that is not on the list of
498 : : * potential sync standbys then we have nothing to do. If we are still
499 : : * starting up, still running base backup or the current flush position is
500 : : * still invalid, then leave quickly also. Streaming or stopping WAL
501 : : * senders are allowed to release waiters.
502 : : */
5539 simon@2ndQuadrant.co 503 [ + + ]: 107118 : if (MyWalSnd->sync_standby_priority == 0 ||
2714 michael@paquier.xyz 504 [ + + ]: 166 : (MyWalSnd->state != WALSNDSTATE_STREAMING &&
505 [ + + ]: 40 : MyWalSnd->state != WALSNDSTATE_STOPPING) ||
180 alvherre@kurilemu.de 506 [ - + ]:GNC 148 : !XLogRecPtrIsValid(MyWalSnd->flush))
507 : : {
3681 fujii@postgresql.org 508 :CBC 106970 : announce_next_takeover = true;
5539 simon@2ndQuadrant.co 509 : 106975 : return;
510 : : }
511 : :
512 : : /*
513 : : * We're a potential sync standby. Release waiters if there are enough
514 : : * sync standbys and we are considered as sync.
515 : : */
516 : 148 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
517 : :
518 : : /*
519 : : * Check whether we are a sync standby or not, and calculate the synced
520 : : * positions among all sync standbys. (Note: although this step does not
521 : : * of itself require holding SyncRepLock, it seems like a good idea to do
522 : : * it after acquiring the lock. This ensures that the WAL pointers we use
523 : : * to release waiters are newer than any previous execution of this
524 : : * routine used.)
525 : : */
3424 fujii@postgresql.org 526 : 148 : got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
527 : :
528 : : /*
529 : : * If we are managing a sync standby, though we weren't prior to this,
530 : : * then announce we are now a sync standby.
531 : : */
3681 532 [ + + + + ]: 148 : if (announce_next_takeover && am_sync)
533 : : {
534 : 13 : announce_next_takeover = false;
535 : :
3424 536 [ + - ]: 13 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
537 [ + - ]: 13 : ereport(LOG,
538 : : (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
539 : : application_name, MyWalSnd->sync_standby_priority)));
540 : : else
3424 fujii@postgresql.org 541 [ # # ]:UBC 0 : ereport(LOG,
542 : : (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
543 : : application_name)));
544 : : }
545 : :
546 : : /*
547 : : * If the number of sync standbys is less than requested or we aren't
548 : : * managing a sync standby then just leave.
549 : : */
3424 fujii@postgresql.org 550 [ + + - + ]:CBC 148 : if (!got_recptr || !am_sync)
551 : : {
5539 simon@2ndQuadrant.co 552 : 5 : LWLockRelease(SyncRepLock);
3681 fujii@postgresql.org 553 : 5 : announce_next_takeover = !am_sync;
5539 simon@2ndQuadrant.co 554 : 5 : return;
555 : : }
556 : :
557 : : /*
558 : : * Set the lsn first so that when we wake backends they will release up to
559 : : * this location.
560 : : */
3681 fujii@postgresql.org 561 [ + + ]: 143 : if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
562 : : {
563 : 53 : walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
5215 simon@2ndQuadrant.co 564 : 53 : numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
565 : : }
3681 fujii@postgresql.org 566 [ + + ]: 143 : if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
567 : : {
568 : 58 : walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
5215 simon@2ndQuadrant.co 569 : 58 : numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
570 : : }
3681 fujii@postgresql.org 571 [ + + ]: 143 : if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
572 : : {
573 : 53 : walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
3689 rhaas@postgresql.org 574 : 53 : numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
575 : : }
576 : :
5539 simon@2ndQuadrant.co 577 : 143 : LWLockRelease(SyncRepLock);
578 : :
302 alvherre@kurilemu.de 579 [ - + ]:GNC 143 : elog(DEBUG3, "released %d procs up to write %X/%08X, %d procs up to flush %X/%08X, %d procs up to apply %X/%08X",
580 : : numwrite, LSN_FORMAT_ARGS(writePtr),
581 : : numflush, LSN_FORMAT_ARGS(flushPtr),
582 : : numapply, LSN_FORMAT_ARGS(applyPtr));
583 : : }
584 : :
585 : : /*
586 : : * Calculate the synced Write, Flush and Apply positions among sync standbys.
587 : : *
588 : : * Return false if the number of sync standbys is less than
589 : : * synchronous_standby_names specifies. Otherwise return true and
590 : : * store the positions into *writePtr, *flushPtr and *applyPtr.
591 : : *
592 : : * On return, *am_sync is set to true if this walsender is connecting to
593 : : * sync standby. Otherwise it's set to false.
594 : : */
595 : : static bool
3424 fujii@postgresql.org 596 :CBC 148 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
597 : : XLogRecPtr *applyPtr, bool *am_sync)
598 : : {
599 : : SyncRepStandbyData *sync_standbys;
600 : : int num_standbys;
601 : : int i;
602 : :
603 : : /* Initialize default results */
3681 604 : 148 : *writePtr = InvalidXLogRecPtr;
605 : 148 : *flushPtr = InvalidXLogRecPtr;
606 : 148 : *applyPtr = InvalidXLogRecPtr;
607 : 148 : *am_sync = false;
608 : :
609 : : /* Quick out if not even configured to be synchronous */
2208 tgl@sss.pgh.pa.us 610 [ - + ]: 148 : if (SyncRepConfig == NULL)
2208 tgl@sss.pgh.pa.us 611 :UBC 0 : return false;
612 : :
613 : : /* Get standbys that are considered as synchronous at this moment */
2208 tgl@sss.pgh.pa.us 614 :CBC 148 : num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
615 : :
616 : : /* Am I among the candidate sync standbys? */
617 [ + + ]: 156 : for (i = 0; i < num_standbys; i++)
618 : : {
619 [ + + ]: 152 : if (sync_standbys[i].is_me)
620 : : {
621 : 144 : *am_sync = true;
622 : 144 : break;
623 : : }
624 : : }
625 : :
626 : : /*
627 : : * Nothing more to do if we are not managing a sync standby or there are
628 : : * not enough synchronous standbys.
629 : : */
3660 630 [ + + ]: 148 : if (!(*am_sync) ||
2208 631 [ + + ]: 144 : num_standbys < SyncRepConfig->num_sync)
632 : : {
633 : 5 : pfree(sync_standbys);
3681 fujii@postgresql.org 634 : 5 : return false;
635 : : }
636 : :
637 : : /*
638 : : * In a priority-based sync replication, the synced positions are the
639 : : * oldest ones among sync standbys. In a quorum-based, they are the Nth
640 : : * latest ones.
641 : : *
642 : : * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
643 : : * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
644 : : * because it's a bit more efficient.
645 : : *
646 : : * XXX If the numbers of current and requested sync standbys are the same,
647 : : * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
648 : : * positions even in a quorum-based sync replication.
649 : : */
3424 650 [ + - ]: 143 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
651 : : {
652 : 143 : SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
653 : : sync_standbys, num_standbys);
654 : : }
655 : : else
656 : : {
3424 fujii@postgresql.org 657 :UBC 0 : SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
658 : : sync_standbys, num_standbys,
2208 tgl@sss.pgh.pa.us 659 : 0 : SyncRepConfig->num_sync);
660 : : }
661 : :
2208 tgl@sss.pgh.pa.us 662 :CBC 143 : pfree(sync_standbys);
3424 fujii@postgresql.org 663 : 143 : return true;
664 : : }
665 : :
666 : : /*
667 : : * Calculate the oldest Write, Flush and Apply positions among sync standbys.
668 : : */
669 : : static void
2208 tgl@sss.pgh.pa.us 670 : 143 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
671 : : XLogRecPtr *flushPtr,
672 : : XLogRecPtr *applyPtr,
673 : : SyncRepStandbyData *sync_standbys,
674 : : int num_standbys)
675 : : {
676 : : int i;
677 : :
678 : : /*
679 : : * Scan through all sync standbys and calculate the oldest Write, Flush
680 : : * and Apply positions. We assume *writePtr et al were initialized to
681 : : * InvalidXLogRecPtr.
682 : : */
683 [ + + ]: 289 : for (i = 0; i < num_standbys; i++)
684 : : {
685 : 146 : XLogRecPtr write = sync_standbys[i].write;
686 : 146 : XLogRecPtr flush = sync_standbys[i].flush;
687 : 146 : XLogRecPtr apply = sync_standbys[i].apply;
688 : :
180 alvherre@kurilemu.de 689 [ + + - + ]:GNC 146 : if (!XLogRecPtrIsValid(*writePtr) || *writePtr > write)
3681 fujii@postgresql.org 690 :CBC 143 : *writePtr = write;
180 alvherre@kurilemu.de 691 [ + + - + ]:GNC 146 : if (!XLogRecPtrIsValid(*flushPtr) || *flushPtr > flush)
3681 fujii@postgresql.org 692 :CBC 143 : *flushPtr = flush;
180 alvherre@kurilemu.de 693 [ + + - + ]:GNC 146 : if (!XLogRecPtrIsValid(*applyPtr) || *applyPtr > apply)
3681 fujii@postgresql.org 694 :CBC 143 : *applyPtr = apply;
695 : : }
3424 696 : 143 : }
697 : :
698 : : /*
699 : : * Calculate the Nth latest Write, Flush and Apply positions among sync
700 : : * standbys.
701 : : */
702 : : static void
2208 tgl@sss.pgh.pa.us 703 :UBC 0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
704 : : XLogRecPtr *flushPtr,
705 : : XLogRecPtr *applyPtr,
706 : : SyncRepStandbyData *sync_standbys,
707 : : int num_standbys,
708 : : uint8 nth)
709 : : {
710 : : XLogRecPtr *write_array;
711 : : XLogRecPtr *flush_array;
712 : : XLogRecPtr *apply_array;
713 : : int i;
714 : :
715 : : /* Should have enough candidates, or somebody messed up */
716 [ # # # # ]: 0 : Assert(nth > 0 && nth <= num_standbys);
717 : :
146 michael@paquier.xyz 718 :UNC 0 : write_array = palloc_array(XLogRecPtr, num_standbys);
719 : 0 : flush_array = palloc_array(XLogRecPtr, num_standbys);
720 : 0 : apply_array = palloc_array(XLogRecPtr, num_standbys);
721 : :
2208 tgl@sss.pgh.pa.us 722 [ # # ]:UBC 0 : for (i = 0; i < num_standbys; i++)
723 : : {
724 : 0 : write_array[i] = sync_standbys[i].write;
725 : 0 : flush_array[i] = sync_standbys[i].flush;
726 : 0 : apply_array[i] = sync_standbys[i].apply;
727 : : }
728 : :
729 : : /* Sort each array in descending order */
730 : 0 : qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
731 : 0 : qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
732 : 0 : qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
733 : :
734 : : /* Get Nth latest Write, Flush, Apply positions */
3424 fujii@postgresql.org 735 : 0 : *writePtr = write_array[nth - 1];
736 : 0 : *flushPtr = flush_array[nth - 1];
737 : 0 : *applyPtr = apply_array[nth - 1];
738 : :
739 : 0 : pfree(write_array);
740 : 0 : pfree(flush_array);
741 : 0 : pfree(apply_array);
742 : 0 : }
743 : :
744 : : /*
745 : : * Compare lsn in order to sort array in descending order.
746 : : */
747 : : static int
748 : 0 : cmp_lsn(const void *a, const void *b)
749 : : {
3275 bruce@momjian.us 750 : 0 : XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
751 : 0 : XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
752 : :
809 nathan@postgresql.or 753 : 0 : return pg_cmp_u64(lsn2, lsn1);
754 : : }
755 : :
/*
 * Return data about walsenders that are candidates to be sync standbys.
 *
 * *standbys is set to a palloc'd array of structs of per-walsender data,
 * and the number of valid entries (candidate sync senders) is returned.
 * (This might be more or fewer than num_sync; caller must check.)
 */
int
SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
{
	int			i;
	int			n;

	/*
	 * Create result array.  It is sized for the worst case (every walsender
	 * slot is a candidate) so we never need to repalloc below.
	 */
	*standbys = palloc_array(SyncRepStandbyData, max_wal_senders);

	/* Quick exit if sync replication is not requested */
	if (SyncRepConfig == NULL)
		return 0;

	/* Collect raw data from shared memory */
	n = 0;
	for (i = 0; i < max_wal_senders; i++)
	{
		volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
									 * rearrangement */
		SyncRepStandbyData *stby;
		WalSndState state;		/* not included in SyncRepStandbyData */

		walsnd = &WalSndCtl->walsnds[i];
		stby = *standbys + n;

		/*
		 * Copy all fields under the slot's spinlock so we get a consistent
		 * snapshot of this walsender's state; the eligibility checks below
		 * are then done on the local copy, outside the lock.
		 */
		SpinLockAcquire(&walsnd->mutex);
		stby->pid = walsnd->pid;
		state = walsnd->state;
		stby->write = walsnd->write;
		stby->flush = walsnd->flush;
		stby->apply = walsnd->apply;
		stby->sync_standby_priority = walsnd->sync_standby_priority;
		SpinLockRelease(&walsnd->mutex);

		/* Must be active (pid == 0 means the slot is unused) */
		if (stby->pid == 0)
			continue;

		/* Must be streaming or stopping */
		if (state != WALSNDSTATE_STREAMING &&
			state != WALSNDSTATE_STOPPING)
			continue;

		/* Must be synchronous (priority 0 means async) */
		if (stby->sync_standby_priority == 0)
			continue;

		/* Must have a valid flush position */
		if (!XLogRecPtrIsValid(stby->flush))
			continue;

		/* OK, it's a candidate */
		stby->walsnd_index = i;
		stby->is_me = (walsnd == MyWalSnd);
		n++;
	}

	/*
	 * In quorum mode, we return all the candidates.  In priority mode, if we
	 * have too many candidates then return only the num_sync ones of highest
	 * priority.
	 */
	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
		n > SyncRepConfig->num_sync)
	{
		/* Sort by priority ... */
		qsort(*standbys, n, sizeof(SyncRepStandbyData),
			  standby_priority_comparator);
		/* ... then report just the first num_sync ones */
		n = SyncRepConfig->num_sync;
	}

	return n;
}
837 : :
838 : : /*
839 : : * qsort comparator to sort SyncRepStandbyData entries by priority
840 : : */
841 : : static int
842 : 34 : standby_priority_comparator(const void *a, const void *b)
843 : : {
844 : 34 : const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
845 : 34 : const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
846 : :
847 : : /* First, sort by increasing priority value */
848 [ + + ]: 34 : if (sa->sync_standby_priority != sb->sync_standby_priority)
849 : 14 : return sa->sync_standby_priority - sb->sync_standby_priority;
850 : :
851 : : /*
852 : : * We might have equal priority values; arbitrarily break ties by position
853 : : * in the WalSnd array. (This is utterly bogus, since that is arrival
854 : : * order dependent, but there are regression tests that rely on it.)
855 : : */
856 : 20 : return sa->walsnd_index - sb->walsnd_index;
857 : : }
858 : :
859 : :
860 : : /*
861 : : * Check if we are in the list of sync standbys, and if so, determine
862 : : * priority sequence. Return priority if set, or zero to indicate that
863 : : * we are not a potential sync standby.
864 : : *
865 : : * Compare the parameter SyncRepStandbyNames against the application_name
866 : : * for this WALSender, or allow any name if we find a wildcard "*".
867 : : */
868 : : static int
5539 simon@2ndQuadrant.co 869 : 783 : SyncRepGetStandbyPriority(void)
870 : : {
871 : : const char *standby_name;
872 : : int priority;
873 : 783 : bool found = false;
874 : :
875 : : /*
876 : : * Since synchronous cascade replication is not allowed, we always set the
877 : : * priority of cascading walsender to zero.
878 : : */
5404 879 [ + + ]: 783 : if (am_cascading_walsender)
880 : 29 : return 0;
881 : :
3660 tgl@sss.pgh.pa.us 882 [ + - + + : 754 : if (!SyncStandbysDefined() || SyncRepConfig == NULL)
- + ]
5539 simon@2ndQuadrant.co 883 : 729 : return 0;
884 : :
3660 tgl@sss.pgh.pa.us 885 : 25 : standby_name = SyncRepConfig->member_names;
886 [ + + ]: 33 : for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
887 : : {
5539 simon@2ndQuadrant.co 888 [ + + ]: 32 : if (pg_strcasecmp(standby_name, application_name) == 0 ||
3660 tgl@sss.pgh.pa.us 889 [ + + ]: 18 : strcmp(standby_name, "*") == 0)
890 : : {
5539 simon@2ndQuadrant.co 891 : 24 : found = true;
892 : 24 : break;
893 : : }
3660 tgl@sss.pgh.pa.us 894 : 8 : standby_name += strlen(standby_name) + 1;
895 : : }
896 : :
3296 fujii@postgresql.org 897 [ + + ]: 25 : if (!found)
898 : 1 : return 0;
899 : :
900 : : /*
901 : : * In quorum-based sync replication, all the standbys in the list have the
902 : : * same priority, one.
903 : : */
904 [ + - ]: 24 : return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
905 : : }
906 : :
/*
 * Walk the specified queue from head.  Set the state of any backends that
 * need to be woken, remove them from the queue, and then wake them.
 * Pass all = true to wake whole queue; otherwise, just wake up to
 * the walsender's LSN.
 *
 * The caller must hold SyncRepLock in exclusive mode.
 *
 * Returns the number of backends that were woken.
 */
static int
SyncRepWakeQueue(bool all, int mode)
{
	volatile WalSndCtlData *walsndctl = WalSndCtl;
	int			numprocs = 0;
	dlist_mutable_iter iter;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
	Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
	Assert(SyncRepQueueIsOrderedByLSN(mode));

	dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
	{
		PGPROC	   *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);

		/*
		 * The queue is ordered by LSN, so once we hit the first waiter whose
		 * LSN has not been reached yet, no later waiter can qualify either
		 * (unless we are waking everyone).
		 */
		if (!all && walsndctl->lsn[mode] < proc->waitLSN)
			return numprocs;

		/*
		 * Remove from queue.
		 */
		dlist_delete_thoroughly(&proc->syncRepLinks);

		/*
		 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
		 * make sure that it sees the queue link being removed before the
		 * syncRepState change.
		 */
		pg_write_barrier();

		/*
		 * Set state to complete; see SyncRepWaitForLSN() for discussion of
		 * the various states.
		 */
		proc->syncRepState = SYNC_REP_WAIT_COMPLETE;

		/*
		 * Wake only when we have set state and removed from queue.
		 */
		SetLatch(&(proc->procLatch));

		numprocs++;
	}

	return numprocs;
}
964 : :
/*
 * The checkpointer calls this as needed to update the shared
 * sync_standbys_status flag, so that backends don't remain permanently wedged
 * if synchronous_standby_names is unset.  It's safe to check the current value
 * without the lock, because it's only ever updated by one process.  But we
 * must take the lock to change it.
 */
void
SyncRepUpdateSyncStandbysDefined(void)
{
	bool		sync_standbys_defined = SyncStandbysDefined();

	/* Has the GUC's defined-ness diverged from the shared flag? */
	if (sync_standbys_defined !=
		((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) != 0))
	{
		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

		/*
		 * If synchronous_standby_names has been reset to empty, it's futile
		 * for backends to continue waiting.  Since the user no longer wants
		 * synchronous replication, we'd better wake them up.
		 */
		if (!sync_standbys_defined)
		{
			int			i;

			for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
				SyncRepWakeQueue(true, i);
		}

		/*
		 * Only allow people to join the queue when there are synchronous
		 * standbys defined.  Without this interlock, there's a race
		 * condition: we might wake up all the current waiters; then, some
		 * backend that hasn't yet reloaded its config might go to sleep on
		 * the queue (and never wake up).  This prevents that.
		 */
		WalSndCtl->sync_standbys_status = SYNC_STANDBY_INIT |
			(sync_standbys_defined ? SYNC_STANDBY_DEFINED : 0);

		LWLockRelease(SyncRepLock);
	}
	else if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT) == 0)
	{
		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

		/*
		 * Note that there is no need to wake up the queues here.  We would
		 * reach this path only if SyncStandbysDefined() returns false, or it
		 * would mean that some backends are waiting with the GUC set.  See
		 * SyncRepWaitForLSN().
		 */
		Assert(!SyncStandbysDefined());

		/*
		 * Even if there is no sync standby defined, let the readers of this
		 * information know that the sync standby data has been initialized.
		 * This can just be done once, hence the previous check on
		 * SYNC_STANDBY_INIT to avoid useless work.
		 */
		WalSndCtl->sync_standbys_status |= SYNC_STANDBY_INIT;

		LWLockRelease(SyncRepLock);
	}
}
1030 : :
#ifdef USE_ASSERT_CHECKING
/*
 * Sanity check (assert builds only): verify that the wait queue for the
 * given mode is strictly ordered by ascending waitLSN, with no duplicates.
 */
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
	XLogRecPtr	prevLSN = InvalidXLogRecPtr;
	dlist_iter	iter;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

	dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
	{
		PGPROC	   *waiter = dlist_container(PGPROC, syncRepLinks, iter.cur);

		/*
		 * Each waiter must carry a strictly larger LSN than its predecessor;
		 * a tie or a decrease means the queue is corrupt.
		 */
		if (waiter->waitLSN <= prevLSN)
			return false;

		prevLSN = waiter->waitLSN;
	}

	return true;
}
#endif
1059 : :
1060 : : /*
1061 : : * ===========================================================
1062 : : * Synchronous Replication functions executed by any process
1063 : : * ===========================================================
1064 : : */
1065 : :
1066 : : bool
5507 tgl@sss.pgh.pa.us 1067 : 1371 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
1068 : : {
3681 fujii@postgresql.org 1069 [ + - + + ]: 1371 : if (*newval != NULL && (*newval)[0] != '\0')
5539 simon@2ndQuadrant.co 1070 : 74 : {
1071 : : yyscan_t scanner;
1072 : : int parse_rc;
1073 : : SyncRepConfigData *pconf;
1074 : :
1075 : : /* Result of parsing is returned in one of these two variables */
466 peter@eisentraut.org 1076 : 74 : SyncRepConfigData *syncrep_parse_result = NULL;
1077 : 74 : char *syncrep_parse_error_msg = NULL;
1078 : :
1079 : : /* Parse the synchronous_standby_names string */
519 1080 : 74 : syncrep_scanner_init(*newval, &scanner);
466 1081 : 74 : parse_rc = syncrep_yyparse(&syncrep_parse_result, &syncrep_parse_error_msg, scanner);
519 1082 : 74 : syncrep_scanner_finish(scanner);
1083 : :
3660 tgl@sss.pgh.pa.us 1084 [ + - - + ]: 74 : if (parse_rc != 0 || syncrep_parse_result == NULL)
1085 : : {
3681 fujii@postgresql.org 1086 :UBC 0 : GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
3660 tgl@sss.pgh.pa.us 1087 [ # # ]: 0 : if (syncrep_parse_error_msg)
1088 : 0 : GUC_check_errdetail("%s", syncrep_parse_error_msg);
1089 : : else
1090 : : /* translator: %s is a GUC name */
524 alvherre@alvh.no-ip. 1091 : 0 : GUC_check_errdetail("\"%s\" parser failed.",
1092 : : "synchronous_standby_names");
3681 fujii@postgresql.org 1093 : 0 : return false;
1094 : : }
1095 : :
3426 fujii@postgresql.org 1096 [ - + ]:CBC 74 : if (syncrep_parse_result->num_sync <= 0)
1097 : : {
3426 fujii@postgresql.org 1098 :UBC 0 : GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1099 : 0 : syncrep_parse_result->num_sync);
1100 : 0 : return false;
1101 : : }
1102 : :
1103 : : /* GUC extra value must be guc_malloc'd, not palloc'd */
1104 : : pconf = (SyncRepConfigData *)
1299 tgl@sss.pgh.pa.us 1105 :CBC 74 : guc_malloc(LOG, syncrep_parse_result->config_size);
3660 1106 [ - + ]: 74 : if (pconf == NULL)
3660 tgl@sss.pgh.pa.us 1107 :UBC 0 : return false;
3660 tgl@sss.pgh.pa.us 1108 :CBC 74 : memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
1109 : :
523 peter@eisentraut.org 1110 : 74 : *extra = pconf;
1111 : :
1112 : : /*
1113 : : * We need not explicitly clean up syncrep_parse_result. It, and any
1114 : : * other cruft generated during parsing, will be freed when the
1115 : : * current memory context is deleted. (This code is generally run in
1116 : : * a short-lived context used for config file processing, so that will
1117 : : * not be very long.)
1118 : : */
1119 : : }
1120 : : else
3660 tgl@sss.pgh.pa.us 1121 : 1297 : *extra = NULL;
1122 : :
5507 1123 : 1371 : return true;
1124 : : }
1125 : :
/*
 * GUC assign hook for synchronous_standby_names.  "extra" is the parsed
 * representation built by check_synchronous_standby_names(), or NULL when
 * the setting is empty; we just install it as the active config.
 */
void
assign_synchronous_standby_names(const char *newval, void *extra)
{
	SyncRepConfig = (SyncRepConfigData *) extra;
}
1131 : :
1132 : : void
5215 simon@2ndQuadrant.co 1133 : 2097 : assign_synchronous_commit(int newval, void *extra)
1134 : : {
1135 [ - + + + ]: 2097 : switch (newval)
1136 : : {
5215 simon@2ndQuadrant.co 1137 :UBC 0 : case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
1138 : 0 : SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
1139 : 0 : break;
5215 simon@2ndQuadrant.co 1140 :CBC 1426 : case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
1141 : 1426 : SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
1142 : 1426 : break;
3689 rhaas@postgresql.org 1143 : 2 : case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1144 : 2 : SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1145 : 2 : break;
5215 simon@2ndQuadrant.co 1146 : 669 : default:
1147 : 669 : SyncRepWaitMode = SYNC_REP_NO_WAIT;
1148 : 669 : break;
1149 : : }
1150 : 2097 : }
|