Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * logicalfuncs.c
4 : : *
5 : : * Support functions for using logical decoding and management of
6 : : * logical replication slots via SQL.
7 : : *
8 : : *
9 : : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
10 : : *
11 : : * IDENTIFICATION
12 : : * src/backend/replication/logical/logicalfuncs.c
13 : : *-------------------------------------------------------------------------
14 : : */
15 : :
16 : : #include "postgres.h"
17 : :
18 : : #include <unistd.h>
19 : :
20 : : #include "access/xlogrecovery.h"
21 : : #include "access/xlogutils.h"
22 : : #include "catalog/pg_type.h"
23 : : #include "fmgr.h"
24 : : #include "funcapi.h"
25 : : #include "mb/pg_wchar.h"
26 : : #include "miscadmin.h"
27 : : #include "nodes/makefuncs.h"
28 : : #include "replication/decode.h"
29 : : #include "replication/logical.h"
30 : : #include "replication/message.h"
31 : : #include "utils/array.h"
32 : : #include "utils/builtins.h"
33 : : #include "utils/inval.h"
34 : : #include "utils/memutils.h"
35 : : #include "utils/pg_lsn.h"
36 : : #include "utils/regproc.h"
37 : : #include "utils/resowner.h"
38 : :
39 : : /* Private data for writing out data */
40 : : typedef struct DecodingOutputState
41 : : {
42 : : Tuplestorestate *tupstore;
43 : : TupleDesc tupdesc;
44 : : bool binary_output;
45 : : int64 returned_rows;
46 : : } DecodingOutputState;
47 : :
48 : : /*
49 : : * Prepare for an output plugin write.
50 : : */
51 : : static void
4257 rhaas@postgresql.org 52 :CBC 151166 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
53 : : bool last_write)
54 : : {
55 : 151166 : resetStringInfo(ctx->out);
56 : 151166 : }
57 : :
58 : : /*
59 : : * Perform output plugin write into tuplestore.
60 : : */
61 : : static void
62 : 151166 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
63 : : bool last_write)
64 : : {
65 : : Datum values[3];
66 : : bool nulls[3];
67 : : DecodingOutputState *p;
68 : :
69 : : /* SQL Datums can only be of a limited length... */
70 [ - + ]: 151166 : if (ctx->out->len > MaxAllocSize - VARHDRSZ)
4257 rhaas@postgresql.org 71 [ # # ]:UBC 0 : elog(ERROR, "too much output for sql interface");
72 : :
4257 rhaas@postgresql.org 73 :CBC 151166 : p = (DecodingOutputState *) ctx->output_writer_private;
74 : :
75 : 151166 : memset(nulls, 0, sizeof(nulls));
76 : 151166 : values[0] = LSNGetDatum(lsn);
77 : 151166 : values[1] = TransactionIdGetDatum(xid);
78 : :
79 : : /*
80 : : * Assert ctx->out is in database encoding when we're writing textual
81 : : * output.
82 : : */
83 [ + + ]: 151166 : if (!p->binary_output)
84 [ - + ]: 151130 : Assert(pg_verify_mbstr(GetDatabaseEncoding(),
85 : : ctx->out->data, ctx->out->len,
86 : : false));
87 : :
88 : : /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
2098 alvherre@alvh.no-ip. 89 : 151166 : values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
90 : :
4257 rhaas@postgresql.org 91 : 151166 : tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
92 : 151166 : p->returned_rows++;
93 : 151166 : }
94 : :
95 : : /*
96 : : * Helper function for the various SQL callable logical decoding functions.
97 : : */
98 : : static Datum
99 : 209 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
100 : : {
101 : : Name name;
102 : : XLogRecPtr upto_lsn;
103 : : int32 upto_nchanges;
104 : 209 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
105 : : MemoryContext per_query_ctx;
106 : : MemoryContext oldcontext;
107 : : XLogRecPtr end_of_wal;
108 : : XLogRecPtr wait_for_wal_lsn;
109 : : LogicalDecodingContext *ctx;
46 alvherre@kurilemu.de 110 :GNC 209 : ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner;
111 : : ArrayType *arr;
112 : : Size ndim;
4257 rhaas@postgresql.org 113 :CBC 209 : List *options = NIL;
114 : : DecodingOutputState *p;
115 : :
1505 michael@paquier.xyz 116 : 209 : CheckSlotPermissions();
117 : :
3580 tgl@sss.pgh.pa.us 118 : 208 : CheckLogicalDecodingRequirements();
119 : :
120 [ - + ]: 208 : if (PG_ARGISNULL(0))
3580 tgl@sss.pgh.pa.us 121 [ # # ]:UBC 0 : ereport(ERROR,
122 : : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
123 : : errmsg("slot name must not be null")));
3580 tgl@sss.pgh.pa.us 124 :CBC 208 : name = PG_GETARG_NAME(0);
125 : :
4257 rhaas@postgresql.org 126 [ + - ]: 208 : if (PG_ARGISNULL(1))
127 : 208 : upto_lsn = InvalidXLogRecPtr;
128 : : else
4257 rhaas@postgresql.org 129 :UBC 0 : upto_lsn = PG_GETARG_LSN(1);
130 : :
4257 rhaas@postgresql.org 131 [ + - ]:CBC 208 : if (PG_ARGISNULL(2))
132 : 208 : upto_nchanges = InvalidXLogRecPtr;
133 : : else
4257 rhaas@postgresql.org 134 :UBC 0 : upto_nchanges = PG_GETARG_INT32(2);
135 : :
3580 tgl@sss.pgh.pa.us 136 [ - + ]:CBC 208 : if (PG_ARGISNULL(3))
3580 tgl@sss.pgh.pa.us 137 [ # # ]:UBC 0 : ereport(ERROR,
138 : : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
139 : : errmsg("options array must not be null")));
3580 tgl@sss.pgh.pa.us 140 :CBC 208 : arr = PG_GETARG_ARRAYTYPE_P(3);
141 : :
142 : : /* state to write output to */
4257 rhaas@postgresql.org 143 : 208 : p = palloc0(sizeof(DecodingOutputState));
144 : :
145 : 208 : p->binary_output = binary;
146 : :
147 : 208 : per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
148 : 208 : oldcontext = MemoryContextSwitchTo(per_query_ctx);
149 : :
150 : : /* Deconstruct options array */
3580 tgl@sss.pgh.pa.us 151 : 208 : ndim = ARR_NDIM(arr);
4257 rhaas@postgresql.org 152 [ - + ]: 208 : if (ndim > 1)
153 : : {
4257 rhaas@postgresql.org 154 [ # # ]:UBC 0 : ereport(ERROR,
155 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
156 : : errmsg("array must be one-dimensional")));
157 : : }
4257 rhaas@postgresql.org 158 [ - + ]:CBC 208 : else if (array_contains_nulls(arr))
159 : : {
4257 rhaas@postgresql.org 160 [ # # ]:UBC 0 : ereport(ERROR,
161 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
162 : : errmsg("array must not contain nulls")));
163 : : }
4257 rhaas@postgresql.org 164 [ + + ]:CBC 208 : else if (ndim == 1)
165 : : {
166 : : int nelems;
167 : : Datum *datum_opts;
168 : : int i;
169 : :
170 [ - + ]: 183 : Assert(ARR_ELEMTYPE(arr) == TEXTOID);
171 : :
1215 peter@eisentraut.org 172 : 183 : deconstruct_array_builtin(arr, TEXTOID, &datum_opts, NULL, &nelems);
173 : :
4257 rhaas@postgresql.org 174 [ - + ]: 183 : if (nelems % 2 != 0)
4257 rhaas@postgresql.org 175 [ # # ]:UBC 0 : ereport(ERROR,
176 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
177 : : errmsg("array must have even number of elements")));
178 : :
4257 rhaas@postgresql.org 179 [ + + ]:CBC 555 : for (i = 0; i < nelems; i += 2)
180 : : {
789 michael@paquier.xyz 181 : 372 : char *optname = TextDatumGetCString(datum_opts[i]);
4257 rhaas@postgresql.org 182 : 372 : char *opt = TextDatumGetCString(datum_opts[i + 1]);
183 : :
789 michael@paquier.xyz 184 : 372 : options = lappend(options, makeDefElem(optname, (Node *) makeString(opt), -1));
185 : : }
186 : : }
187 : :
1106 188 : 208 : InitMaterializedSRF(fcinfo, 0);
1331 189 : 208 : p->tupstore = rsinfo->setResult;
190 : 208 : p->tupdesc = rsinfo->setDesc;
191 : :
192 : : /*
193 : : * Compute the current end-of-wal.
194 : : */
3464 alvherre@alvh.no-ip. 195 [ + + ]: 208 : if (!RecoveryInProgress())
1453 rhaas@postgresql.org 196 : 207 : end_of_wal = GetFlushRecPtr(NULL);
197 : : else
198 : 1 : end_of_wal = GetXLogReplayRecPtr(NULL);
199 : :
270 akapila@postgresql.o 200 : 208 : ReplicationSlotAcquire(NameStr(*name), true, true);
201 : :
4257 rhaas@postgresql.org 202 [ + + ]: 207 : PG_TRY();
203 : : {
204 : : /* restart at slot's confirmed_flush */
205 : 408 : ctx = CreateDecodingContext(InvalidXLogRecPtr,
206 : : options,
207 : : false,
1632 tmunro@postgresql.or 208 : 207 : XL_ROUTINE(.page_read = read_local_xlog_page,
209 : : .segment_open = wal_segment_open,
210 : : .segment_close = wal_segment_close),
211 : : LogicalOutputPrepareWrite,
212 : : LogicalOutputWrite, NULL);
213 : :
4257 rhaas@postgresql.org 214 : 201 : MemoryContextSwitchTo(oldcontext);
215 : :
216 : : /*
217 : : * Check whether the output plugin writes textual output if that's
218 : : * what we need.
219 : : */
220 [ + + ]: 201 : if (!binary &&
3811 bruce@momjian.us 221 [ + + ]: 187 : ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
4257 rhaas@postgresql.org 222 [ + - ]: 1 : ereport(ERROR,
223 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
224 : : errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
225 : : NameStr(MyReplicationSlot->data.plugin),
226 : : format_procedure(fcinfo->flinfo->fn_oid))));
227 : :
228 : : /*
229 : : * Wait for specified streaming replication standby servers (if any)
230 : : * to confirm receipt of WAL up to wait_for_wal_lsn.
231 : : */
599 akapila@postgresql.o 232 [ + - ]: 200 : if (XLogRecPtrIsInvalid(upto_lsn))
233 : 200 : wait_for_wal_lsn = end_of_wal;
234 : : else
599 akapila@postgresql.o 235 :UBC 0 : wait_for_wal_lsn = Min(upto_lsn, end_of_wal);
236 : :
599 akapila@postgresql.o 237 :CBC 200 : WaitForStandbyConfirmation(wait_for_wal_lsn);
238 : :
4257 rhaas@postgresql.org 239 : 200 : ctx->output_writer_private = p;
240 : :
241 : : /*
242 : : * Decoding of WAL must start at restart_lsn so that the entirety of
243 : : * xacts that committed after the slot's confirmed_flush can be
244 : : * accumulated into reorder buffers.
245 : : */
2102 heikki.linnakangas@i 246 : 200 : XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
247 : :
248 : : /* invalidate non-timetravel entries */
4257 rhaas@postgresql.org 249 : 200 : InvalidateSystemCaches();
250 : :
251 : : /* Decode until we run out of records */
2102 heikki.linnakangas@i 252 [ + + ]: 1493569 : while (ctx->reader->EndRecPtr < end_of_wal)
253 : : {
254 : : XLogRecord *record;
4257 rhaas@postgresql.org 255 : 1493369 : char *errm = NULL;
256 : :
1632 tmunro@postgresql.or 257 : 1493369 : record = XLogReadRecord(ctx->reader, &errm);
4257 rhaas@postgresql.org 258 [ - + ]: 1493369 : if (errm)
1448 michael@paquier.xyz 259 [ # # ]:UBC 0 : elog(ERROR, "could not find record for logical decoding: %s", errm);
260 : :
261 : : /*
262 : : * The {begin_txn,change,commit_txn}_wrapper callbacks above will
263 : : * store the description into our tuplestore.
264 : : */
4257 rhaas@postgresql.org 265 [ + - ]:CBC 1493369 : if (record != NULL)
266 : : {
3995 heikki.linnakangas@i 267 : 1493369 : LogicalDecodingProcessRecord(ctx, ctx->reader);
268 : :
269 : : /*
270 : : * We used to have bugs where logical decoding would fail to
271 : : * preserve the resource owner. Verify that that doesn't
272 : : * happen anymore. XXX this could be removed once it's been
273 : : * battle-tested.
274 : : */
46 alvherre@kurilemu.de 275 [ - + ]:GNC 1493369 : Assert(CurrentResourceOwner == old_resowner);
276 : : }
277 : :
278 : : /* check limits */
4257 rhaas@postgresql.org 279 [ - + ]:CBC 1493369 : if (upto_lsn != InvalidXLogRecPtr &&
4257 rhaas@postgresql.org 280 [ # # ]:UBC 0 : upto_lsn <= ctx->reader->EndRecPtr)
281 : 0 : break;
4257 rhaas@postgresql.org 282 [ - + ]:CBC 1493369 : if (upto_nchanges != 0 &&
4257 rhaas@postgresql.org 283 [ # # ]:UBC 0 : upto_nchanges <= p->returned_rows)
284 : 0 : break;
4139 andres@anarazel.de 285 [ - + ]:CBC 1493369 : CHECK_FOR_INTERRUPTS();
286 : : }
287 : :
288 : : /*
289 : : * Next time, start where we left off. (Hunting things, the family
290 : : * business..)
291 : : */
3580 tgl@sss.pgh.pa.us 292 [ + - + + ]: 200 : if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
293 : : {
294 : 183 : LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
295 : :
296 : : /*
297 : : * If only the confirmed_flush_lsn has changed the slot won't get
298 : : * marked as dirty by the above. Callers on the walsender
299 : : * interface are expected to keep track of their own progress and
300 : : * don't need it written out. But SQL-interface users cannot
301 : : * specify their own start positions and it's harder for them to
302 : : * keep track of their progress, so we should make more of an
303 : : * effort to save it for them.
304 : : *
305 : : * Dirty the slot so it's written out at the next checkpoint.
306 : : * We'll still lose its position on crash, as documented, but it's
307 : : * better than always losing the position even on clean restart.
308 : : */
3340 simon@2ndQuadrant.co 309 : 183 : ReplicationSlotMarkDirty();
310 : : }
311 : :
312 : : /* free context, call shutdown callback */
3580 tgl@sss.pgh.pa.us 313 : 200 : FreeDecodingContext(ctx);
314 : :
315 : 200 : ReplicationSlotRelease();
316 : 200 : InvalidateSystemCaches();
317 : : }
4257 rhaas@postgresql.org 318 : 7 : PG_CATCH();
319 : : {
320 : : /* clear all timetravel entries */
321 : 7 : InvalidateSystemCaches();
322 : :
323 : 7 : PG_RE_THROW();
324 : : }
325 [ - + ]: 200 : PG_END_TRY();
326 : :
327 : 200 : return (Datum) 0;
328 : : }
329 : :
330 : : /*
331 : : * SQL function returning the changestream as text, consuming the data.
332 : : */
333 : : Datum
334 : 184 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
335 : : {
3580 tgl@sss.pgh.pa.us 336 : 184 : return pg_logical_slot_get_changes_guts(fcinfo, true, false);
337 : : }
338 : :
339 : : /*
340 : : * SQL function returning the changestream as text, only peeking ahead.
341 : : */
342 : : Datum
4257 rhaas@postgresql.org 343 : 11 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
344 : : {
3580 tgl@sss.pgh.pa.us 345 : 11 : return pg_logical_slot_get_changes_guts(fcinfo, false, false);
346 : : }
347 : :
348 : : /*
349 : : * SQL function returning the changestream in binary, consuming the data.
350 : : */
351 : : Datum
4257 rhaas@postgresql.org 352 : 6 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
353 : : {
3580 tgl@sss.pgh.pa.us 354 : 6 : return pg_logical_slot_get_changes_guts(fcinfo, true, true);
355 : : }
356 : :
357 : : /*
358 : : * SQL function returning the changestream in binary, only peeking ahead.
359 : : */
360 : : Datum
4257 rhaas@postgresql.org 361 : 8 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
362 : : {
3580 tgl@sss.pgh.pa.us 363 : 8 : return pg_logical_slot_get_changes_guts(fcinfo, false, true);
364 : : }
365 : :
366 : :
367 : : /*
368 : : * SQL function for writing logical decoding message into WAL.
369 : : */
370 : : Datum
3492 simon@2ndQuadrant.co 371 : 114 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
372 : : {
373 : 114 : bool transactional = PG_GETARG_BOOL(0);
374 : 114 : char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
375 : 114 : bytea *data = PG_GETARG_BYTEA_PP(2);
741 michael@paquier.xyz 376 : 114 : bool flush = PG_GETARG_BOOL(3);
377 : : XLogRecPtr lsn;
378 : :
3492 simon@2ndQuadrant.co 379 [ - + - - : 114 : lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
- - - - -
+ - + ]
380 : : transactional, flush);
381 : 114 : PG_RETURN_LSN(lsn);
382 : : }
383 : :
384 : : Datum
385 : 114 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
386 : : {
387 : : /* bytea and text are compatible */
388 : 114 : return pg_logical_emit_message_bytea(fcinfo);
389 : : }
|