Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pgoutput.c
4 : : * Logical Replication output plugin
5 : : *
6 : : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/pgoutput/pgoutput.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "access/tupconvert.h"
16 : : #include "catalog/partition.h"
17 : : #include "catalog/pg_publication.h"
18 : : #include "catalog/pg_publication_rel.h"
19 : : #include "catalog/pg_subscription.h"
20 : : #include "commands/defrem.h"
21 : : #include "commands/subscriptioncmds.h"
22 : : #include "executor/executor.h"
23 : : #include "fmgr.h"
24 : : #include "nodes/makefuncs.h"
25 : : #include "parser/parse_relation.h"
26 : : #include "replication/logical.h"
27 : : #include "replication/logicalproto.h"
28 : : #include "replication/origin.h"
29 : : #include "replication/pgoutput.h"
30 : : #include "rewrite/rewriteHandler.h"
31 : : #include "utils/builtins.h"
32 : : #include "utils/inval.h"
33 : : #include "utils/lsyscache.h"
34 : : #include "utils/memutils.h"
35 : : #include "utils/rel.h"
36 : : #include "utils/syscache.h"
37 : : #include "utils/varlena.h"
38 : :
216 tgl@sss.pgh.pa.us 39 :CBC 530 : PG_MODULE_MAGIC_EXT(
40 : : .name = "pgoutput",
41 : : .version = PG_VERSION
42 : : );
43 : :
44 : : static void pgoutput_startup(LogicalDecodingContext *ctx,
45 : : OutputPluginOptions *opt, bool is_init);
46 : : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
47 : : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
48 : : ReorderBufferTXN *txn);
49 : : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
50 : : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
51 : : static void pgoutput_change(LogicalDecodingContext *ctx,
52 : : ReorderBufferTXN *txn, Relation relation,
53 : : ReorderBufferChange *change);
54 : : static void pgoutput_truncate(LogicalDecodingContext *ctx,
55 : : ReorderBufferTXN *txn, int nrelations, Relation relations[],
56 : : ReorderBufferChange *change);
57 : : static void pgoutput_message(LogicalDecodingContext *ctx,
58 : : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
59 : : bool transactional, const char *prefix,
60 : : Size sz, const char *message);
61 : : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
62 : : RepOriginId origin_id);
63 : : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
64 : : ReorderBufferTXN *txn);
65 : : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
66 : : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
67 : : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
68 : : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
69 : : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
70 : : ReorderBufferTXN *txn,
71 : : XLogRecPtr prepare_end_lsn,
72 : : TimestampTz prepare_time);
73 : : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
74 : : ReorderBufferTXN *txn);
75 : : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
76 : : ReorderBufferTXN *txn);
77 : : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
78 : : ReorderBufferTXN *txn,
79 : : XLogRecPtr abort_lsn);
80 : : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
81 : : ReorderBufferTXN *txn,
82 : : XLogRecPtr commit_lsn);
83 : : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
84 : : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
85 : :
86 : : static bool publications_valid;
87 : :
88 : : static List *LoadPublications(List *pubnames);
89 : : static void publication_invalidation_cb(Datum arg, int cacheid,
90 : : uint32 hashvalue);
91 : : static void send_repl_origin(LogicalDecodingContext *ctx,
92 : : RepOriginId origin_id, XLogRecPtr origin_lsn,
93 : : bool send_origin);
94 : :
95 : : /*
96 : : * Only 3 publication actions are used for row filtering ("insert", "update",
97 : : * "delete"). See RelationSyncEntry.exprstate[].
98 : : */
99 : : enum RowFilterPubAction
100 : : {
101 : : PUBACTION_INSERT,
102 : : PUBACTION_UPDATE,
103 : : PUBACTION_DELETE,
104 : : };
105 : :
106 : : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
107 : :
108 : : /*
109 : : * Entry in the map used to remember which relation schemas we sent.
110 : : *
111 : : * The schema_sent flag determines if the current schema record for the
112 : : * relation (and for its ancestor if publish_as_relid is set) was already
113 : : * sent to the subscriber (in which case we don't need to send it again).
114 : : *
115 : : * The schema cache on downstream is however updated only at commit time,
116 : : * and with streamed transactions the commit order may be different from
117 : : * the order the transactions are sent in. Also, the (sub) transactions
118 : : * might get aborted so we need to send the schema for each (sub) transaction
119 : : * so that we don't lose the schema information on abort. For handling this,
120 : : * we maintain the list of xids (streamed_txns) for those we have already sent
121 : : * the schema.
122 : : *
123 : : * For partitions, 'pubactions' considers not only the table's own
124 : : * publications, but also those of all of its ancestors.
125 : : */
126 : : typedef struct RelationSyncEntry
127 : : {
128 : : Oid relid; /* relation oid */
129 : :
130 : : bool replicate_valid; /* overall validity flag for entry */
131 : :
132 : : bool schema_sent;
133 : :
134 : : /*
135 : : * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
136 : : * columns and the 'publish_generated_columns' parameter is set to
137 : : * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
138 : : * indicating that no generated columns should be published, unless
139 : : * explicitly specified in the column list.
140 : : */
141 : : PublishGencolsType include_gencols_type;
142 : : List *streamed_txns; /* streamed toplevel transactions with this
143 : : * schema */
144 : :
145 : : /* are we publishing this rel? */
146 : : PublicationActions pubactions;
147 : :
148 : : /*
149 : : * ExprState array for row filter. Different publication actions don't
150 : : * allow multiple expressions to always be combined into one, because
151 : : * updates or deletes restrict the column in expression to be part of the
152 : : * replica identity index whereas inserts do not have this restriction, so
153 : : * there is one ExprState per publication action.
154 : : */
155 : : ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
156 : : EState *estate; /* executor state used for row filter */
157 : : TupleTableSlot *new_slot; /* slot for storing new tuple */
158 : : TupleTableSlot *old_slot; /* slot for storing old tuple */
159 : :
160 : : /*
161 : : * OID of the relation to publish changes as. For a partition, this may
162 : : * be set to one of its ancestors whose schema will be used when
163 : : * replicating changes, if publish_via_partition_root is set for the
164 : : * publication.
165 : : */
166 : : Oid publish_as_relid;
167 : :
168 : : /*
169 : : * Map used when replicating using an ancestor's schema to convert tuples
170 : : * from partition's type to the ancestor's; NULL if publish_as_relid is
171 : : * same as 'relid' or if unnecessary due to partition and the ancestor
172 : : * having identical TupleDesc.
173 : : */
174 : : AttrMap *attrmap;
175 : :
176 : : /*
177 : : * Columns included in the publication, or NULL if all columns are
178 : : * included implicitly. Note that the attnums in this bitmap are not
179 : : * shifted by FirstLowInvalidHeapAttributeNumber.
180 : : */
181 : : Bitmapset *columns;
182 : :
183 : : /*
184 : : * Private context to store additional data for this entry - state for the
185 : : * row filter expressions, column list, etc.
186 : : */
187 : : MemoryContext entry_cxt;
188 : : } RelationSyncEntry;
189 : :
190 : : /*
191 : : * Maintain a per-transaction level variable to track whether the transaction
192 : : * has sent BEGIN. BEGIN is only sent when the first change in a transaction
193 : : * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
194 : : * messages for empty transactions which saves network bandwidth.
195 : : *
196 : : * This optimization is not used for prepared transactions because if the
197 : : * WALSender restarts after prepare of a transaction and before commit prepared
198 : : * of the same transaction then we won't be able to figure out if we have
199 : : * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
200 : : * because we would have lost the in-memory txndata information that was
201 : : * present prior to the restart. This will result in sending a spurious
202 : : * COMMIT PREPARED without a corresponding prepared transaction at the
203 : : * downstream which would lead to an error when it tries to process it.
204 : : *
205 : : * XXX We could achieve this optimization by changing protocol to send
206 : : * additional information so that downstream can detect that the corresponding
207 : : * prepare has not been sent. However, adding such a check for every
208 : : * transaction in the downstream could be costly so we might want to do it
209 : : * optionally.
210 : : *
211 : : * We also don't have this optimization for streamed transactions because
212 : : * they can contain prepared transactions.
213 : : */
214 : : typedef struct PGOutputTxnData
215 : : {
216 : : bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
217 : : } PGOutputTxnData;
218 : :
219 : : /* Map used to remember which relation schemas we sent. */
220 : : static HTAB *RelationSyncCache = NULL;
221 : :
222 : : static void init_rel_sync_cache(MemoryContext cachectx);
223 : : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
224 : : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
225 : : Relation relation);
226 : : static void send_relation_and_attrs(Relation relation, TransactionId xid,
227 : : LogicalDecodingContext *ctx,
228 : : RelationSyncEntry *relentry);
229 : : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
230 : : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
231 : : uint32 hashvalue);
232 : : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
233 : : TransactionId xid);
234 : : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
235 : : TransactionId xid);
236 : : static void init_tuple_slot(PGOutputData *data, Relation relation,
237 : : RelationSyncEntry *entry);
238 : : static void pgoutput_memory_context_reset(void *arg);
239 : :
240 : : /* row filter routines */
241 : : static EState *create_estate_for_relation(Relation rel);
242 : : static void pgoutput_row_filter_init(PGOutputData *data,
243 : : List *publications,
244 : : RelationSyncEntry *entry);
245 : : static bool pgoutput_row_filter_exec_expr(ExprState *state,
246 : : ExprContext *econtext);
247 : : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
248 : : TupleTableSlot **new_slot_ptr,
249 : : RelationSyncEntry *entry,
250 : : ReorderBufferChangeType *action);
251 : :
252 : : /* column list routines */
253 : : static void pgoutput_column_list_init(PGOutputData *data,
254 : : List *publications,
255 : : RelationSyncEntry *entry);
256 : :
257 : : /*
258 : : * Specify output plugin callbacks
259 : : */
260 : : void
3204 peter_e@gmx.net 261 : 720 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
262 : : {
263 : 720 : cb->startup_cb = pgoutput_startup;
264 : 720 : cb->begin_cb = pgoutput_begin_txn;
265 : 720 : cb->change_cb = pgoutput_change;
2761 266 : 720 : cb->truncate_cb = pgoutput_truncate;
1666 akapila@postgresql.o 267 : 720 : cb->message_cb = pgoutput_message;
3204 peter_e@gmx.net 268 : 720 : cb->commit_cb = pgoutput_commit_txn;
269 : :
1567 akapila@postgresql.o 270 : 720 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
271 : 720 : cb->prepare_cb = pgoutput_prepare_txn;
272 : 720 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
273 : 720 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
3204 peter_e@gmx.net 274 : 720 : cb->filter_by_origin_cb = pgoutput_origin_filter;
275 : 720 : cb->shutdown_cb = pgoutput_shutdown;
276 : :
277 : : /* transaction streaming */
1881 akapila@postgresql.o 278 : 720 : cb->stream_start_cb = pgoutput_stream_start;
279 : 720 : cb->stream_stop_cb = pgoutput_stream_stop;
280 : 720 : cb->stream_abort_cb = pgoutput_stream_abort;
281 : 720 : cb->stream_commit_cb = pgoutput_stream_commit;
282 : 720 : cb->stream_change_cb = pgoutput_change;
1666 283 : 720 : cb->stream_message_cb = pgoutput_message;
1881 284 : 720 : cb->stream_truncate_cb = pgoutput_truncate;
285 : : /* transaction streaming - two-phase commit */
1546 286 : 720 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
3204 peter_e@gmx.net 287 : 720 : }
288 : :
289 : : static void
1666 akapila@postgresql.o 290 : 397 : parse_output_parameters(List *options, PGOutputData *data)
291 : : {
292 : : ListCell *lc;
3204 peter_e@gmx.net 293 : 397 : bool protocol_version_given = false;
294 : 397 : bool publication_names_given = false;
1928 tgl@sss.pgh.pa.us 295 : 397 : bool binary_option_given = false;
1666 akapila@postgresql.o 296 : 397 : bool messages_option_given = false;
1881 297 : 397 : bool streaming_given = false;
1567 298 : 397 : bool two_phase_option_given = false;
1195 299 : 397 : bool origin_option_given = false;
300 : :
301 : : /* Initialize optional parameters to defaults */
1666 302 : 397 : data->binary = false;
1023 303 : 397 : data->streaming = LOGICALREP_STREAM_OFF;
1666 304 : 397 : data->messages = false;
1567 305 : 397 : data->two_phase = false;
104 fujii@postgresql.org 306 :GNC 397 : data->publish_no_origin = false;
307 : :
3204 peter_e@gmx.net 308 [ + - + + :CBC 1980 : foreach(lc, options)
+ + ]
309 : : {
310 : 1583 : DefElem *defel = (DefElem *) lfirst(lc);
311 : :
312 [ + - - + ]: 1583 : Assert(defel->arg == NULL || IsA(defel->arg, String));
313 : :
314 : : /* Check each param, whether or not we recognize it */
315 [ + + ]: 1583 : if (strcmp(defel->defname, "proto_version") == 0)
316 : : {
317 : : unsigned long parsed;
318 : : char *endptr;
319 : :
320 [ - + ]: 397 : if (protocol_version_given)
3204 peter_e@gmx.net 321 [ # # ]:UBC 0 : ereport(ERROR,
322 : : (errcode(ERRCODE_SYNTAX_ERROR),
323 : : errmsg("conflicting or redundant options")));
3204 peter_e@gmx.net 324 :CBC 397 : protocol_version_given = true;
325 : :
1352 peter@eisentraut.org 326 : 397 : errno = 0;
327 : 397 : parsed = strtoul(strVal(defel->arg), &endptr, 10);
328 [ + - - + ]: 397 : if (errno != 0 || *endptr != '\0')
3204 peter_e@gmx.net 329 [ # # ]:UBC 0 : ereport(ERROR,
330 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
331 : : errmsg("invalid proto_version")));
332 : :
1352 peter@eisentraut.org 333 [ - + ]:CBC 397 : if (parsed > PG_UINT32_MAX)
3204 peter_e@gmx.net 334 [ # # ]:UBC 0 : ereport(ERROR,
335 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
336 : : errmsg("proto_version \"%s\" out of range",
337 : : strVal(defel->arg))));
338 : :
1666 akapila@postgresql.o 339 :CBC 397 : data->protocol_version = (uint32) parsed;
340 : : }
3204 peter_e@gmx.net 341 [ + + ]: 1186 : else if (strcmp(defel->defname, "publication_names") == 0)
342 : : {
343 [ - + ]: 397 : if (publication_names_given)
3204 peter_e@gmx.net 344 [ # # ]:UBC 0 : ereport(ERROR,
345 : : (errcode(ERRCODE_SYNTAX_ERROR),
346 : : errmsg("conflicting or redundant options")));
3204 peter_e@gmx.net 347 :CBC 397 : publication_names_given = true;
348 : :
349 [ - + ]: 397 : if (!SplitIdentifierString(strVal(defel->arg), ',',
350 : : &data->publication_names))
3086 bruce@momjian.us 351 [ # # ]:UBC 0 : ereport(ERROR,
352 : : (errcode(ERRCODE_INVALID_NAME),
353 : : errmsg("invalid publication_names syntax")));
354 : : }
1928 tgl@sss.pgh.pa.us 355 [ + + ]:CBC 789 : else if (strcmp(defel->defname, "binary") == 0)
356 : : {
357 [ - + ]: 11 : if (binary_option_given)
1928 tgl@sss.pgh.pa.us 358 [ # # ]:UBC 0 : ereport(ERROR,
359 : : (errcode(ERRCODE_SYNTAX_ERROR),
360 : : errmsg("conflicting or redundant options")));
1928 tgl@sss.pgh.pa.us 361 :CBC 11 : binary_option_given = true;
362 : :
1666 akapila@postgresql.o 363 : 11 : data->binary = defGetBoolean(defel);
364 : : }
365 [ + + ]: 778 : else if (strcmp(defel->defname, "messages") == 0)
366 : : {
367 [ - + ]: 4 : if (messages_option_given)
1666 akapila@postgresql.o 368 [ # # ]:UBC 0 : ereport(ERROR,
369 : : (errcode(ERRCODE_SYNTAX_ERROR),
370 : : errmsg("conflicting or redundant options")));
1666 akapila@postgresql.o 371 :CBC 4 : messages_option_given = true;
372 : :
373 : 4 : data->messages = defGetBoolean(defel);
374 : : }
1881 375 [ + + ]: 774 : else if (strcmp(defel->defname, "streaming") == 0)
376 : : {
377 [ - + ]: 379 : if (streaming_given)
1881 akapila@postgresql.o 378 [ # # ]:UBC 0 : ereport(ERROR,
379 : : (errcode(ERRCODE_SYNTAX_ERROR),
380 : : errmsg("conflicting or redundant options")));
1881 akapila@postgresql.o 381 :CBC 379 : streaming_given = true;
382 : :
1023 383 : 379 : data->streaming = defGetStreamingMode(defel);
384 : : }
1567 385 [ + + ]: 395 : else if (strcmp(defel->defname, "two_phase") == 0)
386 : : {
387 [ - + ]: 7 : if (two_phase_option_given)
1567 akapila@postgresql.o 388 [ # # ]:UBC 0 : ereport(ERROR,
389 : : (errcode(ERRCODE_SYNTAX_ERROR),
390 : : errmsg("conflicting or redundant options")));
1567 akapila@postgresql.o 391 :CBC 7 : two_phase_option_given = true;
392 : :
393 : 7 : data->two_phase = defGetBoolean(defel);
394 : : }
1195 395 [ + - ]: 388 : else if (strcmp(defel->defname, "origin") == 0)
396 : : {
397 : : char *origin;
398 : :
399 [ - + ]: 388 : if (origin_option_given)
1195 akapila@postgresql.o 400 [ # # ]:UBC 0 : ereport(ERROR,
401 : : errcode(ERRCODE_SYNTAX_ERROR),
402 : : errmsg("conflicting or redundant options"));
1195 akapila@postgresql.o 403 :CBC 388 : origin_option_given = true;
404 : :
762 405 : 388 : origin = defGetString(defel);
406 [ + + ]: 388 : if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
407 : 24 : data->publish_no_origin = true;
408 [ + - ]: 364 : else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
409 : 364 : data->publish_no_origin = false;
410 : : else
1195 akapila@postgresql.o 411 [ # # ]:UBC 0 : ereport(ERROR,
412 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
413 : : errmsg("unrecognized origin value: \"%s\"", origin));
414 : : }
415 : : else
3204 peter_e@gmx.net 416 [ # # ]: 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
417 : : }
418 : :
419 : : /* Check required options */
679 akapila@postgresql.o 420 [ - + ]:CBC 397 : if (!protocol_version_given)
679 akapila@postgresql.o 421 [ # # ]:UBC 0 : ereport(ERROR,
422 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
423 : : errmsg("option \"%s\" missing", "proto_version"));
679 akapila@postgresql.o 424 [ - + ]:CBC 397 : if (!publication_names_given)
679 akapila@postgresql.o 425 [ # # ]:UBC 0 : ereport(ERROR,
426 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
427 : : errmsg("option \"%s\" missing", "publication_names"));
3204 peter_e@gmx.net 428 :CBC 397 : }
429 : :
430 : : /*
431 : : * Memory context reset callback of PGOutputData->context.
432 : : */
433 : : static void
19 msawada@postgresql.o 434 : 1034 : pgoutput_memory_context_reset(void *arg)
435 : : {
436 [ + + ]: 1034 : if (RelationSyncCache)
437 : : {
438 : 194 : hash_destroy(RelationSyncCache);
439 : 194 : RelationSyncCache = NULL;
440 : : }
441 : 1034 : }
442 : :
443 : : /*
444 : : * Initialize this plugin
445 : : */
446 : : static void
3086 bruce@momjian.us 447 : 720 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
448 : : bool is_init)
449 : : {
450 : 720 : PGOutputData *data = palloc0(sizeof(PGOutputData));
451 : : static bool publication_callback_registered = false;
452 : : MemoryContextCallback *mcallback;
453 : :
454 : : /* Create our memory context for private allocations. */
3204 peter_e@gmx.net 455 : 720 : data->context = AllocSetContextCreate(ctx->context,
456 : : "logical replication output context",
457 : : ALLOCSET_DEFAULT_SIZES);
458 : :
1344 akapila@postgresql.o 459 : 720 : data->cachectx = AllocSetContextCreate(ctx->context,
460 : : "logical replication cache context",
461 : : ALLOCSET_DEFAULT_SIZES);
462 : :
323 michael@paquier.xyz 463 : 720 : data->pubctx = AllocSetContextCreate(ctx->context,
464 : : "logical replication publication list context",
465 : : ALLOCSET_SMALL_SIZES);
466 : :
467 : : /*
468 : : * Ensure to cleanup RelationSyncCache even when logical decoding invoked
469 : : * via SQL interface ends up with an error.
470 : : */
19 msawada@postgresql.o 471 : 720 : mcallback = palloc0(sizeof(MemoryContextCallback));
472 : 720 : mcallback->func = pgoutput_memory_context_reset;
473 : 720 : MemoryContextRegisterResetCallback(ctx->context, mcallback);
474 : :
3204 peter_e@gmx.net 475 : 720 : ctx->output_plugin_private = data;
476 : :
477 : : /* This plugin uses binary protocol. */
478 : 720 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
479 : :
480 : : /*
481 : : * This is replication start and not slot initialization.
482 : : *
483 : : * Parse and validate options passed by the client.
484 : : */
485 [ + + ]: 720 : if (!is_init)
486 : : {
487 : : /* Parse the params and ERROR if we see any we don't recognize */
1666 akapila@postgresql.o 488 : 397 : parse_output_parameters(ctx->output_plugin_options, data);
489 : :
490 : : /* Check if we support requested protocol */
1858 491 [ - + ]: 397 : if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
3204 peter_e@gmx.net 492 [ # # ]:UBC 0 : ereport(ERROR,
493 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
494 : : errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
495 : : data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
496 : :
3204 peter_e@gmx.net 497 [ - + ]:CBC 397 : if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
3204 peter_e@gmx.net 498 [ # # ]:UBC 0 : ereport(ERROR,
499 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
500 : : errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
501 : : data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
502 : :
503 : : /*
504 : : * Decide whether to enable streaming. It is disabled by default, in
505 : : * which case we just update the flag in decoding context. Otherwise
506 : : * we only allow it with sufficient version of the protocol, and when
507 : : * the output plugin supports it.
508 : : */
1023 akapila@postgresql.o 509 [ + + ]:CBC 397 : if (data->streaming == LOGICALREP_STREAM_OFF)
1881 510 : 18 : ctx->streaming = false;
1023 511 [ + + ]: 379 : else if (data->streaming == LOGICALREP_STREAM_ON &&
512 [ - + ]: 25 : data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
1881 akapila@postgresql.o 513 [ # # ]:UBC 0 : ereport(ERROR,
514 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
515 : : errmsg("requested proto_version=%d does not support streaming, need %d or higher",
516 : : data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
1023 akapila@postgresql.o 517 [ + + ]:CBC 379 : else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
518 [ - + ]: 354 : data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
1023 akapila@postgresql.o 519 [ # # ]:UBC 0 : ereport(ERROR,
520 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
521 : : errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
522 : : data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
1881 akapila@postgresql.o 523 [ - + ]:CBC 379 : else if (!ctx->streaming)
1881 akapila@postgresql.o 524 [ # # ]:UBC 0 : ereport(ERROR,
525 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
526 : : errmsg("streaming requested, but not supported by output plugin")));
527 : :
528 : : /*
529 : : * Here, we just check whether the two-phase option is passed by
530 : : * plugin and decide whether to enable it at later point of time. It
531 : : * remains enabled if the previous start-up has done so. But we only
532 : : * allow the option to be passed in with sufficient version of the
533 : : * protocol, and when the output plugin supports it.
534 : : */
1567 akapila@postgresql.o 535 [ + + ]:CBC 397 : if (!data->two_phase)
536 : 390 : ctx->twophase_opt_given = false;
537 [ - + ]: 7 : else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
1567 akapila@postgresql.o 538 [ # # ]:UBC 0 : ereport(ERROR,
539 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
540 : : errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
541 : : data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
1567 akapila@postgresql.o 542 [ - + ]:CBC 7 : else if (!ctx->twophase)
1567 akapila@postgresql.o 543 [ # # ]:UBC 0 : ereport(ERROR,
544 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
545 : : errmsg("two-phase commit requested, but not supported by output plugin")));
546 : : else
1567 akapila@postgresql.o 547 :CBC 7 : ctx->twophase_opt_given = true;
548 : :
549 : : /* Init publication state. */
3204 peter_e@gmx.net 550 : 397 : data->publications = NIL;
551 : 397 : publications_valid = false;
552 : :
553 : : /*
554 : : * Register callback for pg_publication if we didn't already do that
555 : : * during some previous call in this process.
556 : : */
978 tgl@sss.pgh.pa.us 557 [ + + ]: 397 : if (!publication_callback_registered)
558 : : {
559 : 395 : CacheRegisterSyscacheCallback(PUBLICATIONOID,
560 : : publication_invalidation_cb,
561 : : (Datum) 0);
229 akapila@postgresql.o 562 : 395 : CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
563 : : (Datum) 0);
978 tgl@sss.pgh.pa.us 564 : 395 : publication_callback_registered = true;
565 : : }
566 : :
567 : : /* Initialize relation schema cache. */
3204 peter_e@gmx.net 568 : 397 : init_rel_sync_cache(CacheMemoryContext);
569 : : }
570 : : else
571 : : {
572 : : /*
573 : : * Disable the streaming and prepared transactions during the slot
574 : : * initialization mode.
575 : : */
1881 akapila@postgresql.o 576 : 323 : ctx->streaming = false;
1567 577 : 323 : ctx->twophase = false;
578 : : }
3204 peter_e@gmx.net 579 : 720 : }
580 : :
581 : : /*
582 : : * BEGIN callback.
583 : : *
584 : : * Don't send the BEGIN message here instead postpone it until the first
585 : : * change. In logical replication, a common scenario is to replicate a set of
586 : : * tables (instead of all tables) and transactions whose changes were on
587 : : * the table(s) that are not published will produce empty transactions. These
588 : : * empty transactions will send BEGIN and COMMIT messages to subscribers,
589 : : * using bandwidth on something with little/no use for logical replication.
590 : : */
591 : : static void
1265 tgl@sss.pgh.pa.us 592 : 1002 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
593 : : {
594 : 1002 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
595 : : sizeof(PGOutputTxnData));
596 : :
1308 akapila@postgresql.o 597 : 1002 : txn->output_plugin_private = txndata;
598 : 1002 : }
599 : :
600 : : /*
601 : : * Send BEGIN.
602 : : *
603 : : * This is called while processing the first change of the transaction.
604 : : */
605 : : static void
606 : 460 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
607 : : {
3086 bruce@momjian.us 608 : 460 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1308 akapila@postgresql.o 609 : 460 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
610 : :
611 [ - + ]: 460 : Assert(txndata);
612 [ - + ]: 460 : Assert(!txndata->sent_begin_txn);
613 : :
3204 peter_e@gmx.net 614 : 460 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
615 : 460 : logicalrep_write_begin(ctx->out, txn);
1308 akapila@postgresql.o 616 : 460 : txndata->sent_begin_txn = true;
617 : :
1567 618 : 460 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
619 : : send_replication_origin);
620 : :
3204 peter_e@gmx.net 621 : 460 : OutputPluginWrite(ctx, true);
622 : 459 : }
623 : :
624 : : /*
625 : : * COMMIT callback
626 : : */
627 : : static void
628 : 999 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
629 : : XLogRecPtr commit_lsn)
630 : : {
1308 akapila@postgresql.o 631 : 999 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
632 : : bool sent_begin_txn;
633 : :
634 [ - + ]: 999 : Assert(txndata);
635 : :
636 : : /*
637 : : * We don't need to send the commit message unless some relevant change
638 : : * from this transaction has been sent to the downstream.
639 : : */
640 : 999 : sent_begin_txn = txndata->sent_begin_txn;
993 641 : 999 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
1308 642 : 999 : pfree(txndata);
643 : 999 : txn->output_plugin_private = NULL;
644 : :
645 [ + + ]: 999 : if (!sent_begin_txn)
646 : : {
647 [ + + ]: 541 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
648 : 541 : return;
649 : : }
650 : :
3204 peter_e@gmx.net 651 : 458 : OutputPluginPrepareWrite(ctx, true);
652 : 458 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
653 : 458 : OutputPluginWrite(ctx, true);
654 : : }
655 : :
656 : : /*
657 : : * BEGIN PREPARE callback
658 : : */
659 : : static void
1567 akapila@postgresql.o 660 : 17 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
661 : : {
662 : 17 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
663 : :
664 : 17 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
665 : 17 : logicalrep_write_begin_prepare(ctx->out, txn);
666 : :
667 : 17 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
668 : : send_replication_origin);
669 : :
670 : 17 : OutputPluginWrite(ctx, true);
671 : 17 : }
672 : :
673 : : /*
674 : : * PREPARE callback
675 : : */
676 : : static void
677 : 17 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
678 : : XLogRecPtr prepare_lsn)
679 : : {
993 680 : 17 : OutputPluginUpdateProgress(ctx, false);
681 : :
1567 682 : 17 : OutputPluginPrepareWrite(ctx, true);
683 : 17 : logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
684 : 17 : OutputPluginWrite(ctx, true);
685 : 17 : }
686 : :
687 : : /*
688 : : * COMMIT PREPARED callback
689 : : */
690 : : static void
691 : 21 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
692 : : XLogRecPtr commit_lsn)
693 : : {
993 694 : 21 : OutputPluginUpdateProgress(ctx, false);
695 : :
1567 696 : 21 : OutputPluginPrepareWrite(ctx, true);
697 : 21 : logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
698 : 21 : OutputPluginWrite(ctx, true);
699 : 21 : }
700 : :
701 : : /*
702 : : * ROLLBACK PREPARED callback
703 : : */
704 : : static void
705 : 5 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
706 : : ReorderBufferTXN *txn,
707 : : XLogRecPtr prepare_end_lsn,
708 : : TimestampTz prepare_time)
709 : : {
993 710 : 5 : OutputPluginUpdateProgress(ctx, false);
711 : :
1567 712 : 5 : OutputPluginPrepareWrite(ctx, true);
713 : 5 : logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
714 : : prepare_time);
715 : 5 : OutputPluginWrite(ctx, true);
716 : 5 : }
717 : :
718 : : /*
719 : : * Write the current schema of the relation and its ancestor (if any) if not
720 : : * done yet.
721 : : */
722 : : static void
2761 peter_e@gmx.net 723 : 182223 : maybe_send_schema(LogicalDecodingContext *ctx,
724 : : ReorderBufferChange *change,
725 : : Relation relation, RelationSyncEntry *relentry)
726 : : {
761 michael@paquier.xyz 727 : 182223 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
728 : : bool schema_sent;
1881 akapila@postgresql.o 729 : 182223 : TransactionId xid = InvalidTransactionId;
730 : 182223 : TransactionId topxid = InvalidTransactionId;
731 : :
732 : : /*
733 : : * Remember XID of the (sub)transaction for the change. We don't care if
734 : : * it's top-level transaction or not (we have already sent that XID in
735 : : * start of the current streaming block).
736 : : *
737 : : * If we're not in a streaming block, just use InvalidTransactionId and
738 : : * the write methods will not include it.
739 : : */
761 michael@paquier.xyz 740 [ + + ]: 182223 : if (data->in_streaming)
1881 akapila@postgresql.o 741 : 175901 : xid = change->txn->xid;
742 : :
956 743 [ + + ]: 182223 : if (rbtxn_is_subtxn(change->txn))
744 [ + - ]: 10169 : topxid = rbtxn_get_toptxn(change->txn)->xid;
745 : : else
1881 746 : 172054 : topxid = xid;
747 : :
748 : : /*
749 : : * Do we need to send the schema? We do track streamed transactions
750 : : * separately, because those may be applied later (and the regular
751 : : * transactions won't see their effects until then) and in an order that
752 : : * we don't know at this point.
753 : : *
754 : : * XXX There is a scope of optimization here. Currently, we always send
755 : : * the schema first time in a streaming transaction but we can probably
756 : : * avoid that by checking 'relentry->schema_sent' flag. However, before
757 : : * doing that we need to study its impact on the case where we have a mix
758 : : * of streaming and non-streaming transactions.
759 : : */
761 michael@paquier.xyz 760 [ + + ]: 182223 : if (data->in_streaming)
1881 akapila@postgresql.o 761 : 175901 : schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
762 : : else
763 : 6322 : schema_sent = relentry->schema_sent;
764 : :
765 : : /* Nothing to do if we already sent the schema. */
766 [ + + ]: 182223 : if (schema_sent)
2029 peter@eisentraut.org 767 : 181882 : return;
768 : :
769 : : /*
770 : : * Send the schema. If the changes will be published using an ancestor's
771 : : * schema, not the relation's own, send that ancestor's schema before
772 : : * sending relation's own (XXX - maybe sending only the former suffices?).
773 : : */
774 [ + + ]: 341 : if (relentry->publish_as_relid != RelationGetRelid(relation))
775 : : {
776 : 32 : Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
777 : :
355 akapila@postgresql.o 778 : 32 : send_relation_and_attrs(ancestor, xid, ctx, relentry);
2029 peter@eisentraut.org 779 : 31 : RelationClose(ancestor);
780 : : }
781 : :
355 akapila@postgresql.o 782 : 340 : send_relation_and_attrs(relation, xid, ctx, relentry);
783 : :
761 michael@paquier.xyz 784 [ + + ]: 340 : if (data->in_streaming)
1881 akapila@postgresql.o 785 : 63 : set_schema_sent_in_streamed_txn(relentry, topxid);
786 : : else
787 : 277 : relentry->schema_sent = true;
788 : : }
789 : :
790 : : /*
791 : : * Sends a relation
792 : : */
793 : : static void
794 : 372 : send_relation_and_attrs(Relation relation, TransactionId xid,
795 : : LogicalDecodingContext *ctx,
796 : : RelationSyncEntry *relentry)
797 : : {
2029 peter@eisentraut.org 798 : 372 : TupleDesc desc = RelationGetDescr(relation);
355 akapila@postgresql.o 799 : 372 : Bitmapset *columns = relentry->columns;
278 800 : 372 : PublishGencolsType include_gencols_type = relentry->include_gencols_type;
801 : : int i;
802 : :
803 : : /*
804 : : * Write out type info if needed. We do that only for user-created types.
805 : : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
806 : : * objects with hand-assigned OIDs to be "built in", not for instance any
807 : : * function or type defined in the information_schema. This is important
808 : : * because only hand-assigned OIDs can be expected to remain stable across
809 : : * major versions.
810 : : */
2029 peter@eisentraut.org 811 [ + + ]: 1164 : for (i = 0; i < desc->natts; i++)
812 : : {
813 : 792 : Form_pg_attribute att = TupleDescAttr(desc, i);
814 : :
278 akapila@postgresql.o 815 [ + + ]: 792 : if (!logicalrep_should_publish_column(att, columns,
816 : : include_gencols_type))
2029 peter@eisentraut.org 817 : 71 : continue;
818 : :
819 [ + + ]: 721 : if (att->atttypid < FirstGenbkiObjectId)
820 : 703 : continue;
821 : :
2761 peter_e@gmx.net 822 : 18 : OutputPluginPrepareWrite(ctx, false);
1881 akapila@postgresql.o 823 : 18 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
2761 peter_e@gmx.net 824 : 18 : OutputPluginWrite(ctx, false);
825 : : }
826 : :
2029 peter@eisentraut.org 827 : 372 : OutputPluginPrepareWrite(ctx, false);
278 akapila@postgresql.o 828 : 372 : logicalrep_write_rel(ctx->out, xid, relation, columns,
829 : : include_gencols_type);
2029 peter@eisentraut.org 830 : 372 : OutputPluginWrite(ctx, false);
2761 peter_e@gmx.net 831 : 371 : }
832 : :
833 : : /*
834 : : * Executor state preparation for evaluation of row filter expressions for the
835 : : * specified relation.
836 : : */
837 : : static EState *
1344 akapila@postgresql.o 838 : 17 : create_estate_for_relation(Relation rel)
839 : : {
840 : : EState *estate;
841 : : RangeTblEntry *rte;
967 tgl@sss.pgh.pa.us 842 : 17 : List *perminfos = NIL;
843 : :
1344 akapila@postgresql.o 844 : 17 : estate = CreateExecutorState();
845 : :
846 : 17 : rte = makeNode(RangeTblEntry);
847 : 17 : rte->rtekind = RTE_RELATION;
848 : 17 : rte->relid = RelationGetRelid(rel);
849 : 17 : rte->relkind = rel->rd_rel->relkind;
850 : 17 : rte->rellockmode = AccessShareLock;
851 : :
967 tgl@sss.pgh.pa.us 852 : 17 : addRTEPermissionInfo(&perminfos, rte);
853 : :
263 amitlan@postgresql.o 854 : 17 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
855 : : bms_make_singleton(1));
856 : :
1344 akapila@postgresql.o 857 : 17 : estate->es_output_cid = GetCurrentCommandId(false);
858 : :
859 : 17 : return estate;
860 : : }
861 : :
862 : : /*
863 : : * Evaluates row filter.
864 : : *
865 : : * If the row filter evaluates to NULL, it is taken as false i.e. the change
866 : : * isn't replicated.
867 : : */
868 : : static bool
869 : 38 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
870 : : {
871 : : Datum ret;
872 : : bool isnull;
873 : :
874 [ - + ]: 38 : Assert(state != NULL);
875 : :
876 : 38 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
877 : :
878 [ - + - - : 38 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
- - - - ]
879 : : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
880 : : isnull ? "true" : "false");
881 : :
882 [ + + ]: 38 : if (isnull)
883 : 1 : return false;
884 : :
885 : 37 : return DatumGetBool(ret);
886 : : }
887 : :
888 : : /*
889 : : * Make sure the per-entry memory context exists.
890 : : */
891 : : static void
1312 tomas.vondra@postgre 892 : 327 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
893 : : {
894 : : Relation relation;
895 : :
896 : : /* The context may already exist, in which case bail out. */
897 [ + + ]: 327 : if (entry->entry_cxt)
898 : 17 : return;
899 : :
900 : 310 : relation = RelationIdGetRelation(entry->publish_as_relid);
901 : :
902 : 310 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
903 : : "entry private context",
904 : : ALLOCSET_SMALL_SIZES);
905 : :
906 : 310 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
907 : : RelationGetRelationName(relation));
908 : : }
909 : :
910 : : /*
911 : : * Initialize the row filter.
912 : : */
913 : : static void
1344 akapila@postgresql.o 914 : 310 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
915 : : RelationSyncEntry *entry)
916 : : {
917 : : ListCell *lc;
918 : 310 : List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
919 : 310 : bool no_filter[] = {false, false, false}; /* One per pubaction */
920 : : MemoryContext oldctx;
921 : : int idx;
922 : 310 : bool has_filter = true;
1131 923 : 310 : Oid schemaid = get_rel_namespace(entry->publish_as_relid);
924 : :
925 : : /*
926 : : * Find if there are any row filters for this relation. If there are, then
927 : : * prepare the necessary ExprState and cache it in entry->exprstate. To
928 : : * build an expression state, we need to ensure the following:
929 : : *
930 : : * All the given publication-table mappings must be checked.
931 : : *
932 : : * Multiple publications might have multiple row filters for this
933 : : * relation. Since row filter usage depends on the DML operation, there
934 : : * are multiple lists (one for each operation) to which row filters will
935 : : * be appended.
936 : : *
937 : : * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
938 : : * expression" so it takes precedence.
939 : : */
1344 940 [ + - + + : 331 : foreach(lc, publications)
+ + ]
941 : : {
942 : 314 : Publication *pub = lfirst(lc);
943 : 314 : HeapTuple rftuple = NULL;
944 : 314 : Datum rfdatum = 0;
1131 945 : 314 : bool pub_no_filter = true;
946 : :
947 : : /*
948 : : * If the publication is FOR ALL TABLES, or the publication includes a
949 : : * FOR TABLES IN SCHEMA where the table belongs to the referred
950 : : * schema, then it is treated the same as if there are no row filters
951 : : * (even if other publications have a row filter).
952 : : */
953 [ + + ]: 314 : if (!pub->alltables &&
954 [ + + ]: 236 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
955 : : ObjectIdGetDatum(schemaid),
956 : : ObjectIdGetDatum(pub->oid)))
957 : : {
958 : : /*
959 : : * Check for the presence of a row filter in this publication.
960 : : */
1344 961 : 229 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
962 : : ObjectIdGetDatum(entry->publish_as_relid),
963 : : ObjectIdGetDatum(pub->oid));
964 : :
965 [ + + ]: 229 : if (HeapTupleIsValid(rftuple))
966 : : {
967 : : /* Null indicates no filter. */
968 : 217 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
969 : : Anum_pg_publication_rel_prqual,
970 : : &pub_no_filter);
971 : : }
972 : : }
973 : :
974 [ + + ]: 314 : if (pub_no_filter)
975 : : {
976 [ + + ]: 300 : if (rftuple)
977 : 203 : ReleaseSysCache(rftuple);
978 : :
979 : 300 : no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
980 : 300 : no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
981 : 300 : no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
982 : :
983 : : /*
984 : : * Quick exit if all the DML actions are publicized via this
985 : : * publication.
986 : : */
987 [ + - ]: 300 : if (no_filter[PUBACTION_INSERT] &&
988 [ + + ]: 300 : no_filter[PUBACTION_UPDATE] &&
989 [ + - ]: 293 : no_filter[PUBACTION_DELETE])
990 : : {
991 : 293 : has_filter = false;
992 : 293 : break;
993 : : }
994 : :
995 : : /* No additional work for this publication. Next one. */
996 : 7 : continue;
997 : : }
998 : :
999 : : /* Form the per pubaction row filter lists. */
1000 [ + - + - ]: 14 : if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
1001 : 14 : rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
1002 : 14 : TextDatumGetCString(rfdatum));
1003 [ + - + - ]: 14 : if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
1004 : 14 : rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
1005 : 14 : TextDatumGetCString(rfdatum));
1006 [ + - + - ]: 14 : if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
1007 : 14 : rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
1008 : 14 : TextDatumGetCString(rfdatum));
1009 : :
1010 : 14 : ReleaseSysCache(rftuple);
1011 : : } /* loop all subscribed publications */
1012 : :
1013 : : /* Clean the row filter */
1014 [ + + ]: 1240 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1015 : : {
1016 [ + + ]: 930 : if (no_filter[idx])
1017 : : {
1018 : 888 : list_free_deep(rfnodes[idx]);
1019 : 888 : rfnodes[idx] = NIL;
1020 : : }
1021 : : }
1022 : :
1023 [ + + ]: 310 : if (has_filter)
1024 : : {
1025 : 17 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1026 : :
1312 tomas.vondra@postgre 1027 : 17 : pgoutput_ensure_entry_cxt(data, entry);
1028 : :
1029 : : /*
1030 : : * Now all the filters for all pubactions are known. Combine them when
1031 : : * their pubactions are the same.
1032 : : */
1033 : 17 : oldctx = MemoryContextSwitchTo(entry->entry_cxt);
1344 akapila@postgresql.o 1034 : 17 : entry->estate = create_estate_for_relation(relation);
1035 [ + + ]: 68 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1036 : : {
1037 : 51 : List *filters = NIL;
1038 : : Expr *rfnode;
1039 : :
1040 [ + + ]: 51 : if (rfnodes[idx] == NIL)
1041 : 21 : continue;
1042 : :
1043 [ + - + + : 63 : foreach(lc, rfnodes[idx])
+ + ]
263 peter@eisentraut.org 1044 : 33 : filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
1045 : :
1046 : : /* combine the row filter and cache the ExprState */
1344 akapila@postgresql.o 1047 : 30 : rfnode = make_orclause(filters);
1048 : 30 : entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1049 : : } /* for each pubaction */
1050 : 17 : MemoryContextSwitchTo(oldctx);
1051 : :
1052 : 17 : RelationClose(relation);
1053 : : }
1054 : 310 : }
1055 : :
1056 : : /*
1057 : : * If the table contains a generated column, check for any conflicting
1058 : : * values of 'publish_generated_columns' parameter in the publications.
1059 : : */
1060 : : static void
355 1061 : 310 : check_and_init_gencol(PGOutputData *data, List *publications,
1062 : : RelationSyncEntry *entry)
1063 : : {
1064 : 310 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1065 : 310 : TupleDesc desc = RelationGetDescr(relation);
1066 : 310 : bool gencolpresent = false;
1067 : 310 : bool first = true;
1068 : :
1069 : : /* Check if there is any generated column present. */
1070 [ + + ]: 959 : for (int i = 0; i < desc->natts; i++)
1071 : : {
6 drowley@postgresql.o 1072 :GNC 656 : CompactAttribute *att = TupleDescCompactAttr(desc, i);
1073 : :
355 akapila@postgresql.o 1074 [ + + ]:CBC 656 : if (att->attgenerated)
1075 : : {
1076 : 7 : gencolpresent = true;
1077 : 7 : break;
1078 : : }
1079 : : }
1080 : :
1081 : : /* There are no generated columns to be published. */
1082 [ + + ]: 310 : if (!gencolpresent)
1083 : : {
278 1084 : 303 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
355 1085 : 303 : return;
1086 : : }
1087 : :
1088 : : /*
1089 : : * There may be a conflicting value for 'publish_generated_columns'
1090 : : * parameter in the publications.
1091 : : */
1092 [ + - + + : 22 : foreach_ptr(Publication, pub, publications)
+ + ]
1093 : : {
1094 : : /*
1095 : : * The column list takes precedence over the
1096 : : * 'publish_generated_columns' parameter. Those will be checked later,
1097 : : * see pgoutput_column_list_init.
1098 : : */
1099 [ + + ]: 8 : if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1100 : 3 : continue;
1101 : :
1102 [ + - ]: 5 : if (first)
1103 : : {
278 1104 : 5 : entry->include_gencols_type = pub->pubgencols_type;
355 1105 : 5 : first = false;
1106 : : }
278 akapila@postgresql.o 1107 [ # # ]:UBC 0 : else if (entry->include_gencols_type != pub->pubgencols_type)
355 1108 [ # # ]: 0 : ereport(ERROR,
1109 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1110 : : errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1111 : : get_namespace_name(RelationGetNamespace(relation)),
1112 : : RelationGetRelationName(relation)));
1113 : : }
1114 : : }
1115 : :
1116 : : /*
1117 : : * Initialize the column list.
1118 : : */
1119 : : static void
1312 tomas.vondra@postgre 1120 :CBC 310 : pgoutput_column_list_init(PGOutputData *data, List *publications,
1121 : : RelationSyncEntry *entry)
1122 : : {
1123 : : ListCell *lc;
1244 akapila@postgresql.o 1124 : 310 : bool first = true;
1125 : 310 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
355 1126 : 310 : bool found_pub_collist = false;
1127 : 310 : Bitmapset *relcols = NULL;
1128 : :
1129 : 310 : pgoutput_ensure_entry_cxt(data, entry);
1130 : :
1131 : : /*
1132 : : * Find if there are any column lists for this relation. If there are,
1133 : : * build a bitmap using the column lists.
1134 : : *
1135 : : * Multiple publications might have multiple column lists for this
1136 : : * relation.
1137 : : *
1138 : : * Note that we don't support the case where the column list is different
1139 : : * for the same table when combining publications. See comments atop
1140 : : * fetch_relation_list. But one can later change the publication so we
1141 : : * still need to check all the given publication-table mappings and report
1142 : : * an error if any publications have a different column list.
1143 : : */
1312 tomas.vondra@postgre 1144 [ + - + + : 630 : foreach(lc, publications)
+ + ]
1145 : : {
1146 : 321 : Publication *pub = lfirst(lc);
1244 akapila@postgresql.o 1147 : 321 : Bitmapset *cols = NULL;
1148 : :
1149 : : /* Retrieve the bitmap of columns for a column list publication. */
355 1150 : 321 : found_pub_collist |= check_and_fetch_column_list(pub,
1151 : : entry->publish_as_relid,
1152 : : entry->entry_cxt, &cols);
1153 : :
1154 : : /*
1155 : : * For non-column list publications — e.g. TABLE (without a column
1156 : : * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1157 : : * of the table (including generated columns when
1158 : : * 'publish_generated_columns' parameter is true).
1159 : : */
1160 [ + + ]: 321 : if (!cols)
1161 : : {
1162 : : /*
1163 : : * Cache the table columns for the first publication with no
1164 : : * specified column list to detect publication with a different
1165 : : * column list.
1166 : : */
1167 [ + + + + ]: 282 : if (!relcols && (list_length(publications) > 1))
1168 : : {
1169 : 9 : MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
1170 : :
278 1171 : 9 : relcols = pub_form_cols_map(relation,
1172 : : entry->include_gencols_type);
355 1173 : 9 : MemoryContextSwitchTo(oldcxt);
1174 : : }
1175 : :
1176 : 282 : cols = relcols;
1177 : : }
1178 : :
1244 1179 [ + + ]: 321 : if (first)
1180 : : {
1181 : 310 : entry->columns = cols;
1182 : 310 : first = false;
1183 : : }
1184 [ + + ]: 11 : else if (!bms_equal(entry->columns, cols))
1185 [ + - ]: 1 : ereport(ERROR,
1186 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1187 : : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1188 : : get_namespace_name(RelationGetNamespace(relation)),
1189 : : RelationGetRelationName(relation)));
1190 : : } /* loop all subscribed publications */
1191 : :
1192 : : /*
1193 : : * If no column list publications exist, columns to be published will be
1194 : : * computed later according to the 'publish_generated_columns' parameter.
1195 : : */
355 1196 [ + + ]: 309 : if (!found_pub_collist)
1197 : 273 : entry->columns = NULL;
1198 : :
1244 1199 : 309 : RelationClose(relation);
1312 tomas.vondra@postgre 1200 : 309 : }
1201 : :
1202 : : /*
1203 : : * Initialize the slot for storing new and old tuples, and build the map that
1204 : : * will be used to convert the relation's tuples into the ancestor's format.
1205 : : */
1206 : : static void
1344 akapila@postgresql.o 1207 : 310 : init_tuple_slot(PGOutputData *data, Relation relation,
1208 : : RelationSyncEntry *entry)
1209 : : {
1210 : : MemoryContext oldctx;
1211 : : TupleDesc oldtupdesc;
1212 : : TupleDesc newtupdesc;
1213 : :
1214 : 310 : oldctx = MemoryContextSwitchTo(data->cachectx);
1215 : :
1216 : : /*
1217 : : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1218 : : * live as long as the cache remains.
1219 : : */
701 1220 : 310 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1221 : 310 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1222 : :
1344 1223 : 310 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1224 : 310 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1225 : :
1226 : 310 : MemoryContextSwitchTo(oldctx);
1227 : :
1228 : : /*
1229 : : * Cache the map that will be used to convert the relation's tuples into
1230 : : * the ancestor's format, if needed.
1231 : : */
1232 [ + + ]: 310 : if (entry->publish_as_relid != RelationGetRelid(relation))
1233 : : {
1234 : 37 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1235 : 37 : TupleDesc indesc = RelationGetDescr(relation);
1236 : 37 : TupleDesc outdesc = RelationGetDescr(ancestor);
1237 : :
1238 : : /* Map must live as long as the logical decoding context. */
302 michael@paquier.xyz 1239 : 37 : oldctx = MemoryContextSwitchTo(data->cachectx);
1240 : :
1064 alvherre@alvh.no-ip. 1241 : 37 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1242 : :
1344 akapila@postgresql.o 1243 : 37 : MemoryContextSwitchTo(oldctx);
1244 : 37 : RelationClose(ancestor);
1245 : : }
1246 : 310 : }
1247 : :
1248 : : /*
1249 : : * Change is checked against the row filter if any.
1250 : : *
1251 : : * Returns true if the change is to be replicated, else false.
1252 : : *
1253 : : * For inserts, evaluate the row filter for new tuple.
1254 : : * For deletes, evaluate the row filter for old tuple.
1255 : : * For updates, evaluate the row filter for old and new tuple.
1256 : : *
1257 : : * For updates, if both evaluations are true, we allow sending the UPDATE and
1258 : : * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
1259 : : * only one of the tuples matches the row filter expression, we transform
1260 : : * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1261 : : * following rules:
1262 : : *
1263 : : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1264 : : * Case 2: old-row (no match) new row (match) -> INSERT
1265 : : * Case 3: old-row (match) new-row (no match) -> DELETE
1266 : : * Case 4: old-row (match) new row (match) -> UPDATE
1267 : : *
1268 : : * The new action is updated in the action parameter.
1269 : : *
1270 : : * The new slot could be updated when transforming the UPDATE into INSERT,
1271 : : * because the original new tuple might not have column values from the replica
1272 : : * identity.
1273 : : *
1274 : : * Examples:
1275 : : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1276 : : * Since the old tuple satisfies, the initial table synchronization copied this
1277 : : * row (or another method was used to guarantee that there is data
1278 : : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1279 : : * row filter, so from a data consistency perspective, that row should be
1280 : : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1281 : : * statement and be sent to the subscriber. Keeping this row on the subscriber
1282 : : * is undesirable because it doesn't reflect what was defined in the row filter
1283 : : * expression on the publisher. This row on the subscriber would likely not be
1284 : : * modified by replication again. If someone inserted a new row with the same
1285 : : * old identifier, replication could stop due to a constraint violation.
1286 : : *
1287 : : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1288 : : * Since the old tuple doesn't satisfy, the initial table synchronization
1289 : : * probably didn't copy this row. However, after the UPDATE the new tuple does
1290 : : * satisfy the row filter, so from a data consistency perspective, that row
1291 : : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1292 : : * statements have no effect (it matches no row -- see
1293 : : * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1294 : : * INSERT statement and be sent to the subscriber. However, this might surprise
1295 : : * someone who expects the data set to satisfy the row filter expression on the
1296 : : * provider.
1297 : : */
1298 : : static bool
1299 : 182220 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1300 : : TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1301 : : ReorderBufferChangeType *action)
1302 : : {
1303 : : TupleDesc desc;
1304 : : int i;
1305 : : bool old_matched,
1306 : : new_matched,
1307 : : result;
1308 : : TupleTableSlot *tmp_new_slot;
1309 : 182220 : TupleTableSlot *new_slot = *new_slot_ptr;
1310 : : ExprContext *ecxt;
1311 : : ExprState *filter_exprstate;
1312 : :
1313 : : /*
1314 : : * We need this map to avoid relying on ReorderBufferChangeType enums
1315 : : * having specific values.
1316 : : */
1317 : : static const int map_changetype_pubaction[] = {
1318 : : [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1319 : : [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1320 : : [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1321 : : };
1322 : :
1323 [ + + + + : 182220 : Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
- + ]
1324 : : *action == REORDER_BUFFER_CHANGE_UPDATE ||
1325 : : *action == REORDER_BUFFER_CHANGE_DELETE);
1326 : :
1327 [ + + - + ]: 182220 : Assert(new_slot || old_slot);
1328 : :
1329 : : /* Get the corresponding row filter */
1330 : 182220 : filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1331 : :
1332 : : /* Bail out if there is no row filter */
1333 [ + + ]: 182220 : if (!filter_exprstate)
1334 : 182186 : return true;
1335 : :
1336 [ - + ]: 34 : elog(DEBUG3, "table \"%s.%s\" has row filter",
1337 : : get_namespace_name(RelationGetNamespace(relation)),
1338 : : RelationGetRelationName(relation));
1339 : :
1340 [ + + ]: 34 : ResetPerTupleExprContext(entry->estate);
1341 : :
1342 [ + + ]: 34 : ecxt = GetPerTupleExprContext(entry->estate);
1343 : :
1344 : : /*
1345 : : * For the following occasions where there is only one tuple, we can
1346 : : * evaluate the row filter for that tuple and return.
1347 : : *
1348 : : * For inserts, we only have the new tuple.
1349 : : *
1350 : : * For updates, we can have only a new tuple when none of the replica
1351 : : * identity columns changed and none of those columns have external data
1352 : : * but we still need to evaluate the row filter for the new tuple as the
1353 : : * existing values of those columns might not match the filter. Also,
1354 : : * users can use constant expressions in the row filter, so we anyway need
1355 : : * to evaluate it for the new tuple.
1356 : : *
1357 : : * For deletes, we only have the old tuple.
1358 : : */
1359 [ + + + + ]: 34 : if (!new_slot || !old_slot)
1360 : : {
1361 [ + + ]: 30 : ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1362 : 30 : result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1363 : :
1364 : 30 : return result;
1365 : : }
1366 : :
1367 : : /*
1368 : : * Both the old and new tuples must be valid only for updates and need to
1369 : : * be checked against the row filter.
1370 : : */
1371 [ - + ]: 4 : Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1372 : :
1373 : 4 : slot_getallattrs(new_slot);
1374 : 4 : slot_getallattrs(old_slot);
1375 : :
1376 : 4 : tmp_new_slot = NULL;
1377 : 4 : desc = RelationGetDescr(relation);
1378 : :
1379 : : /*
1380 : : * The new tuple might not have all the replica identity columns, in which
1381 : : * case it needs to be copied over from the old tuple.
1382 : : */
1383 [ + + ]: 12 : for (i = 0; i < desc->natts; i++)
1384 : : {
312 drowley@postgresql.o 1385 : 8 : CompactAttribute *att = TupleDescCompactAttr(desc, i);
1386 : :
1387 : : /*
1388 : : * if the column in the new tuple or old tuple is null, nothing to do
1389 : : */
1344 akapila@postgresql.o 1390 [ + + - + ]: 8 : if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1391 : 1 : continue;
1392 : :
1393 : : /*
1394 : : * Unchanged toasted replica identity columns are only logged in the
1395 : : * old tuple. Copy this over to the new tuple. The changed (or WAL
1396 : : * Logged) toast values are always assembled in memory and set as
1397 : : * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1398 : : */
1399 [ + + + + ]: 11 : if (att->attlen == -1 &&
84 peter@eisentraut.org 1400 :GNC 4 : VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(new_slot->tts_values[i])) &&
1401 [ + - ]: 1 : !VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(old_slot->tts_values[i])))
1402 : : {
1344 akapila@postgresql.o 1403 [ + - ]:CBC 1 : if (!tmp_new_slot)
1404 : : {
1405 : 1 : tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1406 : 1 : ExecClearTuple(tmp_new_slot);
1407 : :
1408 : 1 : memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1409 : 1 : desc->natts * sizeof(Datum));
1410 : 1 : memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1411 : 1 : desc->natts * sizeof(bool));
1412 : : }
1413 : :
1414 : 1 : tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1415 : 1 : tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1416 : : }
1417 : : }
1418 : :
1419 : 4 : ecxt->ecxt_scantuple = old_slot;
1420 : 4 : old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1421 : :
1422 [ + + ]: 4 : if (tmp_new_slot)
1423 : : {
1424 : 1 : ExecStoreVirtualTuple(tmp_new_slot);
1425 : 1 : ecxt->ecxt_scantuple = tmp_new_slot;
1426 : : }
1427 : : else
1428 : 3 : ecxt->ecxt_scantuple = new_slot;
1429 : :
1430 : 4 : new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1431 : :
1432 : : /*
1433 : : * Case 1: if both tuples don't match the row filter, bailout. Send
1434 : : * nothing.
1435 : : */
1436 [ + + - + ]: 4 : if (!old_matched && !new_matched)
1344 akapila@postgresql.o 1437 :UBC 0 : return false;
1438 : :
1439 : : /*
1440 : : * Case 2: if the old tuple doesn't satisfy the row filter but the new
1441 : : * tuple does, transform the UPDATE into INSERT.
1442 : : *
1443 : : * Use the newly transformed tuple that must contain the column values for
1444 : : * all the replica identity columns. This is required to ensure that the
1445 : : * while inserting the tuple in the downstream node, we have all the
1446 : : * required column values.
1447 : : */
1344 akapila@postgresql.o 1448 [ + + + - ]:CBC 4 : if (!old_matched && new_matched)
1449 : : {
1450 : 2 : *action = REORDER_BUFFER_CHANGE_INSERT;
1451 : :
1452 [ + + ]: 2 : if (tmp_new_slot)
1453 : 1 : *new_slot_ptr = tmp_new_slot;
1454 : : }
1455 : :
1456 : : /*
1457 : : * Case 3: if the old tuple satisfies the row filter but the new tuple
1458 : : * doesn't, transform the UPDATE into DELETE.
1459 : : *
1460 : : * This transformation does not require another tuple. The Old tuple will
1461 : : * be used for DELETE.
1462 : : */
1463 [ + - + + ]: 2 : else if (old_matched && !new_matched)
1464 : 1 : *action = REORDER_BUFFER_CHANGE_DELETE;
1465 : :
1466 : : /*
1467 : : * Case 4: if both tuples match the row filter, transformation isn't
1468 : : * required. (*action is default UPDATE).
1469 : : */
1470 : :
1471 : 4 : return true;
1472 : : }
1473 : :
1474 : : /*
1475 : : * Sends the decoded DML over wire.
1476 : : *
1477 : : * This is called both in streaming and non-streaming modes.
1478 : : */
1479 : : static void
3204 peter_e@gmx.net 1480 : 183405 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1481 : : Relation relation, ReorderBufferChange *change)
1482 : : {
3086 bruce@momjian.us 1483 : 183405 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1308 akapila@postgresql.o 1484 : 183405 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1485 : : MemoryContext old;
1486 : : RelationSyncEntry *relentry;
1881 1487 : 183405 : TransactionId xid = InvalidTransactionId;
1750 1488 : 183405 : Relation ancestor = NULL;
1344 1489 : 183405 : Relation targetrel = relation;
1490 : 183405 : ReorderBufferChangeType action = change->action;
1491 : 183405 : TupleTableSlot *old_slot = NULL;
1492 : 183405 : TupleTableSlot *new_slot = NULL;
1493 : :
2804 peter_e@gmx.net 1494 [ - + ]: 183405 : if (!is_publishable_relation(relation))
1495 : 1184 : return;
1496 : :
1497 : : /*
1498 : : * Remember the xid for the change in streaming mode. We need to send xid
1499 : : * with each change in the streaming mode so that subscriber can make
1500 : : * their association and on aborts, it can discard the corresponding
1501 : : * changes.
1502 : : */
761 michael@paquier.xyz 1503 [ + + ]: 183405 : if (data->in_streaming)
1881 akapila@postgresql.o 1504 : 175901 : xid = change->txn->xid;
1505 : :
1344 1506 : 183405 : relentry = get_rel_sync_entry(data, relation);
1507 : :
1508 : : /* First check the table filter */
1509 [ + + + - ]: 183404 : switch (action)
1510 : : {
3204 peter_e@gmx.net 1511 : 105981 : case REORDER_BUFFER_CHANGE_INSERT:
1512 [ + + ]: 105981 : if (!relentry->pubactions.pubinsert)
1513 : 85 : return;
1514 : 105896 : break;
1515 : 34494 : case REORDER_BUFFER_CHANGE_UPDATE:
1516 [ + + ]: 34494 : if (!relentry->pubactions.pubupdate)
1517 : 44 : return;
1518 : 34450 : break;
1519 : 42929 : case REORDER_BUFFER_CHANGE_DELETE:
1520 [ + + ]: 42929 : if (!relentry->pubactions.pubdelete)
1521 : 1055 : return;
1522 : :
1523 : : /*
1524 : : * This is only possible if deletes are allowed even when replica
1525 : : * identity is not defined for a table. Since the DELETE action
1526 : : * can't be published, we simply return.
1527 : : */
943 akapila@postgresql.o 1528 [ - + ]: 41874 : if (!change->data.tp.oldtuple)
1529 : : {
943 akapila@postgresql.o 1530 [ # # ]:UBC 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1531 : 0 : return;
1532 : : }
3204 peter_e@gmx.net 1533 :CBC 41874 : break;
3204 peter_e@gmx.net 1534 :UBC 0 : default:
1535 : 0 : Assert(false);
1536 : : }
1537 : :
1538 : : /* Avoid leaking memory by using and resetting our own context */
3204 peter_e@gmx.net 1539 :CBC 182220 : old = MemoryContextSwitchTo(data->context);
1540 : :
1541 : : /* Switch relation if publishing via root. */
943 akapila@postgresql.o 1542 [ + + ]: 182220 : if (relentry->publish_as_relid != RelationGetRelid(relation))
1543 : : {
1544 [ - + ]: 70 : Assert(relation->rd_rel->relispartition);
1545 : 70 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1546 : 70 : targetrel = ancestor;
1547 : : }
1548 : :
1549 [ + + ]: 182220 : if (change->data.tp.oldtuple)
1550 : : {
1551 : 42023 : old_slot = relentry->old_slot;
638 msawada@postgresql.o 1552 : 42023 : ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1553 : :
1554 : : /* Convert tuple if needed. */
943 akapila@postgresql.o 1555 [ + + ]: 42023 : if (relentry->attrmap)
1556 : : {
1557 : 5 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1558 : : &TTSOpsVirtual);
1559 : :
1560 : 5 : old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1561 : : }
1562 : : }
1563 : :
1564 [ + + ]: 182220 : if (change->data.tp.newtuple)
1565 : : {
1566 : 140346 : new_slot = relentry->new_slot;
638 msawada@postgresql.o 1567 : 140346 : ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1568 : :
1569 : : /* Convert tuple if needed. */
943 akapila@postgresql.o 1570 [ + + ]: 140346 : if (relentry->attrmap)
1571 : : {
1572 : 20 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1573 : : &TTSOpsVirtual);
1574 : :
1575 : 20 : new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1576 : : }
1577 : : }
1578 : :
1579 : : /*
1580 : : * Check row filter.
1581 : : *
1582 : : * Updates could be transformed to inserts or deletes based on the results
1583 : : * of the row filter for old and new tuple.
1584 : : */
1585 [ + + ]: 182220 : if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1586 : 12 : goto cleanup;
1587 : :
1588 : : /*
1589 : : * Send BEGIN if we haven't yet.
1590 : : *
1591 : : * We send the BEGIN message after ensuring that we will actually send the
1592 : : * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1593 : : * transactions.
1594 : : */
1595 [ + + + + ]: 182208 : if (txndata && !txndata->sent_begin_txn)
1596 : 448 : pgoutput_send_begin(ctx, txn);
1597 : :
1598 : : /*
1599 : : * Schema should be sent using the original relation because it also sends
1600 : : * the ancestor's relation.
1601 : : */
1602 : 182207 : maybe_send_schema(ctx, change, relation, relentry);
1603 : :
1604 : 182206 : OutputPluginPrepareWrite(ctx, true);
1605 : :
1606 : : /* Send the data */
1607 [ + + + - ]: 182206 : switch (action)
1608 : : {
1609 : 105884 : case REORDER_BUFFER_CHANGE_INSERT:
1610 : 105884 : logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
355 1611 : 105884 : data->binary, relentry->columns,
1612 : : relentry->include_gencols_type);
943 1613 : 105884 : break;
1614 : 34447 : case REORDER_BUFFER_CHANGE_UPDATE:
1615 : 34447 : logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
355 1616 : 34447 : new_slot, data->binary, relentry->columns,
1617 : : relentry->include_gencols_type);
1344 1618 : 34447 : break;
3204 peter_e@gmx.net 1619 : 41875 : case REORDER_BUFFER_CHANGE_DELETE:
943 akapila@postgresql.o 1620 : 41875 : logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
355 1621 : 41875 : data->binary, relentry->columns,
1622 : : relentry->include_gencols_type);
3204 peter_e@gmx.net 1623 : 41875 : break;
3204 peter_e@gmx.net 1624 :UBC 0 : default:
1625 : 0 : Assert(false);
1626 : : }
1627 : :
943 akapila@postgresql.o 1628 :CBC 182206 : OutputPluginWrite(ctx, true);
1629 : :
1630 : 182218 : cleanup:
1750 1631 [ + + ]: 182218 : if (RelationIsValid(ancestor))
1632 : : {
1633 : 68 : RelationClose(ancestor);
1634 : 68 : ancestor = NULL;
1635 : : }
1636 : :
1637 : : /* Drop the new slots that were used to store the converted tuples. */
488 1638 [ + + ]: 182218 : if (relentry->attrmap)
1639 : : {
1640 [ + + ]: 25 : if (old_slot)
1641 : 5 : ExecDropSingleTupleTableSlot(old_slot);
1642 : :
1643 [ + + ]: 25 : if (new_slot)
1644 : 20 : ExecDropSingleTupleTableSlot(new_slot);
1645 : : }
1646 : :
3204 peter_e@gmx.net 1647 : 182218 : MemoryContextSwitchTo(old);
1648 : 182218 : MemoryContextReset(data->context);
1649 : : }
1650 : :
1651 : : static void
2761 1652 : 14 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1653 : : int nrelations, Relation relations[], ReorderBufferChange *change)
1654 : : {
1655 : 14 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1308 akapila@postgresql.o 1656 : 14 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1657 : : MemoryContext old;
1658 : : RelationSyncEntry *relentry;
1659 : : int i;
1660 : : int nrelids;
1661 : : Oid *relids;
1881 1662 : 14 : TransactionId xid = InvalidTransactionId;
1663 : :
1664 : : /* Remember the xid for the change in streaming mode. See pgoutput_change. */
761 michael@paquier.xyz 1665 [ - + ]: 14 : if (data->in_streaming)
1881 akapila@postgresql.o 1666 :UBC 0 : xid = change->txn->xid;
1667 : :
2761 peter_e@gmx.net 1668 :CBC 14 : old = MemoryContextSwitchTo(data->context);
1669 : :
1670 : 14 : relids = palloc0(nrelations * sizeof(Oid));
1671 : 14 : nrelids = 0;
1672 : :
1673 [ + + ]: 47 : for (i = 0; i < nrelations; i++)
1674 : : {
1675 : 33 : Relation relation = relations[i];
1676 : 33 : Oid relid = RelationGetRelid(relation);
1677 : :
1678 [ - + ]: 33 : if (!is_publishable_relation(relation))
2761 peter_e@gmx.net 1679 :UBC 0 : continue;
1680 : :
1344 akapila@postgresql.o 1681 :CBC 33 : relentry = get_rel_sync_entry(data, relation);
1682 : :
2761 peter_e@gmx.net 1683 [ + + ]: 33 : if (!relentry->pubactions.pubtruncate)
1684 : 14 : continue;
1685 : :
1686 : : /*
1687 : : * Don't send partitions if the publication wants to send only the
1688 : : * root tables through it.
1689 : : */
2029 peter@eisentraut.org 1690 [ + + ]: 19 : if (relation->rd_rel->relispartition &&
1691 [ + + ]: 15 : relentry->publish_as_relid != relid)
2058 1692 : 3 : continue;
1693 : :
2761 peter_e@gmx.net 1694 : 16 : relids[nrelids++] = relid;
1695 : :
1696 : : /* Send BEGIN if we haven't yet */
1308 akapila@postgresql.o 1697 [ + - + + ]: 16 : if (txndata && !txndata->sent_begin_txn)
1698 : 10 : pgoutput_send_begin(ctx, txn);
1699 : :
1545 fujii@postgresql.org 1700 : 16 : maybe_send_schema(ctx, change, relation, relentry);
1701 : : }
1702 : :
2738 peter_e@gmx.net 1703 [ + + ]: 14 : if (nrelids > 0)
1704 : : {
1705 : 10 : OutputPluginPrepareWrite(ctx, true);
1706 : 10 : logicalrep_write_truncate(ctx->out,
1707 : : xid,
1708 : : nrelids,
1709 : : relids,
1710 : 10 : change->data.truncate.cascade,
1711 : 10 : change->data.truncate.restart_seqs);
1712 : 10 : OutputPluginWrite(ctx, true);
1713 : : }
1714 : :
2761 1715 : 14 : MemoryContextSwitchTo(old);
1716 : 14 : MemoryContextReset(data->context);
1717 : 14 : }
1718 : :
1719 : : static void
1666 akapila@postgresql.o 1720 : 7 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1721 : : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1722 : : const char *message)
1723 : : {
1724 : 7 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1725 : 7 : TransactionId xid = InvalidTransactionId;
1726 : :
1727 [ + + ]: 7 : if (!data->messages)
1728 : 2 : return;
1729 : :
1730 : : /*
1731 : : * Remember the xid for the message in streaming mode. See
1732 : : * pgoutput_change.
1733 : : */
761 michael@paquier.xyz 1734 [ - + ]: 5 : if (data->in_streaming)
1666 akapila@postgresql.o 1735 :UBC 0 : xid = txn->xid;
1736 : :
1737 : : /*
1738 : : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1739 : : */
1308 akapila@postgresql.o 1740 [ + + ]:CBC 5 : if (transactional)
1741 : : {
1742 : 2 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1743 : :
1744 : : /* Send BEGIN if we haven't yet */
1745 [ + - + - ]: 2 : if (txndata && !txndata->sent_begin_txn)
1746 : 2 : pgoutput_send_begin(ctx, txn);
1747 : : }
1748 : :
1666 1749 : 5 : OutputPluginPrepareWrite(ctx, true);
1750 : 5 : logicalrep_write_message(ctx->out,
1751 : : xid,
1752 : : message_lsn,
1753 : : transactional,
1754 : : prefix,
1755 : : sz,
1756 : : message);
1757 : 5 : OutputPluginWrite(ctx, true);
1758 : : }
1759 : :
1760 : : /*
1761 : : * Return true if the data is associated with an origin and the user has
1762 : : * requested the changes that don't have an origin, false otherwise.
1763 : : */
1764 : : static bool
3204 peter_e@gmx.net 1765 : 305561 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1766 : : RepOriginId origin_id)
1767 : : {
762 akapila@postgresql.o 1768 : 305561 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1769 : :
1770 [ + + + + ]: 305561 : if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1195 1771 : 147 : return true;
1772 : :
3204 peter_e@gmx.net 1773 : 305414 : return false;
1774 : : }
1775 : :
1776 : : /*
1777 : : * Shutdown the output plugin.
1778 : : *
1779 : : * Note, we don't need to clean the data->context, data->cachectx, and
1780 : : * data->pubctx as they are child contexts of the ctx->context so they
1781 : : * will be cleaned up by logical decoding machinery.
1782 : : */
1783 : : static void
3086 bruce@momjian.us 1784 : 517 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1785 : : {
19 msawada@postgresql.o 1786 : 517 : pgoutput_memory_context_reset(NULL);
3204 peter_e@gmx.net 1787 : 517 : }
1788 : :
1789 : : /*
1790 : : * Load publications from the list of publication names.
1791 : : *
1792 : : * Here, we skip the publications that don't exist yet. This will allow us
1793 : : * to silently continue the replication in the absence of a missing publication.
1794 : : * This is required because we allow the users to create publications after they
1795 : : * have specified the required publications at the time of replication start.
1796 : : */
1797 : : static List *
1798 : 197 : LoadPublications(List *pubnames)
1799 : : {
1800 : 197 : List *result = NIL;
1801 : : ListCell *lc;
1802 : :
3086 bruce@momjian.us 1803 [ + - + + : 439 : foreach(lc, pubnames)
+ + ]
1804 : : {
1805 : 242 : char *pubname = (char *) lfirst(lc);
228 akapila@postgresql.o 1806 : 242 : Publication *pub = GetPublicationByName(pubname, true);
1807 : :
1808 [ + + ]: 242 : if (pub)
1809 : 240 : result = lappend(result, pub);
1810 : : else
1811 [ + - ]: 2 : ereport(WARNING,
1812 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1813 : : errmsg("skipped loading publication \"%s\"", pubname),
1814 : : errdetail("The publication does not exist at this point in the WAL."),
1815 : : errhint("Create the publication if it does not exist."));
1816 : : }
1817 : :
3204 peter_e@gmx.net 1818 : 197 : return result;
1819 : : }
1820 : :
1821 : : /*
1822 : : * Publication syscache invalidation callback.
1823 : : *
1824 : : * Called for invalidations on pg_publication.
1825 : : */
1826 : : static void
1827 : 332 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1828 : : {
1829 : 332 : publications_valid = false;
1830 : 332 : }
1831 : :
1832 : : /*
1833 : : * START STREAM callback
1834 : : */
1835 : : static void
1881 akapila@postgresql.o 1836 : 607 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1837 : : ReorderBufferTXN *txn)
1838 : : {
761 michael@paquier.xyz 1839 : 607 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1881 akapila@postgresql.o 1840 : 607 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1841 : :
1842 : : /* we can't nest streaming of transactions */
761 michael@paquier.xyz 1843 [ - + ]: 607 : Assert(!data->in_streaming);
1844 : :
1845 : : /*
1846 : : * If we already sent the first stream for this transaction then don't
1847 : : * send the origin id in the subsequent streams.
1848 : : */
1881 akapila@postgresql.o 1849 [ + + ]: 607 : if (rbtxn_is_streamed(txn))
1850 : 552 : send_replication_origin = false;
1851 : :
1852 : 607 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1853 : 607 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1854 : :
1567 1855 : 607 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1856 : : send_replication_origin);
1857 : :
1881 1858 : 607 : OutputPluginWrite(ctx, true);
1859 : :
1860 : : /* we're streaming a chunk of transaction now */
761 michael@paquier.xyz 1861 : 607 : data->in_streaming = true;
1881 akapila@postgresql.o 1862 : 607 : }
1863 : :
1864 : : /*
1865 : : * STOP STREAM callback
1866 : : */
1867 : : static void
1868 : 607 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1869 : : ReorderBufferTXN *txn)
1870 : : {
761 michael@paquier.xyz 1871 : 607 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1872 : :
1873 : : /* we should be streaming a transaction */
1874 [ - + ]: 607 : Assert(data->in_streaming);
1875 : :
1881 akapila@postgresql.o 1876 : 607 : OutputPluginPrepareWrite(ctx, true);
1877 : 607 : logicalrep_write_stream_stop(ctx->out);
1878 : 607 : OutputPluginWrite(ctx, true);
1879 : :
1880 : : /* we've stopped streaming a transaction */
761 michael@paquier.xyz 1881 : 607 : data->in_streaming = false;
1881 akapila@postgresql.o 1882 : 607 : }
1883 : :
1884 : : /*
1885 : : * Notify downstream to discard the streamed transaction (along with all
1886 : : * its subtransactions, if it's a toplevel transaction).
1887 : : */
1888 : : static void
1889 : 26 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1890 : : ReorderBufferTXN *txn,
1891 : : XLogRecPtr abort_lsn)
1892 : : {
1893 : : ReorderBufferTXN *toptxn;
1023 1894 : 26 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1895 : 26 : bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1896 : :
1897 : : /*
1898 : : * The abort should happen outside streaming block, even for streamed
1899 : : * transactions. The transaction has to be marked as streamed, though.
1900 : : */
761 michael@paquier.xyz 1901 [ - + ]: 26 : Assert(!data->in_streaming);
1902 : :
1903 : : /* determine the toplevel transaction */
956 akapila@postgresql.o 1904 [ + + ]: 26 : toptxn = rbtxn_get_toptxn(txn);
1905 : :
1881 1906 [ - + ]: 26 : Assert(rbtxn_is_streamed(toptxn));
1907 : :
1908 : 26 : OutputPluginPrepareWrite(ctx, true);
1023 1909 : 26 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1910 : : txn->abort_time, write_abort_info);
1911 : :
1881 1912 : 26 : OutputPluginWrite(ctx, true);
1913 : :
1914 : 26 : cleanup_rel_sync_cache(toptxn->xid, false);
1915 : 26 : }
1916 : :
1917 : : /*
1918 : : * Notify downstream to apply the streamed transaction (along with all
1919 : : * its subtransactions).
1920 : : */
1921 : : static void
1922 : 44 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1923 : : ReorderBufferTXN *txn,
1924 : : XLogRecPtr commit_lsn)
1925 : : {
761 michael@paquier.xyz 1926 : 44 : PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1927 : :
1928 : : /*
1929 : : * The commit should happen outside streaming block, even for streamed
1930 : : * transactions. The transaction has to be marked as streamed, though.
1931 : : */
1932 [ - + ]: 44 : Assert(!data->in_streaming);
1881 akapila@postgresql.o 1933 [ - + ]: 44 : Assert(rbtxn_is_streamed(txn));
1934 : :
993 1935 : 44 : OutputPluginUpdateProgress(ctx, false);
1936 : :
1881 1937 : 44 : OutputPluginPrepareWrite(ctx, true);
1938 : 44 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1939 : 44 : OutputPluginWrite(ctx, true);
1940 : :
1941 : 44 : cleanup_rel_sync_cache(txn->xid, true);
1942 : 44 : }
1943 : :
1944 : : /*
1945 : : * PREPARE callback (for streaming two-phase commit).
1946 : : *
1947 : : * Notify the downstream to prepare the transaction.
1948 : : */
1949 : : static void
1546 1950 : 8 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1951 : : ReorderBufferTXN *txn,
1952 : : XLogRecPtr prepare_lsn)
1953 : : {
1954 [ - + ]: 8 : Assert(rbtxn_is_streamed(txn));
1955 : :
993 1956 : 8 : OutputPluginUpdateProgress(ctx, false);
1546 1957 : 8 : OutputPluginPrepareWrite(ctx, true);
1958 : 8 : logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1959 : 8 : OutputPluginWrite(ctx, true);
1960 : 8 : }
1961 : :
1962 : : /*
1963 : : * Initialize the relation schema sync cache for a decoding session.
1964 : : *
1965 : : * The hash table is destroyed at the end of a decoding session. While
1966 : : * relcache invalidations still exist and will still be invoked, they
1967 : : * will just see the null hash table global and take no action.
1968 : : */
1969 : : static void
3204 peter_e@gmx.net 1970 : 397 : init_rel_sync_cache(MemoryContext cachectx)
1971 : : {
1972 : : HASHCTL ctl;
1973 : : static bool relation_callbacks_registered = false;
1974 : :
1975 : : /* Nothing to do if hash table already exists */
1976 [ - + ]: 397 : if (RelationSyncCache != NULL)
1977 : 2 : return;
1978 : :
1979 : : /* Make a new hash table for the cache */
1980 : 397 : ctl.keysize = sizeof(Oid);
1981 : 397 : ctl.entrysize = sizeof(RelationSyncEntry);
1982 : 397 : ctl.hcxt = cachectx;
1983 : :
1984 : 397 : RelationSyncCache = hash_create("logical replication output relation cache",
1985 : : 128, &ctl,
1986 : : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1987 : :
1988 [ - + ]: 397 : Assert(RelationSyncCache != NULL);
1989 : :
1990 : : /* No more to do if we already registered callbacks */
978 tgl@sss.pgh.pa.us 1991 [ + + ]: 397 : if (relation_callbacks_registered)
1992 : 2 : return;
1993 : :
1994 : : /* We must update the cache entry for a relation after a relcache flush */
3204 peter_e@gmx.net 1995 : 395 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1996 : :
1997 : : /*
1998 : : * Flush all cache entries after a pg_namespace change, in case it was a
1999 : : * schema rename affecting a relation being replicated.
2000 : : *
2001 : : * XXX: It is not a good idea to invalidate all the relation entries in
2002 : : * RelationSyncCache on schema rename. We can optimize it to invalidate
2003 : : * only the required relations by either having a specific invalidation
2004 : : * message containing impacted relations or by having schema information
2005 : : * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
2006 : : * passed to the callback.
2007 : : */
1026 tgl@sss.pgh.pa.us 2008 : 395 : CacheRegisterSyscacheCallback(NAMESPACEOID,
2009 : : rel_sync_cache_publication_cb,
2010 : : (Datum) 0);
2011 : :
978 2012 : 395 : relation_callbacks_registered = true;
2013 : : }
2014 : :
2015 : : /*
2016 : : * We expect relatively small number of streamed transactions.
2017 : : */
2018 : : static bool
1881 akapila@postgresql.o 2019 : 175901 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2020 : : {
1212 alvherre@alvh.no-ip. 2021 : 175901 : return list_member_xid(entry->streamed_txns, xid);
2022 : : }
2023 : :
2024 : : /*
2025 : : * Add the xid in the rel sync entry for which we have already sent the schema
2026 : : * of the relation.
2027 : : */
2028 : : static void
1881 akapila@postgresql.o 2029 : 63 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2030 : : {
2031 : : MemoryContext oldctx;
2032 : :
2033 : 63 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2034 : :
1212 alvherre@alvh.no-ip. 2035 : 63 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2036 : :
1881 akapila@postgresql.o 2037 : 63 : MemoryContextSwitchTo(oldctx);
2038 : 63 : }
2039 : :
2040 : : /*
2041 : : * Find or create entry in the relation schema cache.
2042 : : *
2043 : : * This looks up publications that the given relation is directly or
2044 : : * indirectly part of (the latter if it's really the relation's ancestor that
2045 : : * is part of a publication) and fills up the found entry with the information
2046 : : * about which operations to publish and whether to use an ancestor's schema
2047 : : * when publishing.
2048 : : */
2049 : : static RelationSyncEntry *
1344 2050 : 183438 : get_rel_sync_entry(PGOutputData *data, Relation relation)
2051 : : {
2052 : : RelationSyncEntry *entry;
2053 : : bool found;
2054 : : MemoryContext oldctx;
2055 : 183438 : Oid relid = RelationGetRelid(relation);
2056 : :
3204 peter_e@gmx.net 2057 [ - + ]: 183438 : Assert(RelationSyncCache != NULL);
2058 : :
2059 : : /* Find cached relation info, creating if not found */
2060 : 183438 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
2061 : : &relid,
2062 : : HASH_ENTER, &found);
2063 [ - + ]: 183438 : Assert(entry != NULL);
2064 : :
2065 : : /* initialize entry, if it's new */
1868 akapila@postgresql.o 2066 [ + + ]: 183438 : if (!found)
2067 : : {
1362 2068 : 307 : entry->replicate_valid = false;
1868 2069 : 307 : entry->schema_sent = false;
278 2070 : 307 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1868 2071 : 307 : entry->streamed_txns = NIL;
2072 : 307 : entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1300 tomas.vondra@postgre 2073 : 307 : entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1344 akapila@postgresql.o 2074 : 307 : entry->new_slot = NULL;
2075 : 307 : entry->old_slot = NULL;
2076 : 307 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
1312 tomas.vondra@postgre 2077 : 307 : entry->entry_cxt = NULL;
1868 akapila@postgresql.o 2078 : 307 : entry->publish_as_relid = InvalidOid;
1312 tomas.vondra@postgre 2079 : 307 : entry->columns = NULL;
1344 akapila@postgresql.o 2080 : 307 : entry->attrmap = NULL;
2081 : : }
2082 : :
2083 : : /* Validate the entry */
1868 2084 [ + + ]: 183438 : if (!entry->replicate_valid)
2085 : : {
1462 2086 : 384 : Oid schemaId = get_rel_namespace(relid);
3204 peter_e@gmx.net 2087 : 384 : List *pubids = GetRelationPublications(relid);
2088 : :
2089 : : /*
2090 : : * We don't acquire a lock on the namespace system table as we build
2091 : : * the cache entry using a historic snapshot and all the later changes
2092 : : * are absorbed while decoding WAL.
2093 : : */
1300 tomas.vondra@postgre 2094 : 384 : List *schemaPubids = GetSchemaPublications(schemaId);
2095 : : ListCell *lc;
2029 peter@eisentraut.org 2096 : 384 : Oid publish_as_relid = relid;
1322 tomas.vondra@postgre 2097 : 384 : int publish_ancestor_level = 0;
1392 michael@paquier.xyz 2098 : 384 : bool am_partition = get_rel_relispartition(relid);
1300 tomas.vondra@postgre 2099 : 384 : char relkind = get_rel_relkind(relid);
1344 akapila@postgresql.o 2100 : 384 : List *rel_publications = NIL;
2101 : :
2102 : : /* Reload publications if needed before use. */
3204 peter_e@gmx.net 2103 [ + + ]: 384 : if (!publications_valid)
2104 : : {
323 michael@paquier.xyz 2105 : 197 : MemoryContextReset(data->pubctx);
2106 : :
2107 : 197 : oldctx = MemoryContextSwitchTo(data->pubctx);
3204 peter_e@gmx.net 2108 : 197 : data->publications = LoadPublications(data->publication_names);
2109 : 197 : MemoryContextSwitchTo(oldctx);
2110 : 197 : publications_valid = true;
2111 : : }
2112 : :
2113 : : /*
2114 : : * Reset schema_sent status as the relation definition may have
2115 : : * changed. Also reset pubactions to empty in case rel was dropped
2116 : : * from a publication. Also free any objects that depended on the
2117 : : * earlier definition.
2118 : : */
1362 akapila@postgresql.o 2119 : 384 : entry->schema_sent = false;
278 2120 : 384 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1362 2121 : 384 : list_free(entry->streamed_txns);
2122 : 384 : entry->streamed_txns = NIL;
1312 tomas.vondra@postgre 2123 : 384 : bms_free(entry->columns);
2124 : 384 : entry->columns = NULL;
1362 akapila@postgresql.o 2125 : 384 : entry->pubactions.pubinsert = false;
2126 : 384 : entry->pubactions.pubupdate = false;
2127 : 384 : entry->pubactions.pubdelete = false;
2128 : 384 : entry->pubactions.pubtruncate = false;
2129 : :
2130 : : /*
2131 : : * Tuple slots cleanups. (Will be rebuilt later if needed).
2132 : : */
1344 2133 [ + + ]: 384 : if (entry->old_slot)
2134 : : {
341 michael@paquier.xyz 2135 : 58 : TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2136 : :
2137 [ - + ]: 58 : Assert(desc->tdrefcount == -1);
2138 : :
1344 akapila@postgresql.o 2139 : 58 : ExecDropSingleTupleTableSlot(entry->old_slot);
2140 : :
2141 : : /*
2142 : : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2143 : : * do it now to avoid any leaks.
2144 : : */
341 michael@paquier.xyz 2145 : 58 : FreeTupleDesc(desc);
2146 : : }
1344 akapila@postgresql.o 2147 [ + + ]: 384 : if (entry->new_slot)
2148 : : {
341 michael@paquier.xyz 2149 : 58 : TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2150 : :
2151 [ - + ]: 58 : Assert(desc->tdrefcount == -1);
2152 : :
1344 akapila@postgresql.o 2153 : 58 : ExecDropSingleTupleTableSlot(entry->new_slot);
2154 : :
2155 : : /*
2156 : : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2157 : : * do it now to avoid any leaks.
2158 : : */
341 michael@paquier.xyz 2159 : 58 : FreeTupleDesc(desc);
2160 : : }
2161 : :
1344 akapila@postgresql.o 2162 : 384 : entry->old_slot = NULL;
2163 : 384 : entry->new_slot = NULL;
2164 : :
2165 [ + + ]: 384 : if (entry->attrmap)
2166 : 3 : free_attrmap(entry->attrmap);
2167 : 384 : entry->attrmap = NULL;
2168 : :
2169 : : /*
2170 : : * Row filter cache cleanups.
2171 : : */
1312 tomas.vondra@postgre 2172 [ + + ]: 384 : if (entry->entry_cxt)
2173 : 58 : MemoryContextDelete(entry->entry_cxt);
2174 : :
2175 : 384 : entry->entry_cxt = NULL;
1344 akapila@postgresql.o 2176 : 384 : entry->estate = NULL;
2177 : 384 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2178 : :
2179 : : /*
2180 : : * Build publication cache. We can't use one provided by relcache as
2181 : : * relcache considers all publications that the given relation is in,
2182 : : * but here we only need to consider ones that the subscriber
2183 : : * requested.
2184 : : */
3204 peter_e@gmx.net 2185 [ + + + + : 933 : foreach(lc, data->publications)
+ + ]
2186 : : {
2187 : 549 : Publication *pub = lfirst(lc);
2029 peter@eisentraut.org 2188 : 549 : bool publish = false;
2189 : :
2190 : : /*
2191 : : * Under what relid should we publish changes in this publication?
2192 : : * We'll use the top-most relid across all publications. Also
2193 : : * track the ancestor level for this publication.
2194 : : */
1265 tgl@sss.pgh.pa.us 2195 : 549 : Oid pub_relid = relid;
2196 : 549 : int ancestor_level = 0;
2197 : :
2198 : : /*
2199 : : * If this is a FOR ALL TABLES publication, pick the partition
2200 : : * root and set the ancestor level accordingly.
2201 : : */
1300 tomas.vondra@postgre 2202 [ + + ]: 549 : if (pub->alltables)
2203 : : {
2029 peter@eisentraut.org 2204 : 80 : publish = true;
2205 [ + + + - ]: 80 : if (pub->pubviaroot && am_partition)
2206 : : {
1322 tomas.vondra@postgre 2207 : 13 : List *ancestors = get_partition_ancestors(relid);
2208 : :
2209 : 13 : pub_relid = llast_oid(ancestors);
2210 : 13 : ancestor_level = list_length(ancestors);
2211 : : }
2212 : : }
2213 : :
2029 peter@eisentraut.org 2214 [ + + ]: 549 : if (!publish)
2215 : : {
1993 tgl@sss.pgh.pa.us 2216 : 469 : bool ancestor_published = false;
2217 : :
2218 : : /*
2219 : : * For a partition, check if any of the ancestors are
2220 : : * published. If so, note down the topmost ancestor that is
2221 : : * published via this publication, which will be used as the
2222 : : * relation via which to publish the partition's changes.
2223 : : */
2029 peter@eisentraut.org 2224 [ + + ]: 469 : if (am_partition)
2225 : : {
2226 : : Oid ancestor;
2227 : : int level;
1993 tgl@sss.pgh.pa.us 2228 : 121 : List *ancestors = get_partition_ancestors(relid);
2229 : :
1344 akapila@postgresql.o 2230 : 121 : ancestor = GetTopMostAncestorInPublication(pub->oid,
2231 : : ancestors,
2232 : : &level);
2233 : :
2234 [ + + ]: 121 : if (ancestor != InvalidOid)
2235 : : {
2236 : 48 : ancestor_published = true;
2237 [ + + ]: 48 : if (pub->pubviaroot)
2238 : : {
1322 tomas.vondra@postgre 2239 : 25 : pub_relid = ancestor;
2240 : 25 : ancestor_level = level;
2241 : : }
2242 : : }
2243 : : }
2244 : :
1462 akapila@postgresql.o 2245 [ + + + + ]: 726 : if (list_member_oid(pubids, pub->oid) ||
2246 [ + + ]: 507 : list_member_oid(schemaPubids, pub->oid) ||
2247 : : ancestor_published)
2029 peter@eisentraut.org 2248 : 247 : publish = true;
2249 : : }
2250 : :
2251 : : /*
2252 : : * If the relation is to be published, determine actions to
2253 : : * publish, and list of columns, if appropriate.
2254 : : *
2255 : : * Don't publish changes for partitioned tables, because
2256 : : * publishing those of its partitions suffices, unless partition
2257 : : * changes won't be published due to pubviaroot being set.
2258 : : */
2259 [ + + + + ]: 549 : if (publish &&
2260 [ + + ]: 4 : (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2261 : : {
3204 peter_e@gmx.net 2262 : 324 : entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2263 : 324 : entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2264 : 324 : entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2761 2265 : 324 : entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2266 : :
2267 : : /*
2268 : : * We want to publish the changes as the top-most ancestor
2269 : : * across all publications. So we need to check if the already
2270 : : * calculated level is higher than the new one. If yes, we can
2271 : : * ignore the new value (as it's a child). Otherwise the new
2272 : : * value is an ancestor, so we keep it.
2273 : : */
1322 tomas.vondra@postgre 2274 [ + + ]: 324 : if (publish_ancestor_level > ancestor_level)
2275 : 1 : continue;
2276 : :
2277 : : /*
2278 : : * If we found an ancestor higher up in the tree, discard the
2279 : : * list of publications through which we replicate it, and use
2280 : : * the new ancestor.
2281 : : */
1321 2282 [ + + ]: 323 : if (publish_ancestor_level < ancestor_level)
2283 : : {
2284 : 38 : publish_as_relid = pub_relid;
2285 : 38 : publish_ancestor_level = ancestor_level;
2286 : :
2287 : : /* reset the publication list for this relation */
2288 : 38 : rel_publications = NIL;
2289 : : }
2290 : : else
2291 : : {
2292 : : /* Same ancestor level, has to be the same OID. */
2293 [ - + ]: 285 : Assert(publish_as_relid == pub_relid);
2294 : : }
2295 : :
2296 : : /* Track publications for this ancestor. */
2297 : 323 : rel_publications = lappend(rel_publications, pub);
2298 : : }
2299 : : }
2300 : :
1344 akapila@postgresql.o 2301 : 384 : entry->publish_as_relid = publish_as_relid;
2302 : :
2303 : : /*
2304 : : * Initialize the tuple slot, map, and row filter. These are only used
2305 : : * when publishing inserts, updates, or deletes.
2306 : : */
2307 [ + + + - ]: 384 : if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2308 [ - + ]: 74 : entry->pubactions.pubdelete)
2309 : : {
2310 : : /* Initialize the tuple slot and map */
2311 : 310 : init_tuple_slot(data, relation, entry);
2312 : :
2313 : : /* Initialize the row filter */
2314 : 310 : pgoutput_row_filter_init(data, rel_publications, entry);
2315 : :
2316 : : /* Check whether to publish generated columns. */
355 2317 : 310 : check_and_init_gencol(data, rel_publications, entry);
2318 : :
2319 : : /* Initialize the column list */
1312 tomas.vondra@postgre 2320 : 310 : pgoutput_column_list_init(data, rel_publications, entry);
2321 : : }
2322 : :
3204 peter_e@gmx.net 2323 : 383 : list_free(pubids);
1362 akapila@postgresql.o 2324 : 383 : list_free(schemaPubids);
1344 2325 : 383 : list_free(rel_publications);
2326 : :
3204 peter_e@gmx.net 2327 : 383 : entry->replicate_valid = true;
2328 : : }
2329 : :
2330 : 183437 : return entry;
2331 : : }
2332 : :
2333 : : /*
2334 : : * Cleanup list of streamed transactions and update the schema_sent flag.
2335 : : *
2336 : : * When a streamed transaction commits or aborts, we need to remove the
2337 : : * toplevel XID from the schema cache. If the transaction aborted, the
2338 : : * subscriber will simply throw away the schema records we streamed, so
2339 : : * we don't need to do anything else.
2340 : : *
2341 : : * If the transaction is committed, the subscriber will update the relation
2342 : : * cache - so tweak the schema_sent flag accordingly.
2343 : : */
2344 : : static void
1881 akapila@postgresql.o 2345 : 70 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2346 : : {
2347 : : HASH_SEQ_STATUS hash_seq;
2348 : : RelationSyncEntry *entry;
2349 : :
2350 [ - + ]: 70 : Assert(RelationSyncCache != NULL);
2351 : :
2352 : 70 : hash_seq_init(&hash_seq, RelationSyncCache);
2353 [ + + ]: 143 : while ((entry = hash_seq_search(&hash_seq)) != NULL)
2354 : : {
2355 : : /*
2356 : : * We can set the schema_sent flag for an entry that has committed xid
2357 : : * in the list as that ensures that the subscriber would have the
2358 : : * corresponding schema and we don't need to send it unless there is
2359 : : * any invalidation for that relation.
2360 : : */
663 nathan@postgresql.or 2361 [ + + + - : 164 : foreach_xid(streamed_txn, entry->streamed_txns)
+ + ]
2362 : : {
2363 [ + + ]: 70 : if (xid == streamed_txn)
2364 : : {
1881 akapila@postgresql.o 2365 [ + + ]: 52 : if (is_commit)
2366 : 41 : entry->schema_sent = true;
2367 : :
2368 : 52 : entry->streamed_txns =
663 nathan@postgresql.or 2369 : 52 : foreach_delete_current(entry->streamed_txns, streamed_txn);
1881 akapila@postgresql.o 2370 : 52 : break;
2371 : : }
2372 : : }
2373 : : }
2374 : 70 : }
2375 : :
2376 : : /*
2377 : : * Relcache invalidation callback
2378 : : */
2379 : : static void
3204 peter_e@gmx.net 2380 : 4008 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2381 : : {
2382 : : RelationSyncEntry *entry;
2383 : :
2384 : : /*
2385 : : * We can get here if the plugin was used in SQL interface as the
2386 : : * RelationSyncCache is destroyed when the decoding finishes, but there is
2387 : : * no way to unregister the relcache invalidation callback.
2388 : : */
2389 [ + + ]: 4008 : if (RelationSyncCache == NULL)
2390 : 26 : return;
2391 : :
2392 : : /*
2393 : : * Nobody keeps pointers to entries in this hash table around outside
2394 : : * logical decoding callback calls - but invalidation events can come in
2395 : : * *during* a callback if we do any syscache access in the callback.
2396 : : * Because of that we must mark the cache entry as invalid but not damage
2397 : : * any of its substructure here. The next get_rel_sync_entry() call will
2398 : : * rebuild it all.
2399 : : */
1362 akapila@postgresql.o 2400 [ + + ]: 3982 : if (OidIsValid(relid))
2401 : : {
2402 : : /*
2403 : : * Getting invalidations for relations that aren't in the table is
2404 : : * entirely normal. So we don't care if it's found or not.
2405 : : */
2406 : 3925 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2407 : : HASH_FIND, NULL);
2408 [ + + ]: 3925 : if (entry != NULL)
2409 : 663 : entry->replicate_valid = false;
2410 : : }
2411 : : else
2412 : : {
2413 : : /* Whole cache must be flushed. */
2414 : : HASH_SEQ_STATUS status;
2415 : :
2416 : 57 : hash_seq_init(&status, RelationSyncCache);
2417 [ + + ]: 118 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2418 : : {
2419 : 61 : entry->replicate_valid = false;
2420 : : }
2421 : : }
2422 : : }
2423 : :
2424 : : /*
2425 : : * Publication relation/schema map syscache invalidation callback
2426 : : *
2427 : : * Called for invalidations on pg_namespace.
2428 : : */
2429 : : static void
3204 peter_e@gmx.net 2430 : 35 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2431 : : {
2432 : : HASH_SEQ_STATUS status;
2433 : : RelationSyncEntry *entry;
2434 : :
2435 : : /*
2436 : : * We can get here if the plugin was used in SQL interface as the
2437 : : * RelationSyncCache is destroyed when the decoding finishes, but there is
2438 : : * no way to unregister the invalidation callbacks.
2439 : : */
2440 [ + + ]: 35 : if (RelationSyncCache == NULL)
2441 : 10 : return;
2442 : :
2443 : : /*
2444 : : * We have no easy way to identify which cache entries this invalidation
2445 : : * event might have affected, so just mark them all invalid.
2446 : : */
2447 : 25 : hash_seq_init(&status, RelationSyncCache);
2448 [ + + ]: 46 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2449 : : {
2450 : 21 : entry->replicate_valid = false;
2451 : : }
2452 : : }
2453 : :
2454 : : /* Send Replication origin */
2455 : : static void
1567 akapila@postgresql.o 2456 : 1084 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2457 : : XLogRecPtr origin_lsn, bool send_origin)
2458 : : {
2459 [ + + ]: 1084 : if (send_origin)
2460 : : {
2461 : : char *origin;
2462 : :
2463 : : /*----------
2464 : : * XXX: which behaviour do we want here?
2465 : : *
2466 : : * Alternatives:
2467 : : * - don't send origin message if origin name not found
2468 : : * (that's what we do now)
2469 : : * - throw error - that will break replication, not good
2470 : : * - send some special "unknown" origin
2471 : : *----------
2472 : : */
2473 [ + - ]: 8 : if (replorigin_by_oid(origin_id, true, &origin))
2474 : : {
2475 : : /* Message boundary */
2476 : 8 : OutputPluginWrite(ctx, false);
2477 : 8 : OutputPluginPrepareWrite(ctx, true);
2478 : :
2479 : 8 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2480 : : }
2481 : : }
2482 : 1084 : }
|