Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pgoutput.c
4 : : * Logical Replication output plugin
5 : : *
6 : : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/pgoutput/pgoutput.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "access/tupconvert.h"
16 : : #include "catalog/partition.h"
17 : : #include "catalog/pg_publication.h"
18 : : #include "catalog/pg_publication_rel.h"
19 : : #include "catalog/pg_subscription.h"
20 : : #include "commands/defrem.h"
21 : : #include "commands/subscriptioncmds.h"
22 : : #include "executor/executor.h"
23 : : #include "fmgr.h"
24 : : #include "nodes/makefuncs.h"
25 : : #include "parser/parse_relation.h"
26 : : #include "replication/logical.h"
27 : : #include "replication/logicalproto.h"
28 : : #include "replication/origin.h"
29 : : #include "replication/pgoutput.h"
30 : : #include "rewrite/rewriteHandler.h"
31 : : #include "utils/builtins.h"
32 : : #include "utils/inval.h"
33 : : #include "utils/lsyscache.h"
34 : : #include "utils/memutils.h"
35 : : #include "utils/rel.h"
36 : : #include "utils/syscache.h"
37 : : #include "utils/varlena.h"
38 : :
164 tgl@sss.pgh.pa.us 39 :CBC 528 : PG_MODULE_MAGIC_EXT(
40 : : .name = "pgoutput",
41 : : .version = PG_VERSION
42 : : );
43 : :
44 : : static void pgoutput_startup(LogicalDecodingContext *ctx,
45 : : OutputPluginOptions *opt, bool is_init);
46 : : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
47 : : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
48 : : ReorderBufferTXN *txn);
49 : : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
50 : : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
51 : : static void pgoutput_change(LogicalDecodingContext *ctx,
52 : : ReorderBufferTXN *txn, Relation relation,
53 : : ReorderBufferChange *change);
54 : : static void pgoutput_truncate(LogicalDecodingContext *ctx,
55 : : ReorderBufferTXN *txn, int nrelations, Relation relations[],
56 : : ReorderBufferChange *change);
57 : : static void pgoutput_message(LogicalDecodingContext *ctx,
58 : : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
59 : : bool transactional, const char *prefix,
60 : : Size sz, const char *message);
61 : : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
62 : : RepOriginId origin_id);
63 : : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
64 : : ReorderBufferTXN *txn);
65 : : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
66 : : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
67 : : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
68 : : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
69 : : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
70 : : ReorderBufferTXN *txn,
71 : : XLogRecPtr prepare_end_lsn,
72 : : TimestampTz prepare_time);
73 : : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
74 : : ReorderBufferTXN *txn);
75 : : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
76 : : ReorderBufferTXN *txn);
77 : : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
78 : : ReorderBufferTXN *txn,
79 : : XLogRecPtr abort_lsn);
80 : : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
81 : : ReorderBufferTXN *txn,
82 : : XLogRecPtr commit_lsn);
83 : : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
84 : : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
85 : :
86 : : static bool publications_valid;
87 : :
88 : : static List *LoadPublications(List *pubnames);
89 : : static void publication_invalidation_cb(Datum arg, int cacheid,
90 : : uint32 hashvalue);
91 : : static void send_repl_origin(LogicalDecodingContext *ctx,
92 : : RepOriginId origin_id, XLogRecPtr origin_lsn,
93 : : bool send_origin);
94 : :
95 : : /*
96 : : * Only 3 publication actions are used for row filtering ("insert", "update",
97 : : * "delete"). See RelationSyncEntry.exprstate[].
98 : : */
99 : : enum RowFilterPubAction
100 : : {
101 : : PUBACTION_INSERT,
102 : : PUBACTION_UPDATE,
103 : : PUBACTION_DELETE,
104 : : };
105 : :
106 : : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
107 : :
108 : : /*
109 : : * Entry in the map used to remember which relation schemas we sent.
110 : : *
111 : : * The schema_sent flag determines if the current schema record for the
112 : : * relation (and for its ancestor if publish_as_relid is set) was already
113 : : * sent to the subscriber (in which case we don't need to send it again).
114 : : *
115 : : * The schema cache on downstream is however updated only at commit time,
116 : : * and with streamed transactions the commit order may be different from
117 : : * the order the transactions are sent in. Also, the (sub) transactions
118 : : * might get aborted so we need to send the schema for each (sub) transaction
119 : : * so that we don't lose the schema information on abort. For handling this,
120 : : * we maintain the list of xids (streamed_txns) for those we have already sent
121 : : * the schema.
122 : : *
123 : : * For partitions, 'pubactions' considers not only the table's own
124 : : * publications, but also those of all of its ancestors.
125 : : */
126 : : typedef struct RelationSyncEntry
127 : : {
128 : : Oid relid; /* relation oid */
129 : :
130 : : bool replicate_valid; /* overall validity flag for entry */
131 : :
132 : : bool schema_sent;
133 : :
134 : : /*
135 : : * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
136 : : * columns and the 'publish_generated_columns' parameter is set to
137 : : * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
138 : : * indicating that no generated columns should be published, unless
139 : : * explicitly specified in the column list.
140 : : */
141 : : PublishGencolsType include_gencols_type;
142 : : List *streamed_txns; /* streamed toplevel transactions with this
143 : : * schema */
144 : :
145 : : /* are we publishing this rel? */
146 : : PublicationActions pubactions;
147 : :
148 : : /*
149 : : * ExprState array for row filter. Different publication actions don't
150 : : * allow multiple expressions to always be combined into one, because
151 : : * updates or deletes restrict the column in expression to be part of the
152 : : * replica identity index whereas inserts do not have this restriction, so
153 : : * there is one ExprState per publication action.
154 : : */
155 : : ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
156 : : EState *estate; /* executor state used for row filter */
157 : : TupleTableSlot *new_slot; /* slot for storing new tuple */
158 : : TupleTableSlot *old_slot; /* slot for storing old tuple */
159 : :
160 : : /*
161 : : * OID of the relation to publish changes as. For a partition, this may
162 : : * be set to one of its ancestors whose schema will be used when
163 : : * replicating changes, if publish_via_partition_root is set for the
164 : : * publication.
165 : : */
166 : : Oid publish_as_relid;
167 : :
168 : : /*
169 : : * Map used when replicating using an ancestor's schema to convert tuples
170 : : * from partition's type to the ancestor's; NULL if publish_as_relid is
171 : : * same as 'relid' or if unnecessary due to partition and the ancestor
172 : : * having identical TupleDesc.
173 : : */
174 : : AttrMap *attrmap;
175 : :
176 : : /*
177 : : * Columns included in the publication, or NULL if all columns are
178 : : * included implicitly. Note that the attnums in this bitmap are not
179 : : * shifted by FirstLowInvalidHeapAttributeNumber.
180 : : */
181 : : Bitmapset *columns;
182 : :
183 : : /*
184 : : * Private context to store additional data for this entry - state for the
185 : : * row filter expressions, column list, etc.
186 : : */
187 : : MemoryContext entry_cxt;
188 : : } RelationSyncEntry;
189 : :
190 : : /*
191 : : * Maintain a per-transaction level variable to track whether the transaction
192 : : * has sent BEGIN. BEGIN is only sent when the first change in a transaction
193 : : * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
194 : : * messages for empty transactions which saves network bandwidth.
195 : : *
196 : : * This optimization is not used for prepared transactions because if the
197 : : * WALSender restarts after prepare of a transaction and before commit prepared
198 : : * of the same transaction then we won't be able to figure out if we have
199 : : * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
200 : : * because we would have lost the in-memory txndata information that was
201 : : * present prior to the restart. This will result in sending a spurious
202 : : * COMMIT PREPARED without a corresponding prepared transaction at the
203 : : * downstream which would lead to an error when it tries to process it.
204 : : *
205 : : * XXX We could achieve this optimization by changing protocol to send
206 : : * additional information so that downstream can detect that the corresponding
207 : : * prepare has not been sent. However, adding such a check for every
208 : : * transaction in the downstream could be costly so we might want to do it
209 : : * optionally.
210 : : *
211 : : * We also don't have this optimization for streamed transactions because
212 : : * they can contain prepared transactions.
213 : : */
214 : : typedef struct PGOutputTxnData
215 : : {
216 : : bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
217 : : } PGOutputTxnData;
218 : :
219 : : /* Map used to remember which relation schemas we sent. */
220 : : static HTAB *RelationSyncCache = NULL;
221 : :
222 : : static void init_rel_sync_cache(MemoryContext cachectx);
223 : : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
224 : : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
225 : : Relation relation);
226 : : static void send_relation_and_attrs(Relation relation, TransactionId xid,
227 : : LogicalDecodingContext *ctx,
228 : : RelationSyncEntry *relentry);
229 : : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
230 : : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
231 : : uint32 hashvalue);
232 : : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
233 : : TransactionId xid);
234 : : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
235 : : TransactionId xid);
236 : : static void init_tuple_slot(PGOutputData *data, Relation relation,
237 : : RelationSyncEntry *entry);
238 : :
239 : : /* row filter routines */
240 : : static EState *create_estate_for_relation(Relation rel);
241 : : static void pgoutput_row_filter_init(PGOutputData *data,
242 : : List *publications,
243 : : RelationSyncEntry *entry);
244 : : static bool pgoutput_row_filter_exec_expr(ExprState *state,
245 : : ExprContext *econtext);
246 : : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
247 : : TupleTableSlot **new_slot_ptr,
248 : : RelationSyncEntry *entry,
249 : : ReorderBufferChangeType *action);
250 : :
251 : : /* column list routines */
252 : : static void pgoutput_column_list_init(PGOutputData *data,
253 : : List *publications,
254 : : RelationSyncEntry *entry);
255 : :
256 : : /*
257 : : * Specify output plugin callbacks
258 : : */
259 : : void
3152 peter_e@gmx.net 260 : 717 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
261 : : {
262 : 717 : cb->startup_cb = pgoutput_startup;
263 : 717 : cb->begin_cb = pgoutput_begin_txn;
264 : 717 : cb->change_cb = pgoutput_change;
2709 265 : 717 : cb->truncate_cb = pgoutput_truncate;
1614 akapila@postgresql.o 266 : 717 : cb->message_cb = pgoutput_message;
3152 peter_e@gmx.net 267 : 717 : cb->commit_cb = pgoutput_commit_txn;
268 : :
1515 akapila@postgresql.o 269 : 717 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
270 : 717 : cb->prepare_cb = pgoutput_prepare_txn;
271 : 717 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
272 : 717 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
3152 peter_e@gmx.net 273 : 717 : cb->filter_by_origin_cb = pgoutput_origin_filter;
274 : 717 : cb->shutdown_cb = pgoutput_shutdown;
275 : :
276 : : /* transaction streaming */
1829 akapila@postgresql.o 277 : 717 : cb->stream_start_cb = pgoutput_stream_start;
278 : 717 : cb->stream_stop_cb = pgoutput_stream_stop;
279 : 717 : cb->stream_abort_cb = pgoutput_stream_abort;
280 : 717 : cb->stream_commit_cb = pgoutput_stream_commit;
281 : 717 : cb->stream_change_cb = pgoutput_change;
1614 282 : 717 : cb->stream_message_cb = pgoutput_message;
1829 283 : 717 : cb->stream_truncate_cb = pgoutput_truncate;
284 : : /* transaction streaming - two-phase commit */
1494 285 : 717 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
3152 peter_e@gmx.net 286 : 717 : }
287 : :
288 : : static void
1614 akapila@postgresql.o 289 : 396 : parse_output_parameters(List *options, PGOutputData *data)
290 : : {
291 : : ListCell *lc;
3152 peter_e@gmx.net 292 : 396 : bool protocol_version_given = false;
293 : 396 : bool publication_names_given = false;
1876 tgl@sss.pgh.pa.us 294 : 396 : bool binary_option_given = false;
1614 akapila@postgresql.o 295 : 396 : bool messages_option_given = false;
1829 296 : 396 : bool streaming_given = false;
1515 297 : 396 : bool two_phase_option_given = false;
1143 298 : 396 : bool origin_option_given = false;
299 : :
300 : : /* Initialize optional parameters to defaults */
1614 301 : 396 : data->binary = false;
971 302 : 396 : data->streaming = LOGICALREP_STREAM_OFF;
1614 303 : 396 : data->messages = false;
1515 304 : 396 : data->two_phase = false;
52 fujii@postgresql.org 305 :GNC 396 : data->publish_no_origin = false;
306 : :
3152 peter_e@gmx.net 307 [ + - + + :CBC 1976 : foreach(lc, options)
+ + ]
308 : : {
309 : 1580 : DefElem *defel = (DefElem *) lfirst(lc);
310 : :
311 [ + - - + ]: 1580 : Assert(defel->arg == NULL || IsA(defel->arg, String));
312 : :
313 : : /* Check each param, whether or not we recognize it */
314 [ + + ]: 1580 : if (strcmp(defel->defname, "proto_version") == 0)
315 : : {
316 : : unsigned long parsed;
317 : : char *endptr;
318 : :
319 [ - + ]: 396 : if (protocol_version_given)
3152 peter_e@gmx.net 320 [ # # ]:UBC 0 : ereport(ERROR,
321 : : (errcode(ERRCODE_SYNTAX_ERROR),
322 : : errmsg("conflicting or redundant options")));
3152 peter_e@gmx.net 323 :CBC 396 : protocol_version_given = true;
324 : :
1300 peter@eisentraut.org 325 : 396 : errno = 0;
326 : 396 : parsed = strtoul(strVal(defel->arg), &endptr, 10);
327 [ + - - + ]: 396 : if (errno != 0 || *endptr != '\0')
3152 peter_e@gmx.net 328 [ # # ]:UBC 0 : ereport(ERROR,
329 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
330 : : errmsg("invalid proto_version")));
331 : :
1300 peter@eisentraut.org 332 [ - + ]:CBC 396 : if (parsed > PG_UINT32_MAX)
3152 peter_e@gmx.net 333 [ # # ]:UBC 0 : ereport(ERROR,
334 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
335 : : errmsg("proto_version \"%s\" out of range",
336 : : strVal(defel->arg))));
337 : :
1614 akapila@postgresql.o 338 :CBC 396 : data->protocol_version = (uint32) parsed;
339 : : }
3152 peter_e@gmx.net 340 [ + + ]: 1184 : else if (strcmp(defel->defname, "publication_names") == 0)
341 : : {
342 [ - + ]: 396 : if (publication_names_given)
3152 peter_e@gmx.net 343 [ # # ]:UBC 0 : ereport(ERROR,
344 : : (errcode(ERRCODE_SYNTAX_ERROR),
345 : : errmsg("conflicting or redundant options")));
3152 peter_e@gmx.net 346 :CBC 396 : publication_names_given = true;
347 : :
348 [ - + ]: 396 : if (!SplitIdentifierString(strVal(defel->arg), ',',
349 : : &data->publication_names))
3034 bruce@momjian.us 350 [ # # ]:UBC 0 : ereport(ERROR,
351 : : (errcode(ERRCODE_INVALID_NAME),
352 : : errmsg("invalid publication_names syntax")));
353 : : }
1876 tgl@sss.pgh.pa.us 354 [ + + ]:CBC 788 : else if (strcmp(defel->defname, "binary") == 0)
355 : : {
356 [ - + ]: 11 : if (binary_option_given)
1876 tgl@sss.pgh.pa.us 357 [ # # ]:UBC 0 : ereport(ERROR,
358 : : (errcode(ERRCODE_SYNTAX_ERROR),
359 : : errmsg("conflicting or redundant options")));
1876 tgl@sss.pgh.pa.us 360 :CBC 11 : binary_option_given = true;
361 : :
1614 akapila@postgresql.o 362 : 11 : data->binary = defGetBoolean(defel);
363 : : }
364 [ + + ]: 777 : else if (strcmp(defel->defname, "messages") == 0)
365 : : {
366 [ - + ]: 4 : if (messages_option_given)
1614 akapila@postgresql.o 367 [ # # ]:UBC 0 : ereport(ERROR,
368 : : (errcode(ERRCODE_SYNTAX_ERROR),
369 : : errmsg("conflicting or redundant options")));
1614 akapila@postgresql.o 370 :CBC 4 : messages_option_given = true;
371 : :
372 : 4 : data->messages = defGetBoolean(defel);
373 : : }
1829 374 [ + + ]: 773 : else if (strcmp(defel->defname, "streaming") == 0)
375 : : {
376 [ - + ]: 378 : if (streaming_given)
1829 akapila@postgresql.o 377 [ # # ]:UBC 0 : ereport(ERROR,
378 : : (errcode(ERRCODE_SYNTAX_ERROR),
379 : : errmsg("conflicting or redundant options")));
1829 akapila@postgresql.o 380 :CBC 378 : streaming_given = true;
381 : :
971 382 : 378 : data->streaming = defGetStreamingMode(defel);
383 : : }
1515 384 [ + + ]: 395 : else if (strcmp(defel->defname, "two_phase") == 0)
385 : : {
386 [ - + ]: 8 : if (two_phase_option_given)
1515 akapila@postgresql.o 387 [ # # ]:UBC 0 : ereport(ERROR,
388 : : (errcode(ERRCODE_SYNTAX_ERROR),
389 : : errmsg("conflicting or redundant options")));
1515 akapila@postgresql.o 390 :CBC 8 : two_phase_option_given = true;
391 : :
392 : 8 : data->two_phase = defGetBoolean(defel);
393 : : }
1143 394 [ + - ]: 387 : else if (strcmp(defel->defname, "origin") == 0)
395 : : {
396 : : char *origin;
397 : :
398 [ - + ]: 387 : if (origin_option_given)
1143 akapila@postgresql.o 399 [ # # ]:UBC 0 : ereport(ERROR,
400 : : errcode(ERRCODE_SYNTAX_ERROR),
401 : : errmsg("conflicting or redundant options"));
1143 akapila@postgresql.o 402 :CBC 387 : origin_option_given = true;
403 : :
710 404 : 387 : origin = defGetString(defel);
405 [ + + ]: 387 : if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
406 : 23 : data->publish_no_origin = true;
407 [ + - ]: 364 : else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
408 : 364 : data->publish_no_origin = false;
409 : : else
1143 akapila@postgresql.o 410 [ # # ]:UBC 0 : ereport(ERROR,
411 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
412 : : errmsg("unrecognized origin value: \"%s\"", origin));
413 : : }
414 : : else
3152 peter_e@gmx.net 415 [ # # ]: 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
416 : : }
417 : :
418 : : /* Check required options */
627 akapila@postgresql.o 419 [ - + ]:CBC 396 : if (!protocol_version_given)
627 akapila@postgresql.o 420 [ # # ]:UBC 0 : ereport(ERROR,
421 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
422 : : errmsg("option \"%s\" missing", "proto_version"));
627 akapila@postgresql.o 423 [ - + ]:CBC 396 : if (!publication_names_given)
627 akapila@postgresql.o 424 [ # # ]:UBC 0 : ereport(ERROR,
425 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
426 : : errmsg("option \"%s\" missing", "publication_names"));
3152 peter_e@gmx.net 427 :CBC 396 : }
428 : :
429 : : /*
430 : : * Initialize this plugin
431 : : */
432 : : static void
3034 bruce@momjian.us 433 : 717 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
434 : : bool is_init)
435 : : {
436 : 717 : PGOutputData *data = palloc0(sizeof(PGOutputData));
437 : : static bool publication_callback_registered = false;
438 : :
439 : : /* Create our memory context for private allocations. */
3152 peter_e@gmx.net 440 : 717 : data->context = AllocSetContextCreate(ctx->context,
441 : : "logical replication output context",
442 : : ALLOCSET_DEFAULT_SIZES);
443 : :
1292 akapila@postgresql.o 444 : 717 : data->cachectx = AllocSetContextCreate(ctx->context,
445 : : "logical replication cache context",
446 : : ALLOCSET_DEFAULT_SIZES);
447 : :
271 michael@paquier.xyz 448 : 717 : data->pubctx = AllocSetContextCreate(ctx->context,
449 : : "logical replication publication list context",
450 : : ALLOCSET_SMALL_SIZES);
451 : :
3152 peter_e@gmx.net 452 : 717 : ctx->output_plugin_private = data;
453 : :
454 : : /* This plugin uses binary protocol. */
455 : 717 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
456 : :
457 : : /*
458 : : * This is replication start and not slot initialization.
459 : : *
460 : : * Parse and validate options passed by the client.
461 : : */
462 [ + + ]: 717 : if (!is_init)
463 : : {
464 : : /* Parse the params and ERROR if we see any we don't recognize */
1614 akapila@postgresql.o 465 : 396 : parse_output_parameters(ctx->output_plugin_options, data);
466 : :
467 : : /* Check if we support requested protocol */
1806 468 [ - + ]: 396 : if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
3152 peter_e@gmx.net 469 [ # # ]:UBC 0 : ereport(ERROR,
470 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
471 : : errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
472 : : data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
473 : :
3152 peter_e@gmx.net 474 [ - + ]:CBC 396 : if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
3152 peter_e@gmx.net 475 [ # # ]:UBC 0 : ereport(ERROR,
476 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
477 : : errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
478 : : data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
479 : :
480 : : /*
481 : : * Decide whether to enable streaming. It is disabled by default, in
482 : : * which case we just update the flag in decoding context. Otherwise
483 : : * we only allow it with sufficient version of the protocol, and when
484 : : * the output plugin supports it.
485 : : */
971 akapila@postgresql.o 486 [ + + ]:CBC 396 : if (data->streaming == LOGICALREP_STREAM_OFF)
1829 487 : 18 : ctx->streaming = false;
971 488 [ + + ]: 378 : else if (data->streaming == LOGICALREP_STREAM_ON &&
489 [ - + ]: 27 : data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
1829 akapila@postgresql.o 490 [ # # ]:UBC 0 : ereport(ERROR,
491 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
492 : : errmsg("requested proto_version=%d does not support streaming, need %d or higher",
493 : : data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
971 akapila@postgresql.o 494 [ + + ]:CBC 378 : else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
495 [ - + ]: 351 : data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
971 akapila@postgresql.o 496 [ # # ]:UBC 0 : ereport(ERROR,
497 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
498 : : errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
499 : : data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
1829 akapila@postgresql.o 500 [ - + ]:CBC 378 : else if (!ctx->streaming)
1829 akapila@postgresql.o 501 [ # # ]:UBC 0 : ereport(ERROR,
502 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
503 : : errmsg("streaming requested, but not supported by output plugin")));
504 : :
505 : : /*
506 : : * Here, we just check whether the two-phase option is passed by
507 : : * plugin and decide whether to enable it at later point of time. It
508 : : * remains enabled if the previous start-up has done so. But we only
509 : : * allow the option to be passed in with sufficient version of the
510 : : * protocol, and when the output plugin supports it.
511 : : */
1515 akapila@postgresql.o 512 [ + + ]:CBC 396 : if (!data->two_phase)
513 : 388 : ctx->twophase_opt_given = false;
514 [ - + ]: 8 : else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
1515 akapila@postgresql.o 515 [ # # ]:UBC 0 : ereport(ERROR,
516 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
517 : : errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
518 : : data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
1515 akapila@postgresql.o 519 [ - + ]:CBC 8 : else if (!ctx->twophase)
1515 akapila@postgresql.o 520 [ # # ]:UBC 0 : ereport(ERROR,
521 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
522 : : errmsg("two-phase commit requested, but not supported by output plugin")));
523 : : else
1515 akapila@postgresql.o 524 :CBC 8 : ctx->twophase_opt_given = true;
525 : :
526 : : /* Init publication state. */
3152 peter_e@gmx.net 527 : 396 : data->publications = NIL;
528 : 396 : publications_valid = false;
529 : :
530 : : /*
531 : : * Register callback for pg_publication if we didn't already do that
532 : : * during some previous call in this process.
533 : : */
926 tgl@sss.pgh.pa.us 534 [ + + ]: 396 : if (!publication_callback_registered)
535 : : {
536 : 394 : CacheRegisterSyscacheCallback(PUBLICATIONOID,
537 : : publication_invalidation_cb,
538 : : (Datum) 0);
177 akapila@postgresql.o 539 : 394 : CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
540 : : (Datum) 0);
926 tgl@sss.pgh.pa.us 541 : 394 : publication_callback_registered = true;
542 : : }
543 : :
544 : : /* Initialize relation schema cache. */
3152 peter_e@gmx.net 545 : 396 : init_rel_sync_cache(CacheMemoryContext);
546 : : }
547 : : else
548 : : {
549 : : /*
550 : : * Disable the streaming and prepared transactions during the slot
551 : : * initialization mode.
552 : : */
1829 akapila@postgresql.o 553 : 321 : ctx->streaming = false;
1515 554 : 321 : ctx->twophase = false;
555 : : }
3152 peter_e@gmx.net 556 : 717 : }
557 : :
558 : : /*
559 : : * BEGIN callback.
560 : : *
561 : : * Don't send the BEGIN message here instead postpone it until the first
562 : : * change. In logical replication, a common scenario is to replicate a set of
563 : : * tables (instead of all tables) and transactions whose changes were on
564 : : * the table(s) that are not published will produce empty transactions. These
565 : : * empty transactions will send BEGIN and COMMIT messages to subscribers,
566 : : * using bandwidth on something with little/no use for logical replication.
567 : : */
568 : : static void
1213 tgl@sss.pgh.pa.us 569 : 890 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
570 : : {
571 : 890 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
572 : : sizeof(PGOutputTxnData));
573 : :
1256 akapila@postgresql.o 574 : 890 : txn->output_plugin_private = txndata;
575 : 890 : }
576 : :
577 : : /*
578 : : * Send BEGIN.
579 : : *
580 : : * This is called while processing the first change of the transaction.
581 : : */
582 : : static void
583 : 447 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
584 : : {
3034 bruce@momjian.us 585 : 447 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1256 akapila@postgresql.o 586 : 447 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
587 : :
588 [ - + ]: 447 : Assert(txndata);
589 [ - + ]: 447 : Assert(!txndata->sent_begin_txn);
590 : :
3152 peter_e@gmx.net 591 : 447 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
592 : 447 : logicalrep_write_begin(ctx->out, txn);
1256 akapila@postgresql.o 593 : 447 : txndata->sent_begin_txn = true;
594 : :
1515 595 : 447 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
596 : : send_replication_origin);
597 : :
3152 peter_e@gmx.net 598 : 447 : OutputPluginWrite(ctx, true);
599 : 446 : }
600 : :
601 : : /*
602 : : * COMMIT callback
603 : : */
604 : : static void
605 : 887 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
606 : : XLogRecPtr commit_lsn)
607 : : {
1256 akapila@postgresql.o 608 : 887 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
609 : : bool sent_begin_txn;
610 : :
611 [ - + ]: 887 : Assert(txndata);
612 : :
613 : : /*
614 : : * We don't need to send the commit message unless some relevant change
615 : : * from this transaction has been sent to the downstream.
616 : : */
617 : 887 : sent_begin_txn = txndata->sent_begin_txn;
941 618 : 887 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
1256 619 : 887 : pfree(txndata);
620 : 887 : txn->output_plugin_private = NULL;
621 : :
622 [ + + ]: 887 : if (!sent_begin_txn)
623 : : {
624 [ + + ]: 442 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
625 : 442 : return;
626 : : }
627 : :
3152 peter_e@gmx.net 628 : 445 : OutputPluginPrepareWrite(ctx, true);
629 : 445 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
630 : 445 : OutputPluginWrite(ctx, true);
631 : : }
632 : :
633 : : /*
634 : : * BEGIN PREPARE callback
635 : : */
636 : : static void
1515 akapila@postgresql.o 637 : 17 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
638 : : {
639 : 17 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
640 : :
641 : 17 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
642 : 17 : logicalrep_write_begin_prepare(ctx->out, txn);
643 : :
644 : 17 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
645 : : send_replication_origin);
646 : :
647 : 17 : OutputPluginWrite(ctx, true);
648 : 17 : }
649 : :
650 : : /*
651 : : * PREPARE callback
652 : : */
653 : : static void
654 : 17 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
655 : : XLogRecPtr prepare_lsn)
656 : : {
941 657 : 17 : OutputPluginUpdateProgress(ctx, false);
658 : :
1515 659 : 17 : OutputPluginPrepareWrite(ctx, true);
660 : 17 : logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
661 : 17 : OutputPluginWrite(ctx, true);
662 : 17 : }
663 : :
664 : : /*
665 : : * COMMIT PREPARED callback
666 : : */
667 : : static void
668 : 22 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
669 : : XLogRecPtr commit_lsn)
670 : : {
941 671 : 22 : OutputPluginUpdateProgress(ctx, false);
672 : :
1515 673 : 22 : OutputPluginPrepareWrite(ctx, true);
674 : 22 : logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
675 : 22 : OutputPluginWrite(ctx, true);
676 : 22 : }
677 : :
678 : : /*
679 : : * ROLLBACK PREPARED callback
680 : : */
681 : : static void
682 : 6 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
683 : : ReorderBufferTXN *txn,
684 : : XLogRecPtr prepare_end_lsn,
685 : : TimestampTz prepare_time)
686 : : {
941 687 : 6 : OutputPluginUpdateProgress(ctx, false);
688 : :
1515 689 : 6 : OutputPluginPrepareWrite(ctx, true);
690 : 6 : logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
691 : : prepare_time);
692 : 6 : OutputPluginWrite(ctx, true);
693 : 6 : }
694 : :
695 : : /*
696 : : * Write the current schema of the relation and its ancestor (if any) if not
697 : : * done yet.
698 : : */
699 : : static void
2709 peter_e@gmx.net 700 : 182231 : maybe_send_schema(LogicalDecodingContext *ctx,
701 : : ReorderBufferChange *change,
702 : : Relation relation, RelationSyncEntry *relentry)
703 : : {
709 michael@paquier.xyz 704 : 182231 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
705 : : bool schema_sent;
1829 akapila@postgresql.o 706 : 182231 : TransactionId xid = InvalidTransactionId;
707 : 182231 : TransactionId topxid = InvalidTransactionId;
708 : :
709 : : /*
710 : : * Remember XID of the (sub)transaction for the change. We don't care if
711 : : * it's top-level transaction or not (we have already sent that XID in
712 : : * start of the current streaming block).
713 : : *
714 : : * If we're not in a streaming block, just use InvalidTransactionId and
715 : : * the write methods will not include it.
716 : : */
709 michael@paquier.xyz 717 [ + + ]: 182231 : if (data->in_streaming)
1829 akapila@postgresql.o 718 : 175921 : xid = change->txn->xid;
719 : :
904 720 [ + + ]: 182231 : if (rbtxn_is_subtxn(change->txn))
721 [ + - ]: 10169 : topxid = rbtxn_get_toptxn(change->txn)->xid;
722 : : else
1829 723 : 172062 : topxid = xid;
724 : :
725 : : /*
726 : : * Do we need to send the schema? We do track streamed transactions
727 : : * separately, because those may be applied later (and the regular
728 : : * transactions won't see their effects until then) and in an order that
729 : : * we don't know at this point.
730 : : *
731 : : * XXX There is a scope of optimization here. Currently, we always send
732 : : * the schema first time in a streaming transaction but we can probably
733 : : * avoid that by checking 'relentry->schema_sent' flag. However, before
734 : : * doing that we need to study its impact on the case where we have a mix
735 : : * of streaming and non-streaming transactions.
736 : : */
709 michael@paquier.xyz 737 [ + + ]: 182231 : if (data->in_streaming)
1829 akapila@postgresql.o 738 : 175921 : schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
739 : : else
740 : 6310 : schema_sent = relentry->schema_sent;
741 : :
742 : : /* Nothing to do if we already sent the schema. */
743 [ + + ]: 182231 : if (schema_sent)
1977 peter@eisentraut.org 744 : 181898 : return;
745 : :
746 : : /*
747 : : * Send the schema. If the changes will be published using an ancestor's
748 : : * schema, not the relation's own, send that ancestor's schema before
749 : : * sending relation's own (XXX - maybe sending only the former suffices?).
750 : : */
751 [ + + ]: 333 : if (relentry->publish_as_relid != RelationGetRelid(relation))
752 : : {
753 : 32 : Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
754 : :
303 akapila@postgresql.o 755 : 32 : send_relation_and_attrs(ancestor, xid, ctx, relentry);
1977 peter@eisentraut.org 756 : 32 : RelationClose(ancestor);
757 : : }
758 : :
303 akapila@postgresql.o 759 : 333 : send_relation_and_attrs(relation, xid, ctx, relentry);
760 : :
709 michael@paquier.xyz 761 [ + + ]: 333 : if (data->in_streaming)
1829 akapila@postgresql.o 762 : 68 : set_schema_sent_in_streamed_txn(relentry, topxid);
763 : : else
764 : 265 : relentry->schema_sent = true;
765 : : }
766 : :
767 : : /*
768 : : * Sends a relation
769 : : */
770 : : static void
771 : 365 : send_relation_and_attrs(Relation relation, TransactionId xid,
772 : : LogicalDecodingContext *ctx,
773 : : RelationSyncEntry *relentry)
774 : : {
1977 peter@eisentraut.org 775 : 365 : TupleDesc desc = RelationGetDescr(relation);
303 akapila@postgresql.o 776 : 365 : Bitmapset *columns = relentry->columns;
226 777 : 365 : PublishGencolsType include_gencols_type = relentry->include_gencols_type;
778 : : int i;
779 : :
780 : : /*
781 : : * Write out type info if needed. We do that only for user-created types.
782 : : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
783 : : * objects with hand-assigned OIDs to be "built in", not for instance any
784 : : * function or type defined in the information_schema. This is important
785 : : * because only hand-assigned OIDs can be expected to remain stable across
786 : : * major versions.
787 : : */
1977 peter@eisentraut.org 788 [ + + ]: 1129 : for (i = 0; i < desc->natts; i++)
789 : : {
790 : 764 : Form_pg_attribute att = TupleDescAttr(desc, i);
791 : :
226 akapila@postgresql.o 792 [ + + ]: 764 : if (!logicalrep_should_publish_column(att, columns,
793 : : include_gencols_type))
1977 peter@eisentraut.org 794 : 71 : continue;
795 : :
796 [ + + ]: 693 : if (att->atttypid < FirstGenbkiObjectId)
797 : 675 : continue;
798 : :
2709 peter_e@gmx.net 799 : 18 : OutputPluginPrepareWrite(ctx, false);
1829 akapila@postgresql.o 800 : 18 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
2709 peter_e@gmx.net 801 : 18 : OutputPluginWrite(ctx, false);
802 : : }
803 : :
1977 peter@eisentraut.org 804 : 365 : OutputPluginPrepareWrite(ctx, false);
226 akapila@postgresql.o 805 : 365 : logicalrep_write_rel(ctx->out, xid, relation, columns,
806 : : include_gencols_type);
1977 peter@eisentraut.org 807 : 365 : OutputPluginWrite(ctx, false);
2709 peter_e@gmx.net 808 : 365 : }
809 : :
810 : : /*
811 : : * Executor state preparation for evaluation of row filter expressions for the
812 : : * specified relation.
813 : : */
814 : : static EState *
1292 akapila@postgresql.o 815 : 17 : create_estate_for_relation(Relation rel)
816 : : {
817 : : EState *estate;
818 : : RangeTblEntry *rte;
915 tgl@sss.pgh.pa.us 819 : 17 : List *perminfos = NIL;
820 : :
1292 akapila@postgresql.o 821 : 17 : estate = CreateExecutorState();
822 : :
823 : 17 : rte = makeNode(RangeTblEntry);
824 : 17 : rte->rtekind = RTE_RELATION;
825 : 17 : rte->relid = RelationGetRelid(rel);
826 : 17 : rte->relkind = rel->rd_rel->relkind;
827 : 17 : rte->rellockmode = AccessShareLock;
828 : :
915 tgl@sss.pgh.pa.us 829 : 17 : addRTEPermissionInfo(&perminfos, rte);
830 : :
211 amitlan@postgresql.o 831 : 17 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
832 : : bms_make_singleton(1));
833 : :
1292 akapila@postgresql.o 834 : 17 : estate->es_output_cid = GetCurrentCommandId(false);
835 : :
836 : 17 : return estate;
837 : : }
838 : :
839 : : /*
840 : : * Evaluates row filter.
841 : : *
842 : : * If the row filter evaluates to NULL, it is taken as false i.e. the change
843 : : * isn't replicated.
844 : : */
845 : : static bool
846 : 38 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
847 : : {
848 : : Datum ret;
849 : : bool isnull;
850 : :
851 [ - + ]: 38 : Assert(state != NULL);
852 : :
853 : 38 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
854 : :
855 [ - + - - : 38 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
- - - - ]
856 : : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
857 : : isnull ? "true" : "false");
858 : :
859 [ + + ]: 38 : if (isnull)
860 : 1 : return false;
861 : :
862 : 37 : return DatumGetBool(ret);
863 : : }
864 : :
865 : : /*
866 : : * Make sure the per-entry memory context exists.
867 : : */
868 : : static void
1260 tomas.vondra@postgre 869 : 316 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
870 : : {
871 : : Relation relation;
872 : :
873 : : /* The context may already exist, in which case bail out. */
874 [ + + ]: 316 : if (entry->entry_cxt)
875 : 17 : return;
876 : :
877 : 299 : relation = RelationIdGetRelation(entry->publish_as_relid);
878 : :
879 : 299 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
880 : : "entry private context",
881 : : ALLOCSET_SMALL_SIZES);
882 : :
883 : 299 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
884 : : RelationGetRelationName(relation));
885 : : }
886 : :
887 : : /*
888 : : * Initialize the row filter.
889 : : */
890 : : static void
1292 akapila@postgresql.o 891 : 299 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
892 : : RelationSyncEntry *entry)
893 : : {
894 : : ListCell *lc;
895 : 299 : List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
896 : 299 : bool no_filter[] = {false, false, false}; /* One per pubaction */
897 : : MemoryContext oldctx;
898 : : int idx;
899 : 299 : bool has_filter = true;
1079 900 : 299 : Oid schemaid = get_rel_namespace(entry->publish_as_relid);
901 : :
902 : : /*
903 : : * Find if there are any row filters for this relation. If there are, then
904 : : * prepare the necessary ExprState and cache it in entry->exprstate. To
905 : : * build an expression state, we need to ensure the following:
906 : : *
907 : : * All the given publication-table mappings must be checked.
908 : : *
909 : : * Multiple publications might have multiple row filters for this
910 : : * relation. Since row filter usage depends on the DML operation, there
911 : : * are multiple lists (one for each operation) to which row filters will
912 : : * be appended.
913 : : *
914 : : * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
915 : : * expression" so it takes precedence.
916 : : */
1292 917 [ + - + + : 320 : foreach(lc, publications)
+ + ]
918 : : {
919 : 303 : Publication *pub = lfirst(lc);
920 : 303 : HeapTuple rftuple = NULL;
921 : 303 : Datum rfdatum = 0;
1079 922 : 303 : bool pub_no_filter = true;
923 : :
924 : : /*
925 : : * If the publication is FOR ALL TABLES, or the publication includes a
926 : : * FOR TABLES IN SCHEMA where the table belongs to the referred
927 : : * schema, then it is treated the same as if there are no row filters
928 : : * (even if other publications have a row filter).
929 : : */
930 [ + + ]: 303 : if (!pub->alltables &&
931 [ + + ]: 224 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
932 : : ObjectIdGetDatum(schemaid),
933 : : ObjectIdGetDatum(pub->oid)))
934 : : {
935 : : /*
936 : : * Check for the presence of a row filter in this publication.
937 : : */
1292 938 : 217 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
939 : : ObjectIdGetDatum(entry->publish_as_relid),
940 : : ObjectIdGetDatum(pub->oid));
941 : :
942 [ + + ]: 217 : if (HeapTupleIsValid(rftuple))
943 : : {
944 : : /* Null indicates no filter. */
945 : 205 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
946 : : Anum_pg_publication_rel_prqual,
947 : : &pub_no_filter);
948 : : }
949 : : }
950 : :
951 [ + + ]: 303 : if (pub_no_filter)
952 : : {
953 [ + + ]: 289 : if (rftuple)
954 : 191 : ReleaseSysCache(rftuple);
955 : :
956 : 289 : no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
957 : 289 : no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
958 : 289 : no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
959 : :
960 : : /*
961 : : * Quick exit if all the DML actions are publicized via this
962 : : * publication.
963 : : */
964 [ + - ]: 289 : if (no_filter[PUBACTION_INSERT] &&
965 [ + + ]: 289 : no_filter[PUBACTION_UPDATE] &&
966 [ + - ]: 282 : no_filter[PUBACTION_DELETE])
967 : : {
968 : 282 : has_filter = false;
969 : 282 : break;
970 : : }
971 : :
972 : : /* No additional work for this publication. Next one. */
973 : 7 : continue;
974 : : }
975 : :
976 : : /* Form the per pubaction row filter lists. */
977 [ + - + - ]: 14 : if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
978 : 14 : rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
979 : 14 : TextDatumGetCString(rfdatum));
980 [ + - + - ]: 14 : if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
981 : 14 : rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
982 : 14 : TextDatumGetCString(rfdatum));
983 [ + - + - ]: 14 : if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
984 : 14 : rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
985 : 14 : TextDatumGetCString(rfdatum));
986 : :
987 : 14 : ReleaseSysCache(rftuple);
988 : : } /* loop all subscribed publications */
989 : :
990 : : /* Clean the row filter */
991 [ + + ]: 1196 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
992 : : {
993 [ + + ]: 897 : if (no_filter[idx])
994 : : {
995 : 855 : list_free_deep(rfnodes[idx]);
996 : 855 : rfnodes[idx] = NIL;
997 : : }
998 : : }
999 : :
1000 [ + + ]: 299 : if (has_filter)
1001 : : {
1002 : 17 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1003 : :
1260 tomas.vondra@postgre 1004 : 17 : pgoutput_ensure_entry_cxt(data, entry);
1005 : :
1006 : : /*
1007 : : * Now all the filters for all pubactions are known. Combine them when
1008 : : * their pubactions are the same.
1009 : : */
1010 : 17 : oldctx = MemoryContextSwitchTo(entry->entry_cxt);
1292 akapila@postgresql.o 1011 : 17 : entry->estate = create_estate_for_relation(relation);
1012 [ + + ]: 68 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1013 : : {
1014 : 51 : List *filters = NIL;
1015 : : Expr *rfnode;
1016 : :
1017 [ + + ]: 51 : if (rfnodes[idx] == NIL)
1018 : 21 : continue;
1019 : :
1020 [ + - + + : 63 : foreach(lc, rfnodes[idx])
+ + ]
211 peter@eisentraut.org 1021 : 33 : filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
1022 : :
1023 : : /* combine the row filter and cache the ExprState */
1292 akapila@postgresql.o 1024 : 30 : rfnode = make_orclause(filters);
1025 : 30 : entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1026 : : } /* for each pubaction */
1027 : 17 : MemoryContextSwitchTo(oldctx);
1028 : :
1029 : 17 : RelationClose(relation);
1030 : : }
1031 : 299 : }
1032 : :
1033 : : /*
1034 : : * If the table contains a generated column, check for any conflicting
1035 : : * values of 'publish_generated_columns' parameter in the publications.
1036 : : */
1037 : : static void
303 1038 : 299 : check_and_init_gencol(PGOutputData *data, List *publications,
1039 : : RelationSyncEntry *entry)
1040 : : {
1041 : 299 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1042 : 299 : TupleDesc desc = RelationGetDescr(relation);
1043 : 299 : bool gencolpresent = false;
1044 : 299 : bool first = true;
1045 : :
1046 : : /* Check if there is any generated column present. */
1047 [ + + ]: 913 : for (int i = 0; i < desc->natts; i++)
1048 : : {
1049 : 621 : Form_pg_attribute att = TupleDescAttr(desc, i);
1050 : :
1051 [ + + ]: 621 : if (att->attgenerated)
1052 : : {
1053 : 7 : gencolpresent = true;
1054 : 7 : break;
1055 : : }
1056 : : }
1057 : :
1058 : : /* There are no generated columns to be published. */
1059 [ + + ]: 299 : if (!gencolpresent)
1060 : : {
226 1061 : 292 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
303 1062 : 292 : return;
1063 : : }
1064 : :
1065 : : /*
1066 : : * There may be a conflicting value for 'publish_generated_columns'
1067 : : * parameter in the publications.
1068 : : */
1069 [ + - + + : 22 : foreach_ptr(Publication, pub, publications)
+ + ]
1070 : : {
1071 : : /*
1072 : : * The column list takes precedence over the
1073 : : * 'publish_generated_columns' parameter. Those will be checked later,
1074 : : * see pgoutput_column_list_init.
1075 : : */
1076 [ + + ]: 8 : if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1077 : 3 : continue;
1078 : :
1079 [ + - ]: 5 : if (first)
1080 : : {
226 1081 : 5 : entry->include_gencols_type = pub->pubgencols_type;
303 1082 : 5 : first = false;
1083 : : }
226 akapila@postgresql.o 1084 [ # # ]:UBC 0 : else if (entry->include_gencols_type != pub->pubgencols_type)
303 1085 [ # # ]: 0 : ereport(ERROR,
1086 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1087 : : errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1088 : : get_namespace_name(RelationGetNamespace(relation)),
1089 : : RelationGetRelationName(relation)));
1090 : : }
1091 : : }
1092 : :
1093 : : /*
1094 : : * Initialize the column list.
1095 : : */
1096 : : static void
1260 tomas.vondra@postgre 1097 :CBC 299 : pgoutput_column_list_init(PGOutputData *data, List *publications,
1098 : : RelationSyncEntry *entry)
1099 : : {
1100 : : ListCell *lc;
1192 akapila@postgresql.o 1101 : 299 : bool first = true;
1102 : 299 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
303 1103 : 299 : bool found_pub_collist = false;
1104 : 299 : Bitmapset *relcols = NULL;
1105 : :
1106 : 299 : pgoutput_ensure_entry_cxt(data, entry);
1107 : :
1108 : : /*
1109 : : * Find if there are any column lists for this relation. If there are,
1110 : : * build a bitmap using the column lists.
1111 : : *
1112 : : * Multiple publications might have multiple column lists for this
1113 : : * relation.
1114 : : *
1115 : : * Note that we don't support the case where the column list is different
1116 : : * for the same table when combining publications. See comments atop
1117 : : * fetch_table_list. But one can later change the publication so we still
1118 : : * need to check all the given publication-table mappings and report an
1119 : : * error if any publications have a different column list.
1120 : : */
1260 tomas.vondra@postgre 1121 [ + - + + : 608 : foreach(lc, publications)
+ + ]
1122 : : {
1123 : 310 : Publication *pub = lfirst(lc);
1192 akapila@postgresql.o 1124 : 310 : Bitmapset *cols = NULL;
1125 : :
1126 : : /* Retrieve the bitmap of columns for a column list publication. */
303 1127 : 310 : found_pub_collist |= check_and_fetch_column_list(pub,
1128 : : entry->publish_as_relid,
1129 : : entry->entry_cxt, &cols);
1130 : :
1131 : : /*
1132 : : * For non-column list publications — e.g. TABLE (without a column
1133 : : * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1134 : : * of the table (including generated columns when
1135 : : * 'publish_generated_columns' parameter is true).
1136 : : */
1137 [ + + ]: 310 : if (!cols)
1138 : : {
1139 : : /*
1140 : : * Cache the table columns for the first publication with no
1141 : : * specified column list to detect publication with a different
1142 : : * column list.
1143 : : */
1144 [ + + + + ]: 271 : if (!relcols && (list_length(publications) > 1))
1145 : : {
1146 : 9 : MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
1147 : :
226 1148 : 9 : relcols = pub_form_cols_map(relation,
1149 : : entry->include_gencols_type);
303 1150 : 9 : MemoryContextSwitchTo(oldcxt);
1151 : : }
1152 : :
1153 : 271 : cols = relcols;
1154 : : }
1155 : :
1192 1156 [ + + ]: 310 : if (first)
1157 : : {
1158 : 299 : entry->columns = cols;
1159 : 299 : first = false;
1160 : : }
1161 [ + + ]: 11 : else if (!bms_equal(entry->columns, cols))
1162 [ + - ]: 1 : ereport(ERROR,
1163 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1164 : : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1165 : : get_namespace_name(RelationGetNamespace(relation)),
1166 : : RelationGetRelationName(relation)));
1167 : : } /* loop all subscribed publications */
1168 : :
1169 : : /*
1170 : : * If no column list publications exist, columns to be published will be
1171 : : * computed later according to the 'publish_generated_columns' parameter.
1172 : : */
303 1173 [ + + ]: 298 : if (!found_pub_collist)
1174 : 262 : entry->columns = NULL;
1175 : :
1192 1176 : 298 : RelationClose(relation);
1260 tomas.vondra@postgre 1177 : 298 : }
1178 : :
1179 : : /*
1180 : : * Initialize the slot for storing new and old tuples, and build the map that
1181 : : * will be used to convert the relation's tuples into the ancestor's format.
1182 : : */
1183 : : static void
1292 akapila@postgresql.o 1184 : 299 : init_tuple_slot(PGOutputData *data, Relation relation,
1185 : : RelationSyncEntry *entry)
1186 : : {
1187 : : MemoryContext oldctx;
1188 : : TupleDesc oldtupdesc;
1189 : : TupleDesc newtupdesc;
1190 : :
1191 : 299 : oldctx = MemoryContextSwitchTo(data->cachectx);
1192 : :
1193 : : /*
1194 : : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1195 : : * live as long as the cache remains.
1196 : : */
649 1197 : 299 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1198 : 299 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1199 : :
1292 1200 : 299 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1201 : 299 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1202 : :
1203 : 299 : MemoryContextSwitchTo(oldctx);
1204 : :
1205 : : /*
1206 : : * Cache the map that will be used to convert the relation's tuples into
1207 : : * the ancestor's format, if needed.
1208 : : */
1209 [ + + ]: 299 : if (entry->publish_as_relid != RelationGetRelid(relation))
1210 : : {
1211 : 37 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1212 : 37 : TupleDesc indesc = RelationGetDescr(relation);
1213 : 37 : TupleDesc outdesc = RelationGetDescr(ancestor);
1214 : :
1215 : : /* Map must live as long as the logical decoding context. */
250 michael@paquier.xyz 1216 : 37 : oldctx = MemoryContextSwitchTo(data->cachectx);
1217 : :
1012 alvherre@alvh.no-ip. 1218 : 37 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1219 : :
1292 akapila@postgresql.o 1220 : 37 : MemoryContextSwitchTo(oldctx);
1221 : 37 : RelationClose(ancestor);
1222 : : }
1223 : 299 : }
1224 : :
1225 : : /*
1226 : : * Change is checked against the row filter if any.
1227 : : *
1228 : : * Returns true if the change is to be replicated, else false.
1229 : : *
1230 : : * For inserts, evaluate the row filter for new tuple.
1231 : : * For deletes, evaluate the row filter for old tuple.
1232 : : * For updates, evaluate the row filter for old and new tuple.
1233 : : *
1234 : : * For updates, if both evaluations are true, we allow sending the UPDATE and
1235 : : * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
1236 : : * only one of the tuples matches the row filter expression, we transform
1237 : : * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1238 : : * following rules:
1239 : : *
1240 : : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1241 : : * Case 2: old-row (no match) new row (match) -> INSERT
1242 : : * Case 3: old-row (match) new-row (no match) -> DELETE
1243 : : * Case 4: old-row (match) new row (match) -> UPDATE
1244 : : *
1245 : : * The new action is updated in the action parameter.
1246 : : *
1247 : : * The new slot could be updated when transforming the UPDATE into INSERT,
1248 : : * because the original new tuple might not have column values from the replica
1249 : : * identity.
1250 : : *
1251 : : * Examples:
1252 : : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1253 : : * Since the old tuple satisfies, the initial table synchronization copied this
1254 : : * row (or another method was used to guarantee that there is data
1255 : : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1256 : : * row filter, so from a data consistency perspective, that row should be
1257 : : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1258 : : * statement and be sent to the subscriber. Keeping this row on the subscriber
1259 : : * is undesirable because it doesn't reflect what was defined in the row filter
1260 : : * expression on the publisher. This row on the subscriber would likely not be
1261 : : * modified by replication again. If someone inserted a new row with the same
1262 : : * old identifier, replication could stop due to a constraint violation.
1263 : : *
1264 : : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1265 : : * Since the old tuple doesn't satisfy, the initial table synchronization
1266 : : * probably didn't copy this row. However, after the UPDATE the new tuple does
1267 : : * satisfy the row filter, so from a data consistency perspective, that row
1268 : : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1269 : : * statements have no effect (it matches no row -- see
1270 : : * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1271 : : * INSERT statement and be sent to the subscriber. However, this might surprise
1272 : : * someone who expects the data set to satisfy the row filter expression on the
1273 : : * provider.
1274 : : */
1275 : : static bool
1276 : 182228 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1277 : : TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1278 : : ReorderBufferChangeType *action)
1279 : : {
1280 : : TupleDesc desc;
1281 : : int i;
1282 : : bool old_matched,
1283 : : new_matched,
1284 : : result;
1285 : : TupleTableSlot *tmp_new_slot;
1286 : 182228 : TupleTableSlot *new_slot = *new_slot_ptr;
1287 : : ExprContext *ecxt;
1288 : : ExprState *filter_exprstate;
1289 : :
1290 : : /*
1291 : : * We need this map to avoid relying on ReorderBufferChangeType enums
1292 : : * having specific values.
1293 : : */
1294 : : static const int map_changetype_pubaction[] = {
1295 : : [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1296 : : [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1297 : : [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1298 : : };
1299 : :
1300 [ + + + + : 182228 : Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
- + ]
1301 : : *action == REORDER_BUFFER_CHANGE_UPDATE ||
1302 : : *action == REORDER_BUFFER_CHANGE_DELETE);
1303 : :
1304 [ + + - + ]: 182228 : Assert(new_slot || old_slot);
1305 : :
1306 : : /* Get the corresponding row filter */
1307 : 182228 : filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1308 : :
1309 : : /* Bail out if there is no row filter */
1310 [ + + ]: 182228 : if (!filter_exprstate)
1311 : 182194 : return true;
1312 : :
1313 [ - + ]: 34 : elog(DEBUG3, "table \"%s.%s\" has row filter",
1314 : : get_namespace_name(RelationGetNamespace(relation)),
1315 : : RelationGetRelationName(relation));
1316 : :
1317 [ + + ]: 34 : ResetPerTupleExprContext(entry->estate);
1318 : :
1319 [ + + ]: 34 : ecxt = GetPerTupleExprContext(entry->estate);
1320 : :
1321 : : /*
1322 : : * For the following occasions where there is only one tuple, we can
1323 : : * evaluate the row filter for that tuple and return.
1324 : : *
1325 : : * For inserts, we only have the new tuple.
1326 : : *
1327 : : * For updates, we can have only a new tuple when none of the replica
1328 : : * identity columns changed and none of those columns have external data
1329 : : * but we still need to evaluate the row filter for the new tuple as the
1330 : : * existing values of those columns might not match the filter. Also,
1331 : : * users can use constant expressions in the row filter, so we anyway need
1332 : : * to evaluate it for the new tuple.
1333 : : *
1334 : : * For deletes, we only have the old tuple.
1335 : : */
1336 [ + + + + ]: 34 : if (!new_slot || !old_slot)
1337 : : {
1338 [ + + ]: 30 : ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1339 : 30 : result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1340 : :
1341 : 30 : return result;
1342 : : }
1343 : :
1344 : : /*
1345 : : * Both the old and new tuples must be valid only for updates and need to
1346 : : * be checked against the row filter.
1347 : : */
1348 [ - + ]: 4 : Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1349 : :
1350 : 4 : slot_getallattrs(new_slot);
1351 : 4 : slot_getallattrs(old_slot);
1352 : :
1353 : 4 : tmp_new_slot = NULL;
1354 : 4 : desc = RelationGetDescr(relation);
1355 : :
1356 : : /*
1357 : : * The new tuple might not have all the replica identity columns, in which
1358 : : * case it needs to be copied over from the old tuple.
1359 : : */
1360 [ + + ]: 12 : for (i = 0; i < desc->natts; i++)
1361 : : {
260 drowley@postgresql.o 1362 : 8 : CompactAttribute *att = TupleDescCompactAttr(desc, i);
1363 : :
1364 : : /*
1365 : : * if the column in the new tuple or old tuple is null, nothing to do
1366 : : */
1292 akapila@postgresql.o 1367 [ + + - + ]: 8 : if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1368 : 1 : continue;
1369 : :
1370 : : /*
1371 : : * Unchanged toasted replica identity columns are only logged in the
1372 : : * old tuple. Copy this over to the new tuple. The changed (or WAL
1373 : : * Logged) toast values are always assembled in memory and set as
1374 : : * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1375 : : */
1376 [ + + + + ]: 11 : if (att->attlen == -1 &&
32 peter@eisentraut.org 1377 :GNC 4 : VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(new_slot->tts_values[i])) &&
1378 [ + - ]: 1 : !VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(old_slot->tts_values[i])))
1379 : : {
1292 akapila@postgresql.o 1380 [ + - ]:CBC 1 : if (!tmp_new_slot)
1381 : : {
1382 : 1 : tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1383 : 1 : ExecClearTuple(tmp_new_slot);
1384 : :
1385 : 1 : memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1386 : 1 : desc->natts * sizeof(Datum));
1387 : 1 : memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1388 : 1 : desc->natts * sizeof(bool));
1389 : : }
1390 : :
1391 : 1 : tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1392 : 1 : tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1393 : : }
1394 : : }
1395 : :
1396 : 4 : ecxt->ecxt_scantuple = old_slot;
1397 : 4 : old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1398 : :
1399 [ + + ]: 4 : if (tmp_new_slot)
1400 : : {
1401 : 1 : ExecStoreVirtualTuple(tmp_new_slot);
1402 : 1 : ecxt->ecxt_scantuple = tmp_new_slot;
1403 : : }
1404 : : else
1405 : 3 : ecxt->ecxt_scantuple = new_slot;
1406 : :
1407 : 4 : new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1408 : :
1409 : : /*
1410 : : * Case 1: if both tuples don't match the row filter, bailout. Send
1411 : : * nothing.
1412 : : */
1413 [ + + - + ]: 4 : if (!old_matched && !new_matched)
1292 akapila@postgresql.o 1414 :UBC 0 : return false;
1415 : :
1416 : : /*
1417 : : * Case 2: if the old tuple doesn't satisfy the row filter but the new
1418 : : * tuple does, transform the UPDATE into INSERT.
1419 : : *
1420 : : * Use the newly transformed tuple that must contain the column values for
1421 : : * all the replica identity columns. This is required to ensure that the
1422 : : * while inserting the tuple in the downstream node, we have all the
1423 : : * required column values.
1424 : : */
1292 akapila@postgresql.o 1425 [ + + + - ]:CBC 4 : if (!old_matched && new_matched)
1426 : : {
1427 : 2 : *action = REORDER_BUFFER_CHANGE_INSERT;
1428 : :
1429 [ + + ]: 2 : if (tmp_new_slot)
1430 : 1 : *new_slot_ptr = tmp_new_slot;
1431 : : }
1432 : :
1433 : : /*
1434 : : * Case 3: if the old tuple satisfies the row filter but the new tuple
1435 : : * doesn't, transform the UPDATE into DELETE.
1436 : : *
1437 : : * This transformation does not require another tuple. The Old tuple will
1438 : : * be used for DELETE.
1439 : : */
1440 [ + - + + ]: 2 : else if (old_matched && !new_matched)
1441 : 1 : *action = REORDER_BUFFER_CHANGE_DELETE;
1442 : :
1443 : : /*
1444 : : * Case 4: if both tuples match the row filter, transformation isn't
1445 : : * required. (*action is default UPDATE).
1446 : : */
1447 : :
1448 : 4 : return true;
1449 : : }
1450 : :
1451 : : /*
1452 : : * Sends the decoded DML over wire.
1453 : : *
1454 : : * This is called both in streaming and non-streaming modes.
1455 : : */
1456 : : static void
3152 peter_e@gmx.net 1457 : 186394 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1458 : : Relation relation, ReorderBufferChange *change)
1459 : : {
3034 bruce@momjian.us 1460 : 186394 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1256 akapila@postgresql.o 1461 : 186394 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1462 : : MemoryContext old;
1463 : : RelationSyncEntry *relentry;
1829 1464 : 186394 : TransactionId xid = InvalidTransactionId;
1698 1465 : 186394 : Relation ancestor = NULL;
1292 1466 : 186394 : Relation targetrel = relation;
1467 : 186394 : ReorderBufferChangeType action = change->action;
1468 : 186394 : TupleTableSlot *old_slot = NULL;
1469 : 186394 : TupleTableSlot *new_slot = NULL;
1470 : :
2752 peter_e@gmx.net 1471 [ - + ]: 186394 : if (!is_publishable_relation(relation))
1472 : 4165 : return;
1473 : :
1474 : : /*
1475 : : * Remember the xid for the change in streaming mode. We need to send xid
1476 : : * with each change in the streaming mode so that subscriber can make
1477 : : * their association and on aborts, it can discard the corresponding
1478 : : * changes.
1479 : : */
709 michael@paquier.xyz 1480 [ + + ]: 186394 : if (data->in_streaming)
1829 akapila@postgresql.o 1481 : 175921 : xid = change->txn->xid;
1482 : :
1292 1483 : 186394 : relentry = get_rel_sync_entry(data, relation);
1484 : :
1485 : : /* First check the table filter */
1486 [ + + + - ]: 186393 : switch (action)
1487 : : {
3152 peter_e@gmx.net 1488 : 108979 : case REORDER_BUFFER_CHANGE_INSERT:
1489 [ + + ]: 108979 : if (!relentry->pubactions.pubinsert)
1490 : 3074 : return;
1491 : 105905 : break;
1492 : 34488 : case REORDER_BUFFER_CHANGE_UPDATE:
1493 [ + + ]: 34488 : if (!relentry->pubactions.pubupdate)
1494 : 44 : return;
1495 : 34444 : break;
1496 : 42926 : case REORDER_BUFFER_CHANGE_DELETE:
1497 [ + + ]: 42926 : if (!relentry->pubactions.pubdelete)
1498 : 1047 : return;
1499 : :
1500 : : /*
1501 : : * This is only possible if deletes are allowed even when replica
1502 : : * identity is not defined for a table. Since the DELETE action
1503 : : * can't be published, we simply return.
1504 : : */
891 akapila@postgresql.o 1505 [ - + ]: 41879 : if (!change->data.tp.oldtuple)
1506 : : {
891 akapila@postgresql.o 1507 [ # # ]:UBC 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1508 : 0 : return;
1509 : : }
3152 peter_e@gmx.net 1510 :CBC 41879 : break;
3152 peter_e@gmx.net 1511 :UBC 0 : default:
1512 : 0 : Assert(false);
1513 : : }
1514 : :
1515 : : /* Avoid leaking memory by using and resetting our own context */
3152 peter_e@gmx.net 1516 :CBC 182228 : old = MemoryContextSwitchTo(data->context);
1517 : :
1518 : : /* Switch relation if publishing via root. */
891 akapila@postgresql.o 1519 [ + + ]: 182228 : if (relentry->publish_as_relid != RelationGetRelid(relation))
1520 : : {
1521 [ - + ]: 70 : Assert(relation->rd_rel->relispartition);
1522 : 70 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1523 : 70 : targetrel = ancestor;
1524 : : }
1525 : :
1526 [ + + ]: 182228 : if (change->data.tp.oldtuple)
1527 : : {
1528 : 42016 : old_slot = relentry->old_slot;
586 msawada@postgresql.o 1529 : 42016 : ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1530 : :
1531 : : /* Convert tuple if needed. */
891 akapila@postgresql.o 1532 [ + + ]: 42016 : if (relentry->attrmap)
1533 : : {
1534 : 5 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1535 : : &TTSOpsVirtual);
1536 : :
1537 : 5 : old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1538 : : }
1539 : : }
1540 : :
1541 [ + + ]: 182228 : if (change->data.tp.newtuple)
1542 : : {
1543 : 140349 : new_slot = relentry->new_slot;
586 msawada@postgresql.o 1544 : 140349 : ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1545 : :
1546 : : /* Convert tuple if needed. */
891 akapila@postgresql.o 1547 [ + + ]: 140349 : if (relentry->attrmap)
1548 : : {
1549 : 20 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1550 : : &TTSOpsVirtual);
1551 : :
1552 : 20 : new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1553 : : }
1554 : : }
1555 : :
1556 : : /*
1557 : : * Check row filter.
1558 : : *
1559 : : * Updates could be transformed to inserts or deletes based on the results
1560 : : * of the row filter for old and new tuple.
1561 : : */
1562 [ + + ]: 182228 : if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1563 : 12 : goto cleanup;
1564 : :
1565 : : /*
1566 : : * Send BEGIN if we haven't yet.
1567 : : *
1568 : : * We send the BEGIN message after ensuring that we will actually send the
1569 : : * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1570 : : * transactions.
1571 : : */
1572 [ + + + + ]: 182216 : if (txndata && !txndata->sent_begin_txn)
1573 : 435 : pgoutput_send_begin(ctx, txn);
1574 : :
1575 : : /*
1576 : : * Schema should be sent using the original relation because it also sends
1577 : : * the ancestor's relation.
1578 : : */
1579 : 182215 : maybe_send_schema(ctx, change, relation, relentry);
1580 : :
1581 : 182215 : OutputPluginPrepareWrite(ctx, true);
1582 : :
1583 : : /* Send the data */
1584 [ + + + - ]: 182215 : switch (action)
1585 : : {
1586 : 105894 : case REORDER_BUFFER_CHANGE_INSERT:
1587 : 105894 : logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
303 1588 : 105894 : data->binary, relentry->columns,
1589 : : relentry->include_gencols_type);
891 1590 : 105894 : break;
1591 : 34441 : case REORDER_BUFFER_CHANGE_UPDATE:
1592 : 34441 : logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
303 1593 : 34441 : new_slot, data->binary, relentry->columns,
1594 : : relentry->include_gencols_type);
1292 1595 : 34441 : break;
3152 peter_e@gmx.net 1596 : 41880 : case REORDER_BUFFER_CHANGE_DELETE:
891 akapila@postgresql.o 1597 : 41880 : logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
303 1598 : 41880 : data->binary, relentry->columns,
1599 : : relentry->include_gencols_type);
3152 peter_e@gmx.net 1600 : 41880 : break;
3152 peter_e@gmx.net 1601 :UBC 0 : default:
1602 : 0 : Assert(false);
1603 : : }
1604 : :
891 akapila@postgresql.o 1605 :CBC 182215 : OutputPluginWrite(ctx, true);
1606 : :
1607 : 182226 : cleanup:
1698 1608 [ + + ]: 182226 : if (RelationIsValid(ancestor))
1609 : : {
1610 : 69 : RelationClose(ancestor);
1611 : 69 : ancestor = NULL;
1612 : : }
1613 : :
1614 : : /* Drop the new slots that were used to store the converted tuples. */
436 1615 [ + + ]: 182226 : if (relentry->attrmap)
1616 : : {
1617 [ + + ]: 25 : if (old_slot)
1618 : 5 : ExecDropSingleTupleTableSlot(old_slot);
1619 : :
1620 [ + + ]: 25 : if (new_slot)
1621 : 20 : ExecDropSingleTupleTableSlot(new_slot);
1622 : : }
1623 : :
3152 peter_e@gmx.net 1624 : 182226 : MemoryContextSwitchTo(old);
1625 : 182226 : MemoryContextReset(data->context);
1626 : : }
1627 : :
1628 : : static void
2709 1629 : 14 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1630 : : int nrelations, Relation relations[], ReorderBufferChange *change)
1631 : : {
1632 : 14 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1256 akapila@postgresql.o 1633 : 14 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1634 : : MemoryContext old;
1635 : : RelationSyncEntry *relentry;
1636 : : int i;
1637 : : int nrelids;
1638 : : Oid *relids;
1829 1639 : 14 : TransactionId xid = InvalidTransactionId;
1640 : :
1641 : : /* Remember the xid for the change in streaming mode. See pgoutput_change. */
709 michael@paquier.xyz 1642 [ - + ]: 14 : if (data->in_streaming)
1829 akapila@postgresql.o 1643 :UBC 0 : xid = change->txn->xid;
1644 : :
2709 peter_e@gmx.net 1645 :CBC 14 : old = MemoryContextSwitchTo(data->context);
1646 : :
1647 : 14 : relids = palloc0(nrelations * sizeof(Oid));
1648 : 14 : nrelids = 0;
1649 : :
1650 [ + + ]: 47 : for (i = 0; i < nrelations; i++)
1651 : : {
1652 : 33 : Relation relation = relations[i];
1653 : 33 : Oid relid = RelationGetRelid(relation);
1654 : :
1655 [ - + ]: 33 : if (!is_publishable_relation(relation))
2709 peter_e@gmx.net 1656 :UBC 0 : continue;
1657 : :
1292 akapila@postgresql.o 1658 :CBC 33 : relentry = get_rel_sync_entry(data, relation);
1659 : :
2709 peter_e@gmx.net 1660 [ + + ]: 33 : if (!relentry->pubactions.pubtruncate)
1661 : 14 : continue;
1662 : :
1663 : : /*
1664 : : * Don't send partitions if the publication wants to send only the
1665 : : * root tables through it.
1666 : : */
1977 peter@eisentraut.org 1667 [ + + ]: 19 : if (relation->rd_rel->relispartition &&
1668 [ + + ]: 15 : relentry->publish_as_relid != relid)
2006 1669 : 3 : continue;
1670 : :
2709 peter_e@gmx.net 1671 : 16 : relids[nrelids++] = relid;
1672 : :
1673 : : /* Send BEGIN if we haven't yet */
1256 akapila@postgresql.o 1674 [ + - + + ]: 16 : if (txndata && !txndata->sent_begin_txn)
1675 : 10 : pgoutput_send_begin(ctx, txn);
1676 : :
1493 fujii@postgresql.org 1677 : 16 : maybe_send_schema(ctx, change, relation, relentry);
1678 : : }
1679 : :
2686 peter_e@gmx.net 1680 [ + + ]: 14 : if (nrelids > 0)
1681 : : {
1682 : 10 : OutputPluginPrepareWrite(ctx, true);
1683 : 10 : logicalrep_write_truncate(ctx->out,
1684 : : xid,
1685 : : nrelids,
1686 : : relids,
1687 : 10 : change->data.truncate.cascade,
1688 : 10 : change->data.truncate.restart_seqs);
1689 : 10 : OutputPluginWrite(ctx, true);
1690 : : }
1691 : :
2709 1692 : 14 : MemoryContextSwitchTo(old);
1693 : 14 : MemoryContextReset(data->context);
1694 : 14 : }
1695 : :
1696 : : static void
1614 akapila@postgresql.o 1697 : 7 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1698 : : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1699 : : const char *message)
1700 : : {
1701 : 7 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1702 : 7 : TransactionId xid = InvalidTransactionId;
1703 : :
1704 [ + + ]: 7 : if (!data->messages)
1705 : 2 : return;
1706 : :
1707 : : /*
1708 : : * Remember the xid for the message in streaming mode. See
1709 : : * pgoutput_change.
1710 : : */
709 michael@paquier.xyz 1711 [ - + ]: 5 : if (data->in_streaming)
1614 akapila@postgresql.o 1712 :UBC 0 : xid = txn->xid;
1713 : :
1714 : : /*
1715 : : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1716 : : */
1256 akapila@postgresql.o 1717 [ + + ]:CBC 5 : if (transactional)
1718 : : {
1719 : 2 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1720 : :
1721 : : /* Send BEGIN if we haven't yet */
1722 [ + - + - ]: 2 : if (txndata && !txndata->sent_begin_txn)
1723 : 2 : pgoutput_send_begin(ctx, txn);
1724 : : }
1725 : :
1614 1726 : 5 : OutputPluginPrepareWrite(ctx, true);
1727 : 5 : logicalrep_write_message(ctx->out,
1728 : : xid,
1729 : : message_lsn,
1730 : : transactional,
1731 : : prefix,
1732 : : sz,
1733 : : message);
1734 : 5 : OutputPluginWrite(ctx, true);
1735 : : }
1736 : :
1737 : : /*
1738 : : * Return true if the data is associated with an origin and the user has
1739 : : * requested the changes that don't have an origin, false otherwise.
1740 : : */
1741 : : static bool
3152 peter_e@gmx.net 1742 : 307524 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1743 : : RepOriginId origin_id)
1744 : : {
710 akapila@postgresql.o 1745 : 307524 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1746 : :
1747 [ + + + + ]: 307524 : if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1143 1748 : 125 : return true;
1749 : :
3152 peter_e@gmx.net 1750 : 307399 : return false;
1751 : : }
1752 : :
1753 : : /*
1754 : : * Shutdown the output plugin.
1755 : : *
1756 : : * Note, we don't need to clean the data->context, data->cachectx, and
1757 : : * data->pubctx as they are child contexts of the ctx->context so they
1758 : : * will be cleaned up by logical decoding machinery.
1759 : : */
1760 : : static void
3034 bruce@momjian.us 1761 : 514 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1762 : : {
3152 peter_e@gmx.net 1763 [ + + ]: 514 : if (RelationSyncCache)
1764 : : {
1765 : 193 : hash_destroy(RelationSyncCache);
1766 : 193 : RelationSyncCache = NULL;
1767 : : }
1768 : 514 : }
1769 : :
1770 : : /*
1771 : : * Load publications from the list of publication names.
1772 : : *
1773 : : * Here, we skip the publications that don't exist yet. This will allow us
1774 : : * to silently continue the replication in the absence of a missing publication.
1775 : : * This is required because we allow the users to create publications after they
1776 : : * have specified the required publications at the time of replication start.
1777 : : */
1778 : : static List *
1779 : 194 : LoadPublications(List *pubnames)
1780 : : {
1781 : 194 : List *result = NIL;
1782 : : ListCell *lc;
1783 : :
3034 bruce@momjian.us 1784 [ + - + + : 433 : foreach(lc, pubnames)
+ + ]
1785 : : {
1786 : 239 : char *pubname = (char *) lfirst(lc);
176 akapila@postgresql.o 1787 : 239 : Publication *pub = GetPublicationByName(pubname, true);
1788 : :
1789 [ + + ]: 239 : if (pub)
1790 : 237 : result = lappend(result, pub);
1791 : : else
1792 [ + - ]: 2 : ereport(WARNING,
1793 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1794 : : errmsg("skipped loading publication \"%s\"", pubname),
1795 : : errdetail("The publication does not exist at this point in the WAL."),
1796 : : errhint("Create the publication if it does not exist."));
1797 : : }
1798 : :
3152 peter_e@gmx.net 1799 : 194 : return result;
1800 : : }
1801 : :
1802 : : /*
1803 : : * Publication syscache invalidation callback.
1804 : : *
1805 : : * Called for invalidations on pg_publication.
1806 : : */
1807 : : static void
1808 : 328 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1809 : : {
1810 : 328 : publications_valid = false;
1811 : 328 : }
1812 : :
1813 : : /*
1814 : : * START STREAM callback
1815 : : */
1816 : : static void
1829 akapila@postgresql.o 1817 : 627 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1818 : : ReorderBufferTXN *txn)
1819 : : {
709 michael@paquier.xyz 1820 : 627 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1829 akapila@postgresql.o 1821 : 627 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1822 : :
1823 : : /* we can't nest streaming of transactions */
709 michael@paquier.xyz 1824 [ - + ]: 627 : Assert(!data->in_streaming);
1825 : :
1826 : : /*
1827 : : * If we already sent the first stream for this transaction then don't
1828 : : * send the origin id in the subsequent streams.
1829 : : */
1829 akapila@postgresql.o 1830 [ + + ]: 627 : if (rbtxn_is_streamed(txn))
1831 : 568 : send_replication_origin = false;
1832 : :
1833 : 627 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1834 : 627 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1835 : :
1515 1836 : 627 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1837 : : send_replication_origin);
1838 : :
1829 1839 : 627 : OutputPluginWrite(ctx, true);
1840 : :
1841 : : /* we're streaming a chunk of transaction now */
709 michael@paquier.xyz 1842 : 627 : data->in_streaming = true;
1829 akapila@postgresql.o 1843 : 627 : }
1844 : :
1845 : : /*
1846 : : * STOP STREAM callback
1847 : : */
1848 : : static void
1849 : 627 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1850 : : ReorderBufferTXN *txn)
1851 : : {
709 michael@paquier.xyz 1852 : 627 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1853 : :
1854 : : /* we should be streaming a transaction */
1855 [ - + ]: 627 : Assert(data->in_streaming);
1856 : :
1829 akapila@postgresql.o 1857 : 627 : OutputPluginPrepareWrite(ctx, true);
1858 : 627 : logicalrep_write_stream_stop(ctx->out);
1859 : 627 : OutputPluginWrite(ctx, true);
1860 : :
1861 : : /* we've stopped streaming a transaction */
709 michael@paquier.xyz 1862 : 627 : data->in_streaming = false;
1829 akapila@postgresql.o 1863 : 627 : }
1864 : :
1865 : : /*
1866 : : * Notify downstream to discard the streamed transaction (along with all
1867 : : * its subtransactions, if it's a toplevel transaction).
1868 : : */
1869 : : static void
1870 : 26 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1871 : : ReorderBufferTXN *txn,
1872 : : XLogRecPtr abort_lsn)
1873 : : {
1874 : : ReorderBufferTXN *toptxn;
971 1875 : 26 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1876 : 26 : bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1877 : :
1878 : : /*
1879 : : * The abort should happen outside streaming block, even for streamed
1880 : : * transactions. The transaction has to be marked as streamed, though.
1881 : : */
709 michael@paquier.xyz 1882 [ - + ]: 26 : Assert(!data->in_streaming);
1883 : :
1884 : : /* determine the toplevel transaction */
904 akapila@postgresql.o 1885 [ + + ]: 26 : toptxn = rbtxn_get_toptxn(txn);
1886 : :
1829 1887 [ - + ]: 26 : Assert(rbtxn_is_streamed(toptxn));
1888 : :
1889 : 26 : OutputPluginPrepareWrite(ctx, true);
971 1890 : 26 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1891 : : txn->xact_time.abort_time, write_abort_info);
1892 : :
1829 1893 : 26 : OutputPluginWrite(ctx, true);
1894 : :
1895 : 26 : cleanup_rel_sync_cache(toptxn->xid, false);
1896 : 26 : }
1897 : :
1898 : : /*
1899 : : * Notify downstream to apply the streamed transaction (along with all
1900 : : * its subtransactions).
1901 : : */
1902 : : static void
1903 : 45 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1904 : : ReorderBufferTXN *txn,
1905 : : XLogRecPtr commit_lsn)
1906 : : {
709 michael@paquier.xyz 1907 : 45 : PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1908 : :
1909 : : /*
1910 : : * The commit should happen outside streaming block, even for streamed
1911 : : * transactions. The transaction has to be marked as streamed, though.
1912 : : */
1913 [ - + ]: 45 : Assert(!data->in_streaming);
1829 akapila@postgresql.o 1914 [ - + ]: 45 : Assert(rbtxn_is_streamed(txn));
1915 : :
941 1916 : 45 : OutputPluginUpdateProgress(ctx, false);
1917 : :
1829 1918 : 45 : OutputPluginPrepareWrite(ctx, true);
1919 : 45 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1920 : 45 : OutputPluginWrite(ctx, true);
1921 : :
1922 : 45 : cleanup_rel_sync_cache(txn->xid, true);
1923 : 45 : }
1924 : :
1925 : : /*
1926 : : * PREPARE callback (for streaming two-phase commit).
1927 : : *
1928 : : * Notify the downstream to prepare the transaction.
1929 : : */
1930 : : static void
1494 1931 : 11 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1932 : : ReorderBufferTXN *txn,
1933 : : XLogRecPtr prepare_lsn)
1934 : : {
1935 [ - + ]: 11 : Assert(rbtxn_is_streamed(txn));
1936 : :
941 1937 : 11 : OutputPluginUpdateProgress(ctx, false);
1494 1938 : 11 : OutputPluginPrepareWrite(ctx, true);
1939 : 11 : logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1940 : 11 : OutputPluginWrite(ctx, true);
1941 : 11 : }
1942 : :
1943 : : /*
1944 : : * Initialize the relation schema sync cache for a decoding session.
1945 : : *
1946 : : * The hash table is destroyed at the end of a decoding session. While
1947 : : * relcache invalidations still exist and will still be invoked, they
1948 : : * will just see the null hash table global and take no action.
1949 : : */
1950 : : static void
3152 peter_e@gmx.net 1951 : 396 : init_rel_sync_cache(MemoryContext cachectx)
1952 : : {
1953 : : HASHCTL ctl;
1954 : : static bool relation_callbacks_registered = false;
1955 : :
1956 : : /* Nothing to do if hash table already exists */
1957 [ - + ]: 396 : if (RelationSyncCache != NULL)
1958 : 2 : return;
1959 : :
1960 : : /* Make a new hash table for the cache */
1961 : 396 : ctl.keysize = sizeof(Oid);
1962 : 396 : ctl.entrysize = sizeof(RelationSyncEntry);
1963 : 396 : ctl.hcxt = cachectx;
1964 : :
1965 : 396 : RelationSyncCache = hash_create("logical replication output relation cache",
1966 : : 128, &ctl,
1967 : : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1968 : :
1969 [ - + ]: 396 : Assert(RelationSyncCache != NULL);
1970 : :
1971 : : /* No more to do if we already registered callbacks */
926 tgl@sss.pgh.pa.us 1972 [ + + ]: 396 : if (relation_callbacks_registered)
1973 : 2 : return;
1974 : :
1975 : : /* We must update the cache entry for a relation after a relcache flush */
3152 peter_e@gmx.net 1976 : 394 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1977 : :
1978 : : /*
1979 : : * Flush all cache entries after a pg_namespace change, in case it was a
1980 : : * schema rename affecting a relation being replicated.
1981 : : *
1982 : : * XXX: It is not a good idea to invalidate all the relation entries in
1983 : : * RelationSyncCache on schema rename. We can optimize it to invalidate
1984 : : * only the required relations by either having a specific invalidation
1985 : : * message containing impacted relations or by having schema information
1986 : : * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
1987 : : * passed to the callback.
1988 : : */
974 tgl@sss.pgh.pa.us 1989 : 394 : CacheRegisterSyscacheCallback(NAMESPACEOID,
1990 : : rel_sync_cache_publication_cb,
1991 : : (Datum) 0);
1992 : :
926 1993 : 394 : relation_callbacks_registered = true;
1994 : : }
1995 : :
1996 : : /*
1997 : : * We expect relatively small number of streamed transactions.
1998 : : */
1999 : : static bool
1829 akapila@postgresql.o 2000 : 175921 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2001 : : {
1160 alvherre@alvh.no-ip. 2002 : 175921 : return list_member_xid(entry->streamed_txns, xid);
2003 : : }
2004 : :
2005 : : /*
2006 : : * Add the xid in the rel sync entry for which we have already sent the schema
2007 : : * of the relation.
2008 : : */
2009 : : static void
1829 akapila@postgresql.o 2010 : 68 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2011 : : {
2012 : : MemoryContext oldctx;
2013 : :
2014 : 68 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2015 : :
1160 alvherre@alvh.no-ip. 2016 : 68 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2017 : :
1829 akapila@postgresql.o 2018 : 68 : MemoryContextSwitchTo(oldctx);
2019 : 68 : }
2020 : :
2021 : : /*
2022 : : * Find or create entry in the relation schema cache.
2023 : : *
2024 : : * This looks up publications that the given relation is directly or
2025 : : * indirectly part of (the latter if it's really the relation's ancestor that
2026 : : * is part of a publication) and fills up the found entry with the information
2027 : : * about which operations to publish and whether to use an ancestor's schema
2028 : : * when publishing.
2029 : : */
2030 : : static RelationSyncEntry *
1292 2031 : 186427 : get_rel_sync_entry(PGOutputData *data, Relation relation)
2032 : : {
2033 : : RelationSyncEntry *entry;
2034 : : bool found;
2035 : : MemoryContext oldctx;
2036 : 186427 : Oid relid = RelationGetRelid(relation);
2037 : :
3152 peter_e@gmx.net 2038 [ - + ]: 186427 : Assert(RelationSyncCache != NULL);
2039 : :
2040 : : /* Find cached relation info, creating if not found */
2041 : 186427 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
2042 : : &relid,
2043 : : HASH_ENTER, &found);
2044 [ - + ]: 186427 : Assert(entry != NULL);
2045 : :
2046 : : /* initialize entry, if it's new */
1816 akapila@postgresql.o 2047 [ + + ]: 186427 : if (!found)
2048 : : {
1310 2049 : 291 : entry->replicate_valid = false;
1816 2050 : 291 : entry->schema_sent = false;
226 2051 : 291 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1816 2052 : 291 : entry->streamed_txns = NIL;
2053 : 291 : entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1248 tomas.vondra@postgre 2054 : 291 : entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1292 akapila@postgresql.o 2055 : 291 : entry->new_slot = NULL;
2056 : 291 : entry->old_slot = NULL;
2057 : 291 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
1260 tomas.vondra@postgre 2058 : 291 : entry->entry_cxt = NULL;
1816 akapila@postgresql.o 2059 : 291 : entry->publish_as_relid = InvalidOid;
1260 tomas.vondra@postgre 2060 : 291 : entry->columns = NULL;
1292 akapila@postgresql.o 2061 : 291 : entry->attrmap = NULL;
2062 : : }
2063 : :
2064 : : /* Validate the entry */
1816 2065 [ + + ]: 186427 : if (!entry->replicate_valid)
2066 : : {
1410 2067 : 366 : Oid schemaId = get_rel_namespace(relid);
3152 peter_e@gmx.net 2068 : 366 : List *pubids = GetRelationPublications(relid);
2069 : :
2070 : : /*
2071 : : * We don't acquire a lock on the namespace system table as we build
2072 : : * the cache entry using a historic snapshot and all the later changes
2073 : : * are absorbed while decoding WAL.
2074 : : */
1248 tomas.vondra@postgre 2075 : 366 : List *schemaPubids = GetSchemaPublications(schemaId);
2076 : : ListCell *lc;
1977 peter@eisentraut.org 2077 : 366 : Oid publish_as_relid = relid;
1270 tomas.vondra@postgre 2078 : 366 : int publish_ancestor_level = 0;
1340 michael@paquier.xyz 2079 : 366 : bool am_partition = get_rel_relispartition(relid);
1248 tomas.vondra@postgre 2080 : 366 : char relkind = get_rel_relkind(relid);
1292 akapila@postgresql.o 2081 : 366 : List *rel_publications = NIL;
2082 : :
2083 : : /* Reload publications if needed before use. */
3152 peter_e@gmx.net 2084 [ + + ]: 366 : if (!publications_valid)
2085 : : {
271 michael@paquier.xyz 2086 : 194 : MemoryContextReset(data->pubctx);
2087 : :
2088 : 194 : oldctx = MemoryContextSwitchTo(data->pubctx);
3152 peter_e@gmx.net 2089 : 194 : data->publications = LoadPublications(data->publication_names);
2090 : 194 : MemoryContextSwitchTo(oldctx);
2091 : 194 : publications_valid = true;
2092 : : }
2093 : :
2094 : : /*
2095 : : * Reset schema_sent status as the relation definition may have
2096 : : * changed. Also reset pubactions to empty in case rel was dropped
2097 : : * from a publication. Also free any objects that depended on the
2098 : : * earlier definition.
2099 : : */
1310 akapila@postgresql.o 2100 : 366 : entry->schema_sent = false;
226 2101 : 366 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1310 2102 : 366 : list_free(entry->streamed_txns);
2103 : 366 : entry->streamed_txns = NIL;
1260 tomas.vondra@postgre 2104 : 366 : bms_free(entry->columns);
2105 : 366 : entry->columns = NULL;
1310 akapila@postgresql.o 2106 : 366 : entry->pubactions.pubinsert = false;
2107 : 366 : entry->pubactions.pubupdate = false;
2108 : 366 : entry->pubactions.pubdelete = false;
2109 : 366 : entry->pubactions.pubtruncate = false;
2110 : :
2111 : : /*
2112 : : * Tuple slots cleanups. (Will be rebuilt later if needed).
2113 : : */
1292 2114 [ + + ]: 366 : if (entry->old_slot)
2115 : : {
289 michael@paquier.xyz 2116 : 59 : TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2117 : :
2118 [ - + ]: 59 : Assert(desc->tdrefcount == -1);
2119 : :
1292 akapila@postgresql.o 2120 : 59 : ExecDropSingleTupleTableSlot(entry->old_slot);
2121 : :
2122 : : /*
2123 : : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2124 : : * do it now to avoid any leaks.
2125 : : */
289 michael@paquier.xyz 2126 : 59 : FreeTupleDesc(desc);
2127 : : }
1292 akapila@postgresql.o 2128 [ + + ]: 366 : if (entry->new_slot)
2129 : : {
289 michael@paquier.xyz 2130 : 59 : TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2131 : :
2132 [ - + ]: 59 : Assert(desc->tdrefcount == -1);
2133 : :
1292 akapila@postgresql.o 2134 : 59 : ExecDropSingleTupleTableSlot(entry->new_slot);
2135 : :
2136 : : /*
2137 : : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2138 : : * do it now to avoid any leaks.
2139 : : */
289 michael@paquier.xyz 2140 : 59 : FreeTupleDesc(desc);
2141 : : }
2142 : :
1292 akapila@postgresql.o 2143 : 366 : entry->old_slot = NULL;
2144 : 366 : entry->new_slot = NULL;
2145 : :
2146 [ + + ]: 366 : if (entry->attrmap)
2147 : 3 : free_attrmap(entry->attrmap);
2148 : 366 : entry->attrmap = NULL;
2149 : :
2150 : : /*
2151 : : * Row filter cache cleanups.
2152 : : */
1260 tomas.vondra@postgre 2153 [ + + ]: 366 : if (entry->entry_cxt)
2154 : 59 : MemoryContextDelete(entry->entry_cxt);
2155 : :
2156 : 366 : entry->entry_cxt = NULL;
1292 akapila@postgresql.o 2157 : 366 : entry->estate = NULL;
2158 : 366 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2159 : :
2160 : : /*
2161 : : * Build publication cache. We can't use one provided by relcache as
2162 : : * relcache considers all publications that the given relation is in,
2163 : : * but here we only need to consider ones that the subscriber
2164 : : * requested.
2165 : : */
3152 peter_e@gmx.net 2166 [ + + + + : 897 : foreach(lc, data->publications)
+ + ]
2167 : : {
2168 : 531 : Publication *pub = lfirst(lc);
1977 peter@eisentraut.org 2169 : 531 : bool publish = false;
2170 : :
2171 : : /*
2172 : : * Under what relid should we publish changes in this publication?
2173 : : * We'll use the top-most relid across all publications. Also
2174 : : * track the ancestor level for this publication.
2175 : : */
1213 tgl@sss.pgh.pa.us 2176 : 531 : Oid pub_relid = relid;
2177 : 531 : int ancestor_level = 0;
2178 : :
2179 : : /*
2180 : : * If this is a FOR ALL TABLES publication, pick the partition
2181 : : * root and set the ancestor level accordingly.
2182 : : */
1248 tomas.vondra@postgre 2183 [ + + ]: 531 : if (pub->alltables)
2184 : : {
1977 peter@eisentraut.org 2185 : 81 : publish = true;
2186 [ + + + - ]: 81 : if (pub->pubviaroot && am_partition)
2187 : : {
1270 tomas.vondra@postgre 2188 : 13 : List *ancestors = get_partition_ancestors(relid);
2189 : :
2190 : 13 : pub_relid = llast_oid(ancestors);
2191 : 13 : ancestor_level = list_length(ancestors);
2192 : : }
2193 : : }
2194 : :
1977 peter@eisentraut.org 2195 [ + + ]: 531 : if (!publish)
2196 : : {
1941 tgl@sss.pgh.pa.us 2197 : 450 : bool ancestor_published = false;
2198 : :
2199 : : /*
2200 : : * For a partition, check if any of the ancestors are
2201 : : * published. If so, note down the topmost ancestor that is
2202 : : * published via this publication, which will be used as the
2203 : : * relation via which to publish the partition's changes.
2204 : : */
1977 peter@eisentraut.org 2205 [ + + ]: 450 : if (am_partition)
2206 : : {
2207 : : Oid ancestor;
2208 : : int level;
1941 tgl@sss.pgh.pa.us 2209 : 121 : List *ancestors = get_partition_ancestors(relid);
2210 : :
1292 akapila@postgresql.o 2211 : 121 : ancestor = GetTopMostAncestorInPublication(pub->oid,
2212 : : ancestors,
2213 : : &level);
2214 : :
2215 [ + + ]: 121 : if (ancestor != InvalidOid)
2216 : : {
2217 : 48 : ancestor_published = true;
2218 [ + + ]: 48 : if (pub->pubviaroot)
2219 : : {
1270 tomas.vondra@postgre 2220 : 25 : pub_relid = ancestor;
2221 : 25 : ancestor_level = level;
2222 : : }
2223 : : }
2224 : : }
2225 : :
1410 akapila@postgresql.o 2226 [ + + + + ]: 700 : if (list_member_oid(pubids, pub->oid) ||
2227 [ + + ]: 493 : list_member_oid(schemaPubids, pub->oid) ||
2228 : : ancestor_published)
1977 peter@eisentraut.org 2229 : 235 : publish = true;
2230 : : }
2231 : :
2232 : : /*
2233 : : * If the relation is to be published, determine actions to
2234 : : * publish, and list of columns, if appropriate.
2235 : : *
2236 : : * Don't publish changes for partitioned tables, because
2237 : : * publishing those of its partitions suffices, unless partition
2238 : : * changes won't be published due to pubviaroot being set.
2239 : : */
2240 [ + + + + ]: 531 : if (publish &&
2241 [ + + ]: 4 : (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2242 : : {
3152 peter_e@gmx.net 2243 : 313 : entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2244 : 313 : entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2245 : 313 : entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2709 2246 : 313 : entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2247 : :
2248 : : /*
2249 : : * We want to publish the changes as the top-most ancestor
2250 : : * across all publications. So we need to check if the already
2251 : : * calculated level is higher than the new one. If yes, we can
2252 : : * ignore the new value (as it's a child). Otherwise the new
2253 : : * value is an ancestor, so we keep it.
2254 : : */
1270 tomas.vondra@postgre 2255 [ + + ]: 313 : if (publish_ancestor_level > ancestor_level)
2256 : 1 : continue;
2257 : :
2258 : : /*
2259 : : * If we found an ancestor higher up in the tree, discard the
2260 : : * list of publications through which we replicate it, and use
2261 : : * the new ancestor.
2262 : : */
1269 2263 [ + + ]: 312 : if (publish_ancestor_level < ancestor_level)
2264 : : {
2265 : 38 : publish_as_relid = pub_relid;
2266 : 38 : publish_ancestor_level = ancestor_level;
2267 : :
2268 : : /* reset the publication list for this relation */
2269 : 38 : rel_publications = NIL;
2270 : : }
2271 : : else
2272 : : {
2273 : : /* Same ancestor level, has to be the same OID. */
2274 [ - + ]: 274 : Assert(publish_as_relid == pub_relid);
2275 : : }
2276 : :
2277 : : /* Track publications for this ancestor. */
2278 : 312 : rel_publications = lappend(rel_publications, pub);
2279 : : }
2280 : : }
2281 : :
1292 akapila@postgresql.o 2282 : 366 : entry->publish_as_relid = publish_as_relid;
2283 : :
2284 : : /*
2285 : : * Initialize the tuple slot, map, and row filter. These are only used
2286 : : * when publishing inserts, updates, or deletes.
2287 : : */
2288 [ + + + - ]: 366 : if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2289 [ - + ]: 67 : entry->pubactions.pubdelete)
2290 : : {
2291 : : /* Initialize the tuple slot and map */
2292 : 299 : init_tuple_slot(data, relation, entry);
2293 : :
2294 : : /* Initialize the row filter */
2295 : 299 : pgoutput_row_filter_init(data, rel_publications, entry);
2296 : :
2297 : : /* Check whether to publish generated columns. */
303 2298 : 299 : check_and_init_gencol(data, rel_publications, entry);
2299 : :
2300 : : /* Initialize the column list */
1260 tomas.vondra@postgre 2301 : 299 : pgoutput_column_list_init(data, rel_publications, entry);
2302 : : }
2303 : :
3152 peter_e@gmx.net 2304 : 365 : list_free(pubids);
1310 akapila@postgresql.o 2305 : 365 : list_free(schemaPubids);
1292 2306 : 365 : list_free(rel_publications);
2307 : :
3152 peter_e@gmx.net 2308 : 365 : entry->replicate_valid = true;
2309 : : }
2310 : :
2311 : 186426 : return entry;
2312 : : }
2313 : :
2314 : : /*
2315 : : * Cleanup list of streamed transactions and update the schema_sent flag.
2316 : : *
2317 : : * When a streamed transaction commits or aborts, we need to remove the
2318 : : * toplevel XID from the schema cache. If the transaction aborted, the
2319 : : * subscriber will simply throw away the schema records we streamed, so
2320 : : * we don't need to do anything else.
2321 : : *
2322 : : * If the transaction is committed, the subscriber will update the relation
2323 : : * cache - so tweak the schema_sent flag accordingly.
2324 : : */
2325 : : static void
1829 akapila@postgresql.o 2326 : 71 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2327 : : {
2328 : : HASH_SEQ_STATUS hash_seq;
2329 : : RelationSyncEntry *entry;
2330 : :
2331 [ - + ]: 71 : Assert(RelationSyncCache != NULL);
2332 : :
2333 : 71 : hash_seq_init(&hash_seq, RelationSyncCache);
2334 [ + + ]: 145 : while ((entry = hash_seq_search(&hash_seq)) != NULL)
2335 : : {
2336 : : /*
2337 : : * We can set the schema_sent flag for an entry that has committed xid
2338 : : * in the list as that ensures that the subscriber would have the
2339 : : * corresponding schema and we don't need to send it unless there is
2340 : : * any invalidation for that relation.
2341 : : */
611 nathan@postgresql.or 2342 [ + + + - : 167 : foreach_xid(streamed_txn, entry->streamed_txns)
+ + ]
2343 : : {
2344 [ + + ]: 72 : if (xid == streamed_txn)
2345 : : {
1829 akapila@postgresql.o 2346 [ + + ]: 53 : if (is_commit)
2347 : 42 : entry->schema_sent = true;
2348 : :
2349 : 53 : entry->streamed_txns =
611 nathan@postgresql.or 2350 : 53 : foreach_delete_current(entry->streamed_txns, streamed_txn);
1829 akapila@postgresql.o 2351 : 53 : break;
2352 : : }
2353 : : }
2354 : : }
2355 : 71 : }
2356 : :
2357 : : /*
2358 : : * Relcache invalidation callback
2359 : : */
2360 : : static void
3152 peter_e@gmx.net 2361 : 3981 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2362 : : {
2363 : : RelationSyncEntry *entry;
2364 : :
2365 : : /*
2366 : : * We can get here if the plugin was used in SQL interface as the
2367 : : * RelationSyncCache is destroyed when the decoding finishes, but there is
2368 : : * no way to unregister the relcache invalidation callback.
2369 : : */
2370 [ + + ]: 3981 : if (RelationSyncCache == NULL)
2371 : 26 : return;
2372 : :
2373 : : /*
2374 : : * Nobody keeps pointers to entries in this hash table around outside
2375 : : * logical decoding callback calls - but invalidation events can come in
2376 : : * *during* a callback if we do any syscache access in the callback.
2377 : : * Because of that we must mark the cache entry as invalid but not damage
2378 : : * any of its substructure here. The next get_rel_sync_entry() call will
2379 : : * rebuild it all.
2380 : : */
1310 akapila@postgresql.o 2381 [ + + ]: 3955 : if (OidIsValid(relid))
2382 : : {
2383 : : /*
2384 : : * Getting invalidations for relations that aren't in the table is
2385 : : * entirely normal. So we don't care if it's found or not.
2386 : : */
2387 : 3897 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2388 : : HASH_FIND, NULL);
2389 [ + + ]: 3897 : if (entry != NULL)
2390 : 643 : entry->replicate_valid = false;
2391 : : }
2392 : : else
2393 : : {
2394 : : /* Whole cache must be flushed. */
2395 : : HASH_SEQ_STATUS status;
2396 : :
2397 : 58 : hash_seq_init(&status, RelationSyncCache);
2398 [ + + ]: 119 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2399 : : {
2400 : 61 : entry->replicate_valid = false;
2401 : : }
2402 : : }
2403 : : }
2404 : :
2405 : : /*
2406 : : * Publication relation/schema map syscache invalidation callback
2407 : : *
2408 : : * Called for invalidations on pg_namespace.
2409 : : */
2410 : : static void
3152 peter_e@gmx.net 2411 : 35 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2412 : : {
2413 : : HASH_SEQ_STATUS status;
2414 : : RelationSyncEntry *entry;
2415 : :
2416 : : /*
2417 : : * We can get here if the plugin was used in SQL interface as the
2418 : : * RelationSyncCache is destroyed when the decoding finishes, but there is
2419 : : * no way to unregister the invalidation callbacks.
2420 : : */
2421 [ + + ]: 35 : if (RelationSyncCache == NULL)
2422 : 10 : return;
2423 : :
2424 : : /*
2425 : : * We have no easy way to identify which cache entries this invalidation
2426 : : * event might have affected, so just mark them all invalid.
2427 : : */
2428 : 25 : hash_seq_init(&status, RelationSyncCache);
2429 [ + + ]: 46 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2430 : : {
2431 : 21 : entry->replicate_valid = false;
2432 : : }
2433 : : }
2434 : :
2435 : : /* Send Replication origin */
2436 : : static void
1515 akapila@postgresql.o 2437 : 1091 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2438 : : XLogRecPtr origin_lsn, bool send_origin)
2439 : : {
2440 [ + + ]: 1091 : if (send_origin)
2441 : : {
2442 : : char *origin;
2443 : :
2444 : : /*----------
2445 : : * XXX: which behaviour do we want here?
2446 : : *
2447 : : * Alternatives:
2448 : : * - don't send origin message if origin name not found
2449 : : * (that's what we do now)
2450 : : * - throw error - that will break replication, not good
2451 : : * - send some special "unknown" origin
2452 : : *----------
2453 : : */
2454 [ + - ]: 8 : if (replorigin_by_oid(origin_id, true, &origin))
2455 : : {
2456 : : /* Message boundary */
2457 : 8 : OutputPluginWrite(ctx, false);
2458 : 8 : OutputPluginPrepareWrite(ctx, true);
2459 : :
2460 : 8 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2461 : : }
2462 : : }
2463 : 1091 : }
|