Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * wait.c
4 : : * Implements WAIT FOR, which allows waiting for events such as
5 : : * time passing or LSN having been replayed on replica.
6 : : *
7 : : * Portions Copyright (c) 2025, PostgreSQL Global Development Group
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/commands/wait.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : : #include "postgres.h"
15 : :
16 : : #include <math.h>
17 : :
18 : : #include "access/xlogrecovery.h"
19 : : #include "access/xlogwait.h"
20 : : #include "commands/defrem.h"
21 : : #include "commands/wait.h"
22 : : #include "executor/executor.h"
23 : : #include "parser/parse_node.h"
24 : : #include "storage/proc.h"
25 : : #include "utils/builtins.h"
26 : : #include "utils/guc.h"
27 : : #include "utils/pg_lsn.h"
28 : : #include "utils/snapmgr.h"
29 : :
30 : :
31 : : void
41 akorotkov@postgresql 32 :GNC 26 : ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
33 : : {
34 : : XLogRecPtr lsn;
35 : 26 : int64 timeout = 0;
36 : : WaitLSNResult waitLSNResult;
37 : 26 : bool throw = true;
38 : : TupleDesc tupdesc;
39 : : TupOutputState *tstate;
40 : 26 : const char *result = "<unset>";
41 : 26 : bool timeout_specified = false;
42 : 26 : bool no_throw_specified = false;
43 : :
44 : : /* Parse and validate the mandatory LSN */
45 : 26 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
46 : : CStringGetDatum(stmt->lsn_literal)));
47 : :
48 [ + + + + : 59 : foreach_node(DefElem, defel, stmt->options)
+ + ]
49 : : {
50 [ + + ]: 21 : if (strcmp(defel->defname, "timeout") == 0)
51 : : {
52 : : char *timeout_str;
53 : : const char *hintmsg;
54 : : double result;
55 : :
56 [ + + ]: 12 : if (timeout_specified)
57 : 1 : errorConflictingDefElem(defel, pstate);
58 : 11 : timeout_specified = true;
59 : :
60 : 11 : timeout_str = defGetString(defel);
61 : :
62 [ + + ]: 11 : if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
63 : : {
64 [ + - - + ]: 1 : ereport(ERROR,
65 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 : : errmsg("invalid timeout value: \"%s\"", timeout_str),
67 : : hintmsg ? errhint("%s", _(hintmsg)) : 0);
68 : : }
69 : :
70 : : /*
71 : : * Get rid of any fractional part in the input. This is so we
72 : : * don't fail on just-out-of-range values that would round into
73 : : * range.
74 : : */
75 : 10 : result = rint(result);
76 : :
77 : : /* Range check */
78 [ + - + - : 10 : if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
- + - + ]
41 akorotkov@postgresql 79 [ # # ]:UNC 0 : ereport(ERROR,
80 : : errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
81 : : errmsg("timeout value is out of range"));
82 : :
41 akorotkov@postgresql 83 [ + + ]:GNC 10 : if (result < 0)
84 [ + - ]: 1 : ereport(ERROR,
85 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
86 : : errmsg("timeout cannot be negative"));
87 : :
88 : 9 : timeout = (int64) result;
89 : : }
90 [ + + ]: 9 : else if (strcmp(defel->defname, "no_throw") == 0)
91 : : {
92 [ + + ]: 7 : if (no_throw_specified)
93 : 1 : errorConflictingDefElem(defel, pstate);
94 : :
95 : 6 : no_throw_specified = true;
96 : :
97 : 6 : throw = !defGetBoolean(defel);
98 : : }
99 : : else
100 : : {
101 [ + - ]: 2 : ereport(ERROR,
102 : : errcode(ERRCODE_SYNTAX_ERROR),
103 : : errmsg("option \"%s\" not recognized",
104 : : defel->defname),
105 : : parser_errposition(pstate, defel->location));
106 : : }
107 : : }
108 : :
109 : : /*
110 : : * We are going to wait for the LSN replay. We should first care that we
111 : : * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
112 : : * Otherwise, our snapshot could prevent the replay of WAL records
113 : : * implying a kind of self-deadlock. This is the reason why WAIT FOR is a
114 : : * command, not a procedure or function.
115 : : *
116 : : * At first, we should check there is no active snapshot. According to
117 : : * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
118 : : * processed with a snapshot. Thankfully, we can pop this snapshot,
119 : : * because PortalRunUtility() can tolerate this.
120 : : */
121 [ + + ]: 19 : if (ActiveSnapshotSet())
122 : 1 : PopActiveSnapshot();
123 : :
124 : : /*
125 : : * At second, invalidate a catalog snapshot if any. And we should be done
126 : : * with the preparation.
127 : : */
128 : 19 : InvalidateCatalogSnapshot();
129 : :
130 : : /* Give up if there is still an active or registered snapshot. */
131 [ + + ]: 19 : if (HaveRegisteredOrActiveSnapshot())
132 [ + - ]: 2 : ereport(ERROR,
133 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
134 : : errmsg("WAIT FOR must be only called without an active or registered snapshot"),
135 : : errdetail("WAIT FOR cannot be executed from a function or a procedure or within a transaction with an isolation level higher than READ COMMITTED."));
136 : :
137 : : /*
138 : : * As the result we should hold no snapshot, and correspondingly our xmin
139 : : * should be unset.
140 : : */
141 [ - + ]: 17 : Assert(MyProc->xmin == InvalidTransactionId);
142 : :
143 : 17 : waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
144 : :
145 : : /*
146 : : * Process the result of WaitForLSN(). Throw appropriate error if needed.
147 : : */
148 [ + + + - ]: 17 : switch (waitLSNResult)
149 : : {
150 : 11 : case WAIT_LSN_RESULT_SUCCESS:
151 : : /* Nothing to do on success */
152 : 11 : result = "success";
153 : 11 : break;
154 : :
155 : 3 : case WAIT_LSN_RESULT_TIMEOUT:
156 [ + + ]: 3 : if (throw)
157 [ + - ]: 1 : ereport(ERROR,
158 : : errcode(ERRCODE_QUERY_CANCELED),
159 : : errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
160 : : LSN_FORMAT_ARGS(lsn),
161 : : LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
162 : : else
163 : 2 : result = "timeout";
164 : 2 : break;
165 : :
166 : 3 : case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
167 [ + + ]: 3 : if (throw)
168 : : {
169 [ + + ]: 2 : if (PromoteIsTriggered())
170 : : {
171 [ + - ]: 1 : ereport(ERROR,
172 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
173 : : errmsg("recovery is not in progress"),
174 : : errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
175 : : LSN_FORMAT_ARGS(lsn),
176 : : LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
177 : : }
178 : : else
179 [ + - ]: 1 : ereport(ERROR,
180 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
181 : : errmsg("recovery is not in progress"),
182 : : errhint("Waiting for the replay LSN can only be executed during recovery."));
183 : : }
184 : : else
185 : 1 : result = "not in recovery";
186 : 1 : break;
187 : : }
188 : :
189 : : /* need a tuple descriptor representing a single TEXT column */
190 : 14 : tupdesc = WaitStmtResultDesc(stmt);
191 : :
192 : : /* prepare for projection of tuples */
193 : 14 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
194 : :
195 : : /* Send it */
196 : 14 : do_text_output_oneline(tstate, result);
197 : :
198 : 14 : end_tup_output(tstate);
199 : 14 : }
200 : :
201 : : TupleDesc
202 : 40 : WaitStmtResultDesc(WaitStmt *stmt)
203 : : {
204 : : TupleDesc tupdesc;
205 : :
206 : : /* Need a tuple descriptor representing a single TEXT column */
207 : 40 : tupdesc = CreateTemplateTupleDesc(1);
208 : 40 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
209 : : TEXTOID, -1, 0);
210 : 40 : return tupdesc;
211 : : }
|