Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * wait.c
4 : : * Implements WAIT FOR, which allows waiting for events such as
5 : : * time passing or LSN having been replayed, flushed, or written.
6 : : *
7 : : * Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/commands/wait.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : : #include "postgres.h"
15 : :
16 : : #include <math.h>
17 : :
18 : : #include "access/xlog.h"
19 : : #include "access/xlogrecovery.h"
20 : : #include "access/xlogwait.h"
21 : : #include "catalog/pg_type_d.h"
22 : : #include "commands/defrem.h"
23 : : #include "commands/wait.h"
24 : : #include "executor/executor.h"
25 : : #include "parser/parse_node.h"
26 : : #include "storage/proc.h"
27 : : #include "utils/builtins.h"
28 : : #include "utils/guc.h"
29 : : #include "utils/pg_lsn.h"
30 : : #include "utils/snapmgr.h"
31 : :
32 : :
33 : : void
22 akorotkov@postgresql 34 :GNC 244 : ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, bool isTopLevel,
35 : : DestReceiver *dest)
36 : : {
37 : : XLogRecPtr lsn;
181 38 : 244 : int64 timeout = 0;
39 : : WaitLSNResult waitLSNResult;
120 40 : 244 : WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
181 41 : 244 : bool throw = true;
42 : : TupleDesc tupdesc;
43 : : TupOutputState *tstate;
44 : 244 : const char *result = "<unset>";
45 : 244 : bool timeout_specified = false;
46 : 244 : bool no_throw_specified = false;
120 47 : 244 : bool mode_specified = false;
48 : :
49 : : /*
50 : : * WAIT FOR must not be run as a non-top-level statement (e.g., inside a
51 : : * function, procedure, or DO block). Forbid this case upfront.
52 : : */
22 53 [ + + ]: 244 : if (!isTopLevel)
54 [ + - ]: 3 : ereport(ERROR,
55 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
56 : : errmsg("%s can only be executed as a top-level statement",
57 : : "WAIT FOR"),
58 : : errdetail("WAIT FOR cannot be used within a function, procedure, or DO block.")));
59 : :
60 : : /* Parse and validate the mandatory LSN */
181 61 : 241 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
62 : : CStringGetDatum(stmt->lsn_literal)));
63 : :
64 [ + + + + : 1102 : foreach_node(DefElem, defel, stmt->options)
+ + ]
65 : : {
120 66 [ + + ]: 638 : if (strcmp(defel->defname, "mode") == 0)
67 : : {
68 : : char *mode_str;
69 : :
70 [ + + ]: 219 : if (mode_specified)
71 : 1 : errorConflictingDefElem(defel, pstate);
72 : 218 : mode_specified = true;
73 : :
74 : 218 : mode_str = defGetString(defel);
75 : :
76 [ + + ]: 218 : if (pg_strcasecmp(mode_str, "standby_replay") == 0)
77 : 167 : lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
78 [ + + ]: 51 : else if (pg_strcasecmp(mode_str, "standby_write") == 0)
79 : 27 : lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
80 [ + + ]: 24 : else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
81 : 16 : lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
82 [ + + ]: 8 : else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
83 : 7 : lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
84 : : else
85 [ + - ]: 1 : ereport(ERROR,
86 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
87 : : errmsg("unrecognized value for %s option \"%s\": \"%s\"",
88 : : "WAIT", defel->defname, mode_str),
89 : : parser_errposition(pstate, defel->location)));
90 : : }
91 [ + + ]: 419 : else if (strcmp(defel->defname, "timeout") == 0)
92 : : {
93 : : char *timeout_str;
94 : : const char *hintmsg;
95 : : double dval;
96 : :
181 97 [ + + ]: 223 : if (timeout_specified)
98 : 1 : errorConflictingDefElem(defel, pstate);
99 : 222 : timeout_specified = true;
100 : :
101 : 222 : timeout_str = defGetString(defel);
102 : :
12 drowley@postgresql.o 103 [ + + ]: 222 : if (!parse_real(timeout_str, &dval, GUC_UNIT_MS, &hintmsg))
104 : : {
181 akorotkov@postgresql 105 [ + - - + ]: 1 : ereport(ERROR,
106 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
107 : : errmsg("invalid timeout value: \"%s\"", timeout_str),
108 : : hintmsg ? errhint("%s", _(hintmsg)) : 0);
109 : : }
110 : :
111 : : /*
112 : : * Get rid of any fractional part in the input. This is so we
113 : : * don't fail on just-out-of-range values that would round into
114 : : * range.
115 : : */
12 drowley@postgresql.o 116 : 221 : dval = rint(dval);
117 : :
118 : : /* Range check */
119 [ + - + - : 221 : if (unlikely(isnan(dval) || !FLOAT8_FITS_IN_INT64(dval)))
- + - + ]
181 akorotkov@postgresql 120 [ # # ]:UNC 0 : ereport(ERROR,
121 : : errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
122 : : errmsg("timeout value is out of range"));
123 : :
12 drowley@postgresql.o 124 [ + + ]:GNC 221 : if (dval < 0)
181 akorotkov@postgresql 125 [ + - ]: 1 : ereport(ERROR,
126 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
127 : : errmsg("timeout cannot be negative"));
128 : :
12 drowley@postgresql.o 129 : 220 : timeout = (int64) dval;
130 : : }
181 akorotkov@postgresql 131 [ + + ]: 196 : else if (strcmp(defel->defname, "no_throw") == 0)
132 : : {
133 [ + + ]: 194 : if (no_throw_specified)
134 : 1 : errorConflictingDefElem(defel, pstate);
135 : :
136 : 193 : no_throw_specified = true;
137 : :
138 : 193 : throw = !defGetBoolean(defel);
139 : : }
140 : : else
141 : : {
142 [ + - ]: 2 : ereport(ERROR,
143 : : errcode(ERRCODE_SYNTAX_ERROR),
144 : : errmsg("option \"%s\" not recognized",
145 : : defel->defname),
146 : : parser_errposition(pstate, defel->location));
147 : : }
148 : : }
149 : :
150 : : /*
151 : : * We are going to wait for the LSN. We should first care that we don't
152 : : * hold a snapshot and correspondingly our MyProc->xmin is invalid.
153 : : * Otherwise, our snapshot could prevent the replay of WAL records
154 : : * implying a kind of self-deadlock. This is the reason why WAIT FOR is a
155 : : * command, not a procedure or function.
156 : : *
157 : : * Non-top-level contexts are rejected above, but be defensive and pop any
158 : : * active snapshot if one is present. PortalRunUtility() can tolerate
159 : : * utility commands that remove the active snapshot.
160 : : */
161 [ - + ]: 232 : if (ActiveSnapshotSet())
181 akorotkov@postgresql 162 :UNC 0 : PopActiveSnapshot();
163 : :
164 : : /*
165 : : * At second, invalidate a catalog snapshot if any. And we should be done
166 : : * with the preparation.
167 : : */
181 akorotkov@postgresql 168 :GNC 232 : InvalidateCatalogSnapshot();
169 : :
170 : : /* Give up if there is still an active or registered snapshot. */
171 [ + + ]: 232 : if (HaveRegisteredOrActiveSnapshot())
172 [ + - ]: 1 : ereport(ERROR,
173 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
174 : : errmsg("WAIT FOR must be called without an active or registered snapshot"),
175 : : errdetail("WAIT FOR cannot be executed within a transaction with an isolation level higher than READ COMMITTED."));
176 : :
177 : : /*
178 : : * As the result we should hold no snapshot, and correspondingly our xmin
179 : : * should be unset.
180 : : */
181 [ - + ]: 231 : Assert(MyProc->xmin == InvalidTransactionId);
182 : :
183 : : /*
184 : : * Validate that the requested mode matches the current server state.
185 : : * Primary modes can only be used on a primary.
186 : : */
120 187 [ + + ]: 231 : if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
188 : : {
189 [ + + ]: 7 : if (RecoveryInProgress())
190 [ + - ]: 1 : ereport(ERROR,
191 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
192 : : errmsg("recovery is in progress"),
193 : : errhint("Waiting for primary_flush can only be done on a primary server. "
194 : : "Use standby_flush mode on a standby server.")));
195 : : }
196 : :
197 : : /* Now wait for the LSN */
198 : 230 : waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
199 : :
200 : : /*
201 : : * Process the result of WaitForLSN(). Throw appropriate error if needed.
202 : : */
181 203 [ + + + - ]: 230 : switch (waitLSNResult)
204 : : {
205 : 220 : case WAIT_LSN_RESULT_SUCCESS:
206 : : /* Nothing to do on success */
207 : 220 : result = "success";
208 : 220 : break;
209 : :
210 : 5 : case WAIT_LSN_RESULT_TIMEOUT:
211 [ + + ]: 5 : if (throw)
212 : : {
120 213 : 1 : XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
214 : :
215 [ + - - - : 1 : switch (lsnType)
- ]
216 : : {
217 : 1 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
218 [ + - ]: 1 : ereport(ERROR,
219 : : errcode(ERRCODE_QUERY_CANCELED),
220 : : errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
221 : : LSN_FORMAT_ARGS(lsn),
222 : : LSN_FORMAT_ARGS(currentLSN)));
223 : : break;
224 : :
120 akorotkov@postgresql 225 :UNC 0 : case WAIT_LSN_TYPE_STANDBY_WRITE:
226 [ # # ]: 0 : ereport(ERROR,
227 : : errcode(ERRCODE_QUERY_CANCELED),
228 : : errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
229 : : LSN_FORMAT_ARGS(lsn),
230 : : LSN_FORMAT_ARGS(currentLSN)));
231 : : break;
232 : :
233 : 0 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
234 [ # # ]: 0 : ereport(ERROR,
235 : : errcode(ERRCODE_QUERY_CANCELED),
236 : : errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
237 : : LSN_FORMAT_ARGS(lsn),
238 : : LSN_FORMAT_ARGS(currentLSN)));
239 : : break;
240 : :
241 : 0 : case WAIT_LSN_TYPE_PRIMARY_FLUSH:
242 [ # # ]: 0 : ereport(ERROR,
243 : : errcode(ERRCODE_QUERY_CANCELED),
244 : : errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
245 : : LSN_FORMAT_ARGS(lsn),
246 : : LSN_FORMAT_ARGS(currentLSN)));
247 : : break;
248 : :
249 : 0 : default:
250 [ # # ]: 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
251 : : }
252 : : }
253 : : else
181 akorotkov@postgresql 254 :GNC 4 : result = "timeout";
255 : 4 : break;
256 : :
257 : 5 : case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
258 [ + + ]: 5 : if (throw)
259 : : {
260 [ + + ]: 4 : if (PromoteIsTriggered())
261 : : {
120 262 : 3 : XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
263 : :
264 [ + + + - ]: 3 : switch (lsnType)
265 : : {
266 : 1 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
267 [ + - ]: 1 : ereport(ERROR,
268 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
269 : : errmsg("recovery is not in progress"),
270 : : errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
271 : : LSN_FORMAT_ARGS(lsn),
272 : : LSN_FORMAT_ARGS(currentLSN)));
273 : : break;
274 : :
275 : 1 : case WAIT_LSN_TYPE_STANDBY_WRITE:
276 [ + - ]: 1 : ereport(ERROR,
277 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
278 : : errmsg("recovery is not in progress"),
279 : : errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
280 : : LSN_FORMAT_ARGS(lsn),
281 : : LSN_FORMAT_ARGS(currentLSN)));
282 : : break;
283 : :
284 : 1 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
285 [ + - ]: 1 : ereport(ERROR,
286 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
287 : : errmsg("recovery is not in progress"),
288 : : errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
289 : : LSN_FORMAT_ARGS(lsn),
290 : : LSN_FORMAT_ARGS(currentLSN)));
291 : : break;
292 : :
120 akorotkov@postgresql 293 :UNC 0 : default:
294 [ # # ]: 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
295 : : }
296 : : }
297 : : else
298 : : {
120 akorotkov@postgresql 299 [ - - + - ]:GNC 1 : switch (lsnType)
300 : : {
120 akorotkov@postgresql 301 :UNC 0 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
302 [ # # ]: 0 : ereport(ERROR,
303 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
304 : : errmsg("recovery is not in progress"),
305 : : errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
306 : : break;
307 : :
308 : 0 : case WAIT_LSN_TYPE_STANDBY_WRITE:
309 [ # # ]: 0 : ereport(ERROR,
310 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
311 : : errmsg("recovery is not in progress"),
312 : : errhint("Waiting for the standby_write LSN can only be executed during recovery."));
313 : : break;
314 : :
120 akorotkov@postgresql 315 :GNC 1 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
316 [ + - ]: 1 : ereport(ERROR,
317 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
318 : : errmsg("recovery is not in progress"),
319 : : errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
320 : : break;
321 : :
120 akorotkov@postgresql 322 :UNC 0 : default:
323 [ # # ]: 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
324 : : }
325 : : }
326 : : }
327 : : else
181 akorotkov@postgresql 328 :GNC 1 : result = "not in recovery";
329 : 1 : break;
330 : : }
331 : :
332 : : /* need a tuple descriptor representing a single TEXT column */
333 : 225 : tupdesc = WaitStmtResultDesc(stmt);
334 : :
335 : : /* prepare for projection of tuples */
336 : 225 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
337 : :
338 : : /* Send it */
339 : 225 : do_text_output_oneline(tstate, result);
340 : :
341 : 225 : end_tup_output(tstate);
342 : 225 : }
343 : :
344 : : TupleDesc
345 : 469 : WaitStmtResultDesc(WaitStmt *stmt)
346 : : {
347 : : TupleDesc tupdesc;
348 : :
349 : : /*
350 : : * Need a tuple descriptor representing a single TEXT column.
351 : : *
352 : : * We use TupleDescInitBuiltinEntry instead of TupleDescInitEntry to avoid
353 : : * syscache access. This is important because WaitStmtResultDesc may be
354 : : * called after snapshots have been released, and we must not re-establish
355 : : * a catalog snapshot which could cause recovery conflicts on a standby.
356 : : */
357 : 469 : tupdesc = CreateTemplateTupleDesc(1);
29 358 : 469 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "status",
359 : : TEXTOID, -1, 0);
50 drowley@postgresql.o 360 : 469 : TupleDescFinalize(tupdesc);
181 akorotkov@postgresql 361 : 469 : return tupdesc;
362 : : }
|