Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * test_decoding.c
4 : : * example logical decoding output plugin
5 : : *
6 : : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * contrib/test_decoding/test_decoding.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "catalog/pg_type.h"
16 : :
17 : : #include "replication/logical.h"
18 : : #include "replication/origin.h"
19 : :
20 : : #include "utils/builtins.h"
21 : : #include "utils/lsyscache.h"
22 : : #include "utils/memutils.h"
23 : : #include "utils/rel.h"
24 : :
164 tgl@sss.pgh.pa.us 25 :CBC 76 : PG_MODULE_MAGIC_EXT(
26 : : .name = "test_decoding",
27 : : .version = PG_VERSION
28 : : );
29 : :
30 : : typedef struct
31 : : {
32 : : MemoryContext context;
33 : : bool include_xids;
34 : : bool include_timestamp;
35 : : bool skip_empty_xacts;
36 : : bool only_local;
37 : : } TestDecodingData;
38 : :
39 : : /*
40 : : * Maintain the per-transaction level variables to track whether the
41 : : * transaction and or streams have written any changes. In streaming mode the
42 : : * transaction can be decoded in streams so along with maintaining whether the
43 : : * transaction has written any changes, we also need to track whether the
44 : : * current stream has written any changes. This is required so that if user
45 : : * has requested to skip the empty transactions we can skip the empty streams
46 : : * even though the transaction has written some changes.
47 : : */
48 : : typedef struct
49 : : {
50 : : bool xact_wrote_changes;
51 : : bool stream_wrote_changes;
52 : : } TestDecodingTxnData;
53 : :
54 : : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
55 : : bool is_init);
56 : : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
57 : : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
58 : : ReorderBufferTXN *txn);
59 : : static void pg_output_begin(LogicalDecodingContext *ctx,
60 : : TestDecodingData *data,
61 : : ReorderBufferTXN *txn,
62 : : bool last_write);
63 : : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
64 : : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
65 : : static void pg_decode_change(LogicalDecodingContext *ctx,
66 : : ReorderBufferTXN *txn, Relation relation,
67 : : ReorderBufferChange *change);
68 : : static void pg_decode_truncate(LogicalDecodingContext *ctx,
69 : : ReorderBufferTXN *txn,
70 : : int nrelations, Relation relations[],
71 : : ReorderBufferChange *change);
72 : : static bool pg_decode_filter(LogicalDecodingContext *ctx,
73 : : RepOriginId origin_id);
74 : : static void pg_decode_message(LogicalDecodingContext *ctx,
75 : : ReorderBufferTXN *txn, XLogRecPtr lsn,
76 : : bool transactional, const char *prefix,
77 : : Size sz, const char *message);
78 : : static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
79 : : TransactionId xid,
80 : : const char *gid);
81 : : static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
82 : : ReorderBufferTXN *txn);
83 : : static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
84 : : ReorderBufferTXN *txn,
85 : : XLogRecPtr prepare_lsn);
86 : : static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
87 : : ReorderBufferTXN *txn,
88 : : XLogRecPtr commit_lsn);
89 : : static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
90 : : ReorderBufferTXN *txn,
91 : : XLogRecPtr prepare_end_lsn,
92 : : TimestampTz prepare_time);
93 : : static void pg_decode_stream_start(LogicalDecodingContext *ctx,
94 : : ReorderBufferTXN *txn);
95 : : static void pg_output_stream_start(LogicalDecodingContext *ctx,
96 : : TestDecodingData *data,
97 : : ReorderBufferTXN *txn,
98 : : bool last_write);
99 : : static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
100 : : ReorderBufferTXN *txn);
101 : : static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
102 : : ReorderBufferTXN *txn,
103 : : XLogRecPtr abort_lsn);
104 : : static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
105 : : ReorderBufferTXN *txn,
106 : : XLogRecPtr prepare_lsn);
107 : : static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
108 : : ReorderBufferTXN *txn,
109 : : XLogRecPtr commit_lsn);
110 : : static void pg_decode_stream_change(LogicalDecodingContext *ctx,
111 : : ReorderBufferTXN *txn,
112 : : Relation relation,
113 : : ReorderBufferChange *change);
114 : : static void pg_decode_stream_message(LogicalDecodingContext *ctx,
115 : : ReorderBufferTXN *txn, XLogRecPtr lsn,
116 : : bool transactional, const char *prefix,
117 : : Size sz, const char *message);
118 : : static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
119 : : ReorderBufferTXN *txn,
120 : : int nrelations, Relation relations[],
121 : : ReorderBufferChange *change);
122 : :
123 : : void
4205 rhaas@postgresql.org 124 : 76 : _PG_init(void)
125 : : {
126 : : /* other plugins can perform things here */
127 : 76 : }
128 : :
129 : : /* specify output plugin callbacks */
130 : : void
131 : 305 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
132 : : {
133 : 305 : cb->startup_cb = pg_decode_startup;
134 : 305 : cb->begin_cb = pg_decode_begin_txn;
135 : 305 : cb->change_cb = pg_decode_change;
2709 peter_e@gmx.net 136 : 305 : cb->truncate_cb = pg_decode_truncate;
4205 rhaas@postgresql.org 137 : 305 : cb->commit_cb = pg_decode_commit_txn;
3783 andres@anarazel.de 138 : 305 : cb->filter_by_origin_cb = pg_decode_filter;
4205 rhaas@postgresql.org 139 : 305 : cb->shutdown_cb = pg_decode_shutdown;
3440 simon@2ndQuadrant.co 140 : 305 : cb->message_cb = pg_decode_message;
1711 akapila@postgresql.o 141 : 305 : cb->filter_prepare_cb = pg_decode_filter_prepare;
142 : 305 : cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
143 : 305 : cb->prepare_cb = pg_decode_prepare_txn;
144 : 305 : cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
145 : 305 : cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
1866 146 : 305 : cb->stream_start_cb = pg_decode_stream_start;
147 : 305 : cb->stream_stop_cb = pg_decode_stream_stop;
148 : 305 : cb->stream_abort_cb = pg_decode_stream_abort;
1711 149 : 305 : cb->stream_prepare_cb = pg_decode_stream_prepare;
1866 150 : 305 : cb->stream_commit_cb = pg_decode_stream_commit;
151 : 305 : cb->stream_change_cb = pg_decode_stream_change;
152 : 305 : cb->stream_message_cb = pg_decode_stream_message;
153 : 305 : cb->stream_truncate_cb = pg_decode_stream_truncate;
4205 rhaas@postgresql.org 154 : 305 : }
155 : :
156 : :
157 : : /* initialize this plugin */
158 : : static void
159 : 305 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
160 : : bool is_init)
161 : : {
162 : : ListCell *option;
163 : : TestDecodingData *data;
1855 akapila@postgresql.o 164 : 305 : bool enable_streaming = false;
165 : :
4023 andres@anarazel.de 166 : 305 : data = palloc0(sizeof(TestDecodingData));
4205 rhaas@postgresql.org 167 : 305 : data->context = AllocSetContextCreate(ctx->context,
168 : : "text conversion context",
169 : : ALLOCSET_DEFAULT_SIZES);
170 : 305 : data->include_xids = true;
171 : 305 : data->include_timestamp = false;
4023 andres@anarazel.de 172 : 305 : data->skip_empty_xacts = false;
3783 173 : 305 : data->only_local = false;
174 : :
4205 rhaas@postgresql.org 175 : 305 : ctx->output_plugin_private = data;
176 : :
177 : 305 : opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
2726 peter_e@gmx.net 178 : 305 : opt->receive_rewrites = false;
179 : :
4205 rhaas@postgresql.org 180 [ + + + + : 649 : foreach(option, ctx->output_plugin_options)
+ + ]
181 : : {
182 : 347 : DefElem *elem = lfirst(option);
183 : :
184 [ + - - + ]: 347 : Assert(elem->arg == NULL || IsA(elem->arg, String));
185 : :
186 [ + + ]: 347 : if (strcmp(elem->defname, "include-xids") == 0)
187 : : {
188 : : /* if option does not provide a value, it means its value is true */
189 [ - + ]: 163 : if (elem->arg == NULL)
4205 rhaas@postgresql.org 190 :UBC 0 : data->include_xids = true;
4205 rhaas@postgresql.org 191 [ + + ]:CBC 163 : else if (!parse_bool(strVal(elem->arg), &data->include_xids))
192 [ + - ]: 2 : ereport(ERROR,
193 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
194 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
195 : : strVal(elem->arg), elem->defname)));
196 : : }
197 [ + + ]: 184 : else if (strcmp(elem->defname, "include-timestamp") == 0)
198 : : {
199 [ - + ]: 1 : if (elem->arg == NULL)
4205 rhaas@postgresql.org 200 :UBC 0 : data->include_timestamp = true;
4205 rhaas@postgresql.org 201 [ - + ]:CBC 1 : else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
4205 rhaas@postgresql.org 202 [ # # ]:UBC 0 : ereport(ERROR,
203 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
204 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
205 : : strVal(elem->arg), elem->defname)));
206 : : }
4205 rhaas@postgresql.org 207 [ + + ]:CBC 183 : else if (strcmp(elem->defname, "force-binary") == 0)
208 : : {
209 : : bool force_binary;
210 : :
211 [ - + ]: 6 : if (elem->arg == NULL)
4205 rhaas@postgresql.org 212 :UBC 0 : continue;
4205 rhaas@postgresql.org 213 [ - + ]:CBC 6 : else if (!parse_bool(strVal(elem->arg), &force_binary))
4205 rhaas@postgresql.org 214 [ # # ]:UBC 0 : ereport(ERROR,
215 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
216 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
217 : : strVal(elem->arg), elem->defname)));
218 : :
4205 rhaas@postgresql.org 219 [ + + ]:CBC 6 : if (force_binary)
220 : 2 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
221 : : }
4023 andres@anarazel.de 222 [ + + ]: 177 : else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
223 : : {
224 : :
225 [ - + ]: 160 : if (elem->arg == NULL)
4023 andres@anarazel.de 226 :UBC 0 : data->skip_empty_xacts = true;
4023 andres@anarazel.de 227 [ - + ]:CBC 160 : else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
4023 andres@anarazel.de 228 [ # # ]:UBC 0 : ereport(ERROR,
229 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
230 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
231 : : strVal(elem->arg), elem->defname)));
232 : : }
3783 andres@anarazel.de 233 [ + + ]:CBC 17 : else if (strcmp(elem->defname, "only-local") == 0)
234 : : {
235 : :
236 [ - + ]: 3 : if (elem->arg == NULL)
3783 andres@anarazel.de 237 :UBC 0 : data->only_local = true;
3783 andres@anarazel.de 238 [ - + ]:CBC 3 : else if (!parse_bool(strVal(elem->arg), &data->only_local))
3783 andres@anarazel.de 239 [ # # ]:UBC 0 : ereport(ERROR,
240 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
241 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
242 : : strVal(elem->arg), elem->defname)));
243 : : }
2726 peter_e@gmx.net 244 [ + + ]:CBC 14 : else if (strcmp(elem->defname, "include-rewrites") == 0)
245 : : {
246 : :
247 [ - + ]: 1 : if (elem->arg == NULL)
2726 peter_e@gmx.net 248 :UBC 0 : continue;
2726 peter_e@gmx.net 249 [ - + ]:CBC 1 : else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
2726 peter_e@gmx.net 250 [ # # ]:UBC 0 : ereport(ERROR,
251 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
252 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
253 : : strVal(elem->arg), elem->defname)));
254 : : }
1855 akapila@postgresql.o 255 [ + + ]:CBC 13 : else if (strcmp(elem->defname, "stream-changes") == 0)
256 : : {
257 [ - + ]: 12 : if (elem->arg == NULL)
1855 akapila@postgresql.o 258 :UBC 0 : continue;
1855 akapila@postgresql.o 259 [ - + ]:CBC 12 : else if (!parse_bool(strVal(elem->arg), &enable_streaming))
1855 akapila@postgresql.o 260 [ # # ]:UBC 0 : ereport(ERROR,
261 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
262 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
263 : : strVal(elem->arg), elem->defname)));
264 : : }
265 : : else
266 : : {
4205 rhaas@postgresql.org 267 [ + - + - ]:CBC 1 : ereport(ERROR,
268 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
269 : : errmsg("option \"%s\" = \"%s\" is unknown",
270 : : elem->defname,
271 : : elem->arg ? strVal(elem->arg) : "(null)")));
272 : : }
273 : : }
274 : :
1855 akapila@postgresql.o 275 : 302 : ctx->streaming &= enable_streaming;
4205 rhaas@postgresql.org 276 : 302 : }
277 : :
278 : : /* cleanup this plugin's resources */
279 : : static void
280 : 299 : pg_decode_shutdown(LogicalDecodingContext *ctx)
281 : : {
282 : 299 : TestDecodingData *data = ctx->output_plugin_private;
283 : :
284 : : /* cleanup our own resources via memory context reset */
285 : 299 : MemoryContextDelete(data->context);
286 : 299 : }
287 : :
288 : : /* BEGIN callback */
289 : : static void
290 : 411 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
291 : : {
292 : 411 : TestDecodingData *data = ctx->output_plugin_private;
293 : : TestDecodingTxnData *txndata =
841 tgl@sss.pgh.pa.us 294 : 411 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
295 : :
1754 akapila@postgresql.o 296 : 411 : txndata->xact_wrote_changes = false;
297 : 411 : txn->output_plugin_private = txndata;
298 : :
299 : : /*
300 : : * If asked to skip empty transactions, we'll emit BEGIN at the point
301 : : * where the first operation is received for this transaction.
302 : : */
4023 andres@anarazel.de 303 [ + + ]: 411 : if (data->skip_empty_xacts)
304 : 374 : return;
305 : :
306 : 37 : pg_output_begin(ctx, data, txn, true);
307 : : }
308 : :
309 : : static void
310 : 251 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
311 : : {
312 : 251 : OutputPluginPrepareWrite(ctx, last_write);
4205 rhaas@postgresql.org 313 [ + + ]: 251 : if (data->include_xids)
314 : 33 : appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
315 : : else
316 : 218 : appendStringInfoString(ctx->out, "BEGIN");
4023 andres@anarazel.de 317 : 251 : OutputPluginWrite(ctx, last_write);
4205 rhaas@postgresql.org 318 : 251 : }
319 : :
320 : : /* COMMIT callback */
321 : : static void
322 : 411 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
323 : : XLogRecPtr commit_lsn)
324 : : {
325 : 411 : TestDecodingData *data = ctx->output_plugin_private;
1754 akapila@postgresql.o 326 : 411 : TestDecodingTxnData *txndata = txn->output_plugin_private;
327 : 411 : bool xact_wrote_changes = txndata->xact_wrote_changes;
328 : :
329 : 411 : pfree(txndata);
330 : 411 : txn->output_plugin_private = NULL;
331 : :
332 [ + + + + ]: 411 : if (data->skip_empty_xacts && !xact_wrote_changes)
4023 andres@anarazel.de 333 : 168 : return;
334 : :
4205 rhaas@postgresql.org 335 : 243 : OutputPluginPrepareWrite(ctx, true);
336 [ + + ]: 243 : if (data->include_xids)
337 : 32 : appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
338 : : else
339 : 211 : appendStringInfoString(ctx->out, "COMMIT");
340 : :
341 [ + + ]: 243 : if (data->include_timestamp)
342 : 1 : appendStringInfo(ctx->out, " (at %s)",
343 : : timestamptz_to_str(txn->xact_time.commit_time));
344 : :
345 : 243 : OutputPluginWrite(ctx, true);
346 : : }
347 : :
348 : : /* BEGIN PREPARE callback */
349 : : static void
1711 akapila@postgresql.o 350 : 9 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
351 : : {
352 : 9 : TestDecodingData *data = ctx->output_plugin_private;
353 : : TestDecodingTxnData *txndata =
841 tgl@sss.pgh.pa.us 354 : 9 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
355 : :
1711 akapila@postgresql.o 356 : 9 : txndata->xact_wrote_changes = false;
357 : 9 : txn->output_plugin_private = txndata;
358 : :
359 : : /*
360 : : * If asked to skip empty transactions, we'll emit BEGIN at the point
361 : : * where the first operation is received for this transaction.
362 : : */
363 [ + + ]: 9 : if (data->skip_empty_xacts)
364 : 8 : return;
365 : :
366 : 1 : pg_output_begin(ctx, data, txn, true);
367 : : }
368 : :
369 : : /* PREPARE callback */
370 : : static void
371 : 9 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
372 : : XLogRecPtr prepare_lsn)
373 : : {
374 : 9 : TestDecodingData *data = ctx->output_plugin_private;
375 : 9 : TestDecodingTxnData *txndata = txn->output_plugin_private;
376 : :
377 : : /*
378 : : * If asked to skip empty transactions, we'll emit PREPARE at the point
379 : : * where the first operation is received for this transaction.
380 : : */
381 [ + + + + ]: 9 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
382 : 1 : return;
383 : :
384 : 8 : OutputPluginPrepareWrite(ctx, true);
385 : :
386 : 8 : appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
387 : 8 : quote_literal_cstr(txn->gid));
388 : :
389 [ + + ]: 8 : if (data->include_xids)
390 : 1 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
391 : :
392 [ - + ]: 8 : if (data->include_timestamp)
1711 akapila@postgresql.o 393 :UBC 0 : appendStringInfo(ctx->out, " (at %s)",
394 : : timestamptz_to_str(txn->xact_time.prepare_time));
395 : :
1711 akapila@postgresql.o 396 :CBC 8 : OutputPluginWrite(ctx, true);
397 : : }
398 : :
399 : : /* COMMIT PREPARED callback */
400 : : static void
401 : 8 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
402 : : XLogRecPtr commit_lsn)
403 : : {
404 : 8 : TestDecodingData *data = ctx->output_plugin_private;
405 : :
406 : 8 : OutputPluginPrepareWrite(ctx, true);
407 : :
408 : 8 : appendStringInfo(ctx->out, "COMMIT PREPARED %s",
409 : 8 : quote_literal_cstr(txn->gid));
410 : :
411 [ + + ]: 8 : if (data->include_xids)
412 : 1 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
413 : :
414 [ - + ]: 8 : if (data->include_timestamp)
1711 akapila@postgresql.o 415 :UBC 0 : appendStringInfo(ctx->out, " (at %s)",
416 : : timestamptz_to_str(txn->xact_time.commit_time));
417 : :
1711 akapila@postgresql.o 418 :CBC 8 : OutputPluginWrite(ctx, true);
419 : 8 : }
420 : :
421 : : /* ROLLBACK PREPARED callback */
422 : : static void
423 : 2 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
424 : : ReorderBufferTXN *txn,
425 : : XLogRecPtr prepare_end_lsn,
426 : : TimestampTz prepare_time)
427 : : {
428 : 2 : TestDecodingData *data = ctx->output_plugin_private;
429 : :
430 : 2 : OutputPluginPrepareWrite(ctx, true);
431 : :
432 : 2 : appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
433 : 2 : quote_literal_cstr(txn->gid));
434 : :
435 [ - + ]: 2 : if (data->include_xids)
1711 akapila@postgresql.o 436 :UBC 0 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
437 : :
1711 akapila@postgresql.o 438 [ - + ]:CBC 2 : if (data->include_timestamp)
1711 akapila@postgresql.o 439 :UBC 0 : appendStringInfo(ctx->out, " (at %s)",
440 : : timestamptz_to_str(txn->xact_time.commit_time));
441 : :
1711 akapila@postgresql.o 442 :CBC 2 : OutputPluginWrite(ctx, true);
443 : 2 : }
444 : :
445 : : /*
446 : : * Filter out two-phase transactions.
447 : : *
448 : : * Each plugin can implement its own filtering logic. Here we demonstrate a
449 : : * simple logic by checking the GID. If the GID contains the "_nodecode"
450 : : * substring, then we filter it out.
451 : : */
452 : : static bool
1621 453 : 148 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
454 : : const char *gid)
455 : : {
1711 456 [ + + ]: 148 : if (strstr(gid, "_nodecode") != NULL)
457 : 12 : return true;
458 : :
459 : 136 : return false;
460 : : }
461 : :
462 : : static bool
3783 andres@anarazel.de 463 : 940306 : pg_decode_filter(LogicalDecodingContext *ctx,
464 : : RepOriginId origin_id)
465 : : {
466 : 940306 : TestDecodingData *data = ctx->output_plugin_private;
467 : :
468 [ + + + + ]: 940306 : if (data->only_local && origin_id != InvalidRepOriginId)
469 : 9 : return true;
470 : 940297 : return false;
471 : : }
472 : :
473 : : /*
474 : : * Print literal `outputstr' already represented as string of type `typid'
475 : : * into stringbuf `s'.
476 : : *
477 : : * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
478 : : * if standard_conforming_strings were enabled.
479 : : */
480 : : static void
4205 rhaas@postgresql.org 481 : 175860 : print_literal(StringInfo s, Oid typid, char *outputstr)
482 : : {
483 : : const char *valptr;
484 : :
485 [ + - - + ]: 175860 : switch (typid)
486 : : {
487 : 60216 : case INT2OID:
488 : : case INT4OID:
489 : : case INT8OID:
490 : : case OIDOID:
491 : : case FLOAT4OID:
492 : : case FLOAT8OID:
493 : : case NUMERICOID:
494 : : /* NB: We don't care about Inf, NaN et al. */
495 : 60216 : appendStringInfoString(s, outputstr);
496 : 60216 : break;
497 : :
4205 rhaas@postgresql.org 498 :UBC 0 : case BITOID:
499 : : case VARBITOID:
500 : 0 : appendStringInfo(s, "B'%s'", outputstr);
501 : 0 : break;
502 : :
503 : 0 : case BOOLOID:
504 [ # # ]: 0 : if (strcmp(outputstr, "t") == 0)
505 : 0 : appendStringInfoString(s, "true");
506 : : else
507 : 0 : appendStringInfoString(s, "false");
508 : 0 : break;
509 : :
4205 rhaas@postgresql.org 510 :CBC 115644 : default:
511 : 115644 : appendStringInfoChar(s, '\'');
512 [ + + ]: 5419055 : for (valptr = outputstr; *valptr; valptr++)
513 : : {
514 : 5303411 : char ch = *valptr;
515 : :
516 [ + + ]: 5303411 : if (SQL_STR_DOUBLE(ch, false))
517 : 64 : appendStringInfoChar(s, ch);
518 : 5303411 : appendStringInfoChar(s, ch);
519 : : }
520 : 115644 : appendStringInfoChar(s, '\'');
521 : 115644 : break;
522 : : }
523 : 175860 : }
524 : :
525 : : /* print the tuple 'tuple' into the StringInfo s */
526 : : static void
527 : 145521 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
528 : : {
529 : : int natt;
530 : :
531 : : /* print all columns individually */
532 [ + + ]: 347138 : for (natt = 0; natt < tupdesc->natts; natt++)
533 : : {
534 : : Form_pg_attribute attr; /* the attribute itself */
535 : : Oid typid; /* type of current attribute */
536 : : Oid typoutput; /* output function */
537 : : bool typisvarlena;
538 : : Datum origval; /* possibly toasted Datum */
539 : : bool isnull; /* column is null? */
540 : :
2939 andres@anarazel.de 541 : 201617 : attr = TupleDescAttr(tupdesc, natt);
542 : :
543 : : /*
544 : : * don't print dropped columns, we can't be sure everything is
545 : : * available for them
546 : : */
4205 rhaas@postgresql.org 547 [ + + ]: 201617 : if (attr->attisdropped)
548 : 5130 : continue;
549 : :
550 : : /*
551 : : * Don't print system columns, oid will already have been printed if
552 : : * present.
553 : : */
554 [ - + ]: 201545 : if (attr->attnum < 0)
4205 rhaas@postgresql.org 555 :UBC 0 : continue;
556 : :
4205 rhaas@postgresql.org 557 :CBC 201545 : typid = attr->atttypid;
558 : :
559 : : /* get Datum from tuple */
3724 andres@anarazel.de 560 : 201545 : origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
561 : :
4205 rhaas@postgresql.org 562 [ + + + + ]: 201545 : if (isnull && skip_nulls)
563 : 5058 : continue;
564 : :
565 : : /* print attribute name */
566 : 196487 : appendStringInfoChar(s, ' ');
567 : 196487 : appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
568 : :
569 : : /* print attribute type */
570 : 196487 : appendStringInfoChar(s, '[');
571 : 196487 : appendStringInfoString(s, format_type_be(typid));
572 : 196487 : appendStringInfoChar(s, ']');
573 : :
574 : : /* query output function */
575 : 196487 : getTypeOutputInfo(typid,
576 : : &typoutput, &typisvarlena);
577 : :
578 : : /* print separator */
579 : 196487 : appendStringInfoChar(s, ':');
580 : :
581 : : /* print data */
582 [ + + ]: 196487 : if (isnull)
583 : 20615 : appendStringInfoString(s, "null");
32 peter@eisentraut.org 584 [ + + + + ]:GNC 175872 : else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
4205 rhaas@postgresql.org 585 :CBC 12 : appendStringInfoString(s, "unchanged-toast-datum");
586 [ + + ]: 175860 : else if (!typisvarlena)
587 : 60220 : print_literal(s, typid,
588 : : OidOutputFunctionCall(typoutput, origval));
589 : : else
590 : : {
591 : : Datum val; /* definitely detoasted Datum */
592 : :
593 : 115640 : val = PointerGetDatum(PG_DETOAST_DATUM(origval));
594 : 115640 : print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
595 : : }
596 : : }
597 : 145521 : }
598 : :
599 : : /*
600 : : * callback for individual changed tuples
601 : : */
602 : : static void
603 : 150510 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
604 : : Relation relation, ReorderBufferChange *change)
605 : : {
606 : : TestDecodingData *data;
607 : : TestDecodingTxnData *txndata;
608 : : Form_pg_class class_form;
609 : : TupleDesc tupdesc;
610 : : MemoryContext old;
611 : :
612 : 150510 : data = ctx->output_plugin_private;
1754 akapila@postgresql.o 613 : 150510 : txndata = txn->output_plugin_private;
614 : :
615 : : /* output BEGIN if we haven't yet */
616 [ + + + + ]: 150510 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
617 : : {
4023 andres@anarazel.de 618 : 203 : pg_output_begin(ctx, data, txn, false);
619 : : }
1754 akapila@postgresql.o 620 : 150510 : txndata->xact_wrote_changes = true;
621 : :
4205 rhaas@postgresql.org 622 : 150510 : class_form = RelationGetForm(relation);
623 : 150510 : tupdesc = RelationGetDescr(relation);
624 : :
625 : : /* Avoid leaking memory by using and resetting our own context */
626 : 150510 : old = MemoryContextSwitchTo(data->context);
627 : :
628 : 150510 : OutputPluginPrepareWrite(ctx, true);
629 : :
630 : 150510 : appendStringInfoString(ctx->out, "table ");
631 : 150510 : appendStringInfoString(ctx->out,
2046 alvherre@alvh.no-ip. 632 : 150510 : quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
2726 peter_e@gmx.net 633 [ + + ]: 150510 : class_form->relrewrite ?
634 : 1 : get_rel_name(class_form->relrewrite) :
635 : : NameStr(class_form->relname)));
3771 636 : 150510 : appendStringInfoChar(ctx->out, ':');
637 : :
4205 rhaas@postgresql.org 638 [ + + + - ]: 150510 : switch (change->action)
639 : : {
640 : 132952 : case REORDER_BUFFER_CHANGE_INSERT:
641 : 132952 : appendStringInfoString(ctx->out, " INSERT:");
4201 tgl@sss.pgh.pa.us 642 [ - + ]: 132952 : if (change->data.tp.newtuple == NULL)
4205 rhaas@postgresql.org 643 :UBC 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
644 : : else
4205 rhaas@postgresql.org 645 :CBC 132952 : tuple_to_stringinfo(ctx->out, tupdesc,
646 : : change->data.tp.newtuple,
647 : : false);
648 : 132952 : break;
649 : 7536 : case REORDER_BUFFER_CHANGE_UPDATE:
650 : 7536 : appendStringInfoString(ctx->out, " UPDATE:");
4201 tgl@sss.pgh.pa.us 651 [ + + ]: 7536 : if (change->data.tp.oldtuple != NULL)
652 : : {
4205 rhaas@postgresql.org 653 : 18 : appendStringInfoString(ctx->out, " old-key:");
654 : 18 : tuple_to_stringinfo(ctx->out, tupdesc,
655 : : change->data.tp.oldtuple,
656 : : true);
657 : 18 : appendStringInfoString(ctx->out, " new-tuple:");
658 : : }
659 : :
4201 tgl@sss.pgh.pa.us 660 [ - + ]: 7536 : if (change->data.tp.newtuple == NULL)
4205 rhaas@postgresql.org 661 :UBC 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
662 : : else
4205 rhaas@postgresql.org 663 :CBC 7536 : tuple_to_stringinfo(ctx->out, tupdesc,
664 : : change->data.tp.newtuple,
665 : : false);
666 : 7536 : break;
667 : 10022 : case REORDER_BUFFER_CHANGE_DELETE:
668 : 10022 : appendStringInfoString(ctx->out, " DELETE:");
669 : :
670 : : /* if there was no PK, we only know that a delete happened */
4201 tgl@sss.pgh.pa.us 671 [ + + ]: 10022 : if (change->data.tp.oldtuple == NULL)
4205 rhaas@postgresql.org 672 : 5007 : appendStringInfoString(ctx->out, " (no-tuple-data)");
673 : : /* In DELETE, only the replica identity is present; display that */
674 : : else
675 : 5015 : tuple_to_stringinfo(ctx->out, tupdesc,
676 : : change->data.tp.oldtuple,
677 : : true);
678 : 10022 : break;
4201 tgl@sss.pgh.pa.us 679 :UBC 0 : default:
680 : 0 : Assert(false);
681 : : }
682 : :
4205 rhaas@postgresql.org 683 :CBC 150510 : MemoryContextSwitchTo(old);
684 : 150510 : MemoryContextReset(data->context);
685 : :
686 : 150510 : OutputPluginWrite(ctx, true);
687 : 150510 : }
688 : :
689 : : static void
2709 peter_e@gmx.net 690 : 8 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
691 : : int nrelations, Relation relations[], ReorderBufferChange *change)
692 : : {
693 : : TestDecodingData *data;
694 : : TestDecodingTxnData *txndata;
695 : : MemoryContext old;
696 : : int i;
697 : :
698 : 8 : data = ctx->output_plugin_private;
1754 akapila@postgresql.o 699 : 8 : txndata = txn->output_plugin_private;
700 : :
701 : : /* output BEGIN if we haven't yet */
702 [ + + + - ]: 8 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
703 : : {
2709 peter_e@gmx.net 704 : 7 : pg_output_begin(ctx, data, txn, false);
705 : : }
1754 akapila@postgresql.o 706 : 8 : txndata->xact_wrote_changes = true;
707 : :
708 : : /* Avoid leaking memory by using and resetting our own context */
2709 peter_e@gmx.net 709 : 8 : old = MemoryContextSwitchTo(data->context);
710 : :
711 : 8 : OutputPluginPrepareWrite(ctx, true);
712 : :
713 : 8 : appendStringInfoString(ctx->out, "table ");
714 : :
715 [ + + ]: 17 : for (i = 0; i < nrelations; i++)
716 : : {
717 [ + + ]: 9 : if (i > 0)
718 : 1 : appendStringInfoString(ctx->out, ", ");
719 : :
720 : 9 : appendStringInfoString(ctx->out,
721 : 9 : quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
722 : 9 : NameStr(relations[i]->rd_rel->relname)));
723 : : }
724 : :
725 : 8 : appendStringInfoString(ctx->out, ": TRUNCATE:");
726 : :
727 [ + + ]: 8 : if (change->data.truncate.restart_seqs
728 [ - + ]: 7 : || change->data.truncate.cascade)
729 : : {
730 [ + - ]: 1 : if (change->data.truncate.restart_seqs)
2256 drowley@postgresql.o 731 : 1 : appendStringInfoString(ctx->out, " restart_seqs");
2709 peter_e@gmx.net 732 [ + - ]: 1 : if (change->data.truncate.cascade)
2256 drowley@postgresql.o 733 : 1 : appendStringInfoString(ctx->out, " cascade");
734 : : }
735 : : else
2709 peter_e@gmx.net 736 : 7 : appendStringInfoString(ctx->out, " (no-flags)");
737 : :
738 : 8 : MemoryContextSwitchTo(old);
739 : 8 : MemoryContextReset(data->context);
740 : :
741 : 8 : OutputPluginWrite(ctx, true);
742 : 8 : }
743 : :
744 : : static void
3440 simon@2ndQuadrant.co 745 : 9 : pg_decode_message(LogicalDecodingContext *ctx,
746 : : ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
747 : : const char *prefix, Size sz, const char *message)
748 : : {
788 akapila@postgresql.o 749 : 9 : TestDecodingData *data = ctx->output_plugin_private;
750 : : TestDecodingTxnData *txndata;
751 : :
752 [ + + ]: 9 : txndata = transactional ? txn->output_plugin_private : NULL;
753 : :
754 : : /* output BEGIN if we haven't yet for transactional messages */
755 [ + + + - : 9 : if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
+ + ]
756 : 3 : pg_output_begin(ctx, data, txn, false);
757 : :
758 [ + + ]: 9 : if (transactional)
759 : 5 : txndata->xact_wrote_changes = true;
760 : :
3440 simon@2ndQuadrant.co 761 : 9 : OutputPluginPrepareWrite(ctx, true);
762 : 9 : appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
763 : : transactional, prefix, sz);
764 : 9 : appendBinaryStringInfo(ctx->out, message, sz);
765 : 9 : OutputPluginWrite(ctx, true);
766 : 9 : }
767 : :
768 : : static void
1866 akapila@postgresql.o 769 : 40 : pg_decode_stream_start(LogicalDecodingContext *ctx,
770 : : ReorderBufferTXN *txn)
771 : : {
772 : 40 : TestDecodingData *data = ctx->output_plugin_private;
1754 773 : 40 : TestDecodingTxnData *txndata = txn->output_plugin_private;
774 : :
775 : : /*
776 : : * Allocate the txn plugin data for the first stream in the transaction.
777 : : */
778 [ + + ]: 40 : if (txndata == NULL)
779 : : {
780 : : txndata =
781 : 9 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
782 : 9 : txndata->xact_wrote_changes = false;
783 : 9 : txn->output_plugin_private = txndata;
784 : : }
785 : :
786 : 40 : txndata->stream_wrote_changes = false;
1821 787 [ + - ]: 40 : if (data->skip_empty_xacts)
788 : 40 : return;
1821 akapila@postgresql.o 789 :UBC 0 : pg_output_stream_start(ctx, data, txn, true);
790 : : }
791 : :
792 : : static void
1821 akapila@postgresql.o 793 :CBC 11 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
794 : : {
795 : 11 : OutputPluginPrepareWrite(ctx, last_write);
1866 796 [ - + ]: 11 : if (data->include_xids)
1866 akapila@postgresql.o 797 :UBC 0 : appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
798 : : else
1787 drowley@postgresql.o 799 :CBC 11 : appendStringInfoString(ctx->out, "opening a streamed block for transaction");
1821 akapila@postgresql.o 800 : 11 : OutputPluginWrite(ctx, last_write);
1866 801 : 11 : }
802 : :
803 : : static void
804 : 40 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
805 : : ReorderBufferTXN *txn)
806 : : {
807 : 40 : TestDecodingData *data = ctx->output_plugin_private;
1754 808 : 40 : TestDecodingTxnData *txndata = txn->output_plugin_private;
809 : :
810 [ + - + + ]: 40 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
1821 811 : 29 : return;
812 : :
1866 813 : 11 : OutputPluginPrepareWrite(ctx, true);
814 [ - + ]: 11 : if (data->include_xids)
1866 akapila@postgresql.o 815 :UBC 0 : appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
816 : : else
1787 drowley@postgresql.o 817 :CBC 11 : appendStringInfoString(ctx->out, "closing a streamed block for transaction");
1866 akapila@postgresql.o 818 : 11 : OutputPluginWrite(ctx, true);
819 : : }
820 : :
821 : : static void
822 : 4 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
823 : : ReorderBufferTXN *txn,
824 : : XLogRecPtr abort_lsn)
825 : : {
826 : 4 : TestDecodingData *data = ctx->output_plugin_private;
827 : :
828 : : /*
829 : : * stream abort can be sent for an individual subtransaction but we
830 : : * maintain the output_plugin_private only under the toptxn so if this is
831 : : * not the toptxn then fetch the toptxn.
832 : : */
904 833 [ + - ]: 4 : ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
1754 834 : 4 : TestDecodingTxnData *txndata = toptxn->output_plugin_private;
835 : 4 : bool xact_wrote_changes = txndata->xact_wrote_changes;
836 : :
904 837 [ - + ]: 4 : if (rbtxn_is_toptxn(txn))
838 : : {
1754 akapila@postgresql.o 839 [ # # ]:UBC 0 : Assert(txn->output_plugin_private != NULL);
840 : 0 : pfree(txndata);
841 : 0 : txn->output_plugin_private = NULL;
842 : : }
843 : :
1754 akapila@postgresql.o 844 [ + - - + ]:CBC 4 : if (data->skip_empty_xacts && !xact_wrote_changes)
1821 akapila@postgresql.o 845 :UBC 0 : return;
846 : :
1866 akapila@postgresql.o 847 :CBC 4 : OutputPluginPrepareWrite(ctx, true);
848 [ - + ]: 4 : if (data->include_xids)
1866 akapila@postgresql.o 849 :UBC 0 : appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
850 : : else
1787 drowley@postgresql.o 851 :CBC 4 : appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
1866 akapila@postgresql.o 852 : 4 : OutputPluginWrite(ctx, true);
853 : : }
854 : :
855 : : static void
1711 856 : 1 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
857 : : ReorderBufferTXN *txn,
858 : : XLogRecPtr prepare_lsn)
859 : : {
860 : 1 : TestDecodingData *data = ctx->output_plugin_private;
861 : 1 : TestDecodingTxnData *txndata = txn->output_plugin_private;
862 : :
863 [ + - - + ]: 1 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
1711 akapila@postgresql.o 864 :UBC 0 : return;
865 : :
1711 akapila@postgresql.o 866 :CBC 1 : OutputPluginPrepareWrite(ctx, true);
867 : :
868 [ - + ]: 1 : if (data->include_xids)
1711 akapila@postgresql.o 869 :UBC 0 : appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
870 : 0 : quote_literal_cstr(txn->gid), txn->xid);
871 : : else
1711 akapila@postgresql.o 872 :CBC 1 : appendStringInfo(ctx->out, "preparing streamed transaction %s",
873 : 1 : quote_literal_cstr(txn->gid));
874 : :
875 [ - + ]: 1 : if (data->include_timestamp)
1711 akapila@postgresql.o 876 :UBC 0 : appendStringInfo(ctx->out, " (at %s)",
877 : : timestamptz_to_str(txn->xact_time.prepare_time));
878 : :
1711 akapila@postgresql.o 879 :CBC 1 : OutputPluginWrite(ctx, true);
880 : : }
881 : :
882 : : static void
1866 883 : 5 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
884 : : ReorderBufferTXN *txn,
885 : : XLogRecPtr commit_lsn)
886 : : {
887 : 5 : TestDecodingData *data = ctx->output_plugin_private;
1754 888 : 5 : TestDecodingTxnData *txndata = txn->output_plugin_private;
889 : 5 : bool xact_wrote_changes = txndata->xact_wrote_changes;
890 : :
891 : 5 : pfree(txndata);
892 : 5 : txn->output_plugin_private = NULL;
893 : :
894 [ + - - + ]: 5 : if (data->skip_empty_xacts && !xact_wrote_changes)
1821 akapila@postgresql.o 895 :UBC 0 : return;
896 : :
1866 akapila@postgresql.o 897 :CBC 5 : OutputPluginPrepareWrite(ctx, true);
898 : :
899 [ - + ]: 5 : if (data->include_xids)
1866 akapila@postgresql.o 900 :UBC 0 : appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
901 : : else
1787 drowley@postgresql.o 902 :CBC 5 : appendStringInfoString(ctx->out, "committing streamed transaction");
903 : :
1866 akapila@postgresql.o 904 [ - + ]: 5 : if (data->include_timestamp)
1866 akapila@postgresql.o 905 :UBC 0 : appendStringInfo(ctx->out, " (at %s)",
906 : : timestamptz_to_str(txn->xact_time.commit_time));
907 : :
1866 akapila@postgresql.o 908 :CBC 5 : OutputPluginWrite(ctx, true);
909 : : }
910 : :
911 : : /*
912 : : * In streaming mode, we don't display the changes as the transaction can abort
913 : : * at a later point in time. We don't want users to see the changes until the
914 : : * transaction is committed.
915 : : */
916 : : static void
917 : 65 : pg_decode_stream_change(LogicalDecodingContext *ctx,
918 : : ReorderBufferTXN *txn,
919 : : Relation relation,
920 : : ReorderBufferChange *change)
921 : : {
922 : 65 : TestDecodingData *data = ctx->output_plugin_private;
1754 923 : 65 : TestDecodingTxnData *txndata = txn->output_plugin_private;
924 : :
925 : : /* output stream start if we haven't yet */
926 [ + - + + ]: 65 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
927 : : {
1821 928 : 8 : pg_output_stream_start(ctx, data, txn, false);
929 : : }
1754 930 : 65 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
931 : :
1866 932 : 65 : OutputPluginPrepareWrite(ctx, true);
933 [ - + ]: 65 : if (data->include_xids)
1866 akapila@postgresql.o 934 :UBC 0 : appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
935 : : else
1787 drowley@postgresql.o 936 :CBC 65 : appendStringInfoString(ctx->out, "streaming change for transaction");
1866 akapila@postgresql.o 937 : 65 : OutputPluginWrite(ctx, true);
938 : 65 : }
939 : :
940 : : /*
941 : : * In streaming mode, we don't display the contents for transactional messages
942 : : * as the transaction can abort at a later point in time. We don't want users to
943 : : * see the message contents until the transaction is committed.
944 : : */
945 : : static void
946 : 3 : pg_decode_stream_message(LogicalDecodingContext *ctx,
947 : : ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
948 : : const char *prefix, Size sz, const char *message)
949 : : {
950 : : /* Output stream start if we haven't yet for transactional messages. */
677 951 [ + - ]: 3 : if (transactional)
952 : : {
953 : 3 : TestDecodingData *data = ctx->output_plugin_private;
954 : 3 : TestDecodingTxnData *txndata = txn->output_plugin_private;
955 : :
956 [ + - + - ]: 3 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
957 : : {
958 : 3 : pg_output_stream_start(ctx, data, txn, false);
959 : : }
960 : 3 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
961 : : }
962 : :
1866 963 : 3 : OutputPluginPrepareWrite(ctx, true);
964 : :
965 [ + - ]: 3 : if (transactional)
966 : : {
967 : 3 : appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
968 : : transactional, prefix, sz);
969 : : }
970 : : else
971 : : {
1866 akapila@postgresql.o 972 :UBC 0 : appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
973 : : transactional, prefix, sz);
974 : 0 : appendBinaryStringInfo(ctx->out, message, sz);
975 : : }
976 : :
1866 akapila@postgresql.o 977 :CBC 3 : OutputPluginWrite(ctx, true);
978 : 3 : }
979 : :
980 : : /*
981 : : * In streaming mode, we don't display the detailed information of Truncate.
982 : : * See pg_decode_stream_change.
983 : : */
984 : : static void
1866 akapila@postgresql.o 985 :UBC 0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
986 : : int nrelations, Relation relations[],
987 : : ReorderBufferChange *change)
988 : : {
989 : 0 : TestDecodingData *data = ctx->output_plugin_private;
1754 990 : 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
991 : :
992 [ # # # # ]: 0 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
993 : : {
1821 994 : 0 : pg_output_stream_start(ctx, data, txn, false);
995 : : }
1754 996 : 0 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
997 : :
1866 998 : 0 : OutputPluginPrepareWrite(ctx, true);
999 [ # # ]: 0 : if (data->include_xids)
1000 : 0 : appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
1001 : : else
1787 drowley@postgresql.o 1002 : 0 : appendStringInfoString(ctx->out, "streaming truncate for transaction");
1866 akapila@postgresql.o 1003 : 0 : OutputPluginWrite(ctx, true);
1004 : 0 : }
|