Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pgoutput.c
4 : : * Logical Replication output plugin
5 : : *
6 : : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/pgoutput/pgoutput.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "access/tupconvert.h"
16 : : #include "catalog/partition.h"
17 : : #include "catalog/pg_publication.h"
18 : : #include "catalog/pg_publication_rel.h"
19 : : #include "catalog/pg_subscription.h"
20 : : #include "commands/defrem.h"
21 : : #include "commands/subscriptioncmds.h"
22 : : #include "executor/executor.h"
23 : : #include "fmgr.h"
24 : : #include "nodes/makefuncs.h"
25 : : #include "parser/parse_relation.h"
26 : : #include "replication/logical.h"
27 : : #include "replication/logicalproto.h"
28 : : #include "replication/origin.h"
29 : : #include "replication/pgoutput.h"
30 : : #include "rewrite/rewriteHandler.h"
31 : : #include "utils/builtins.h"
32 : : #include "utils/inval.h"
33 : : #include "utils/lsyscache.h"
34 : : #include "utils/memutils.h"
35 : : #include "utils/rel.h"
36 : : #include "utils/syscache.h"
37 : : #include "utils/varlena.h"
38 : :
354 tgl@sss.pgh.pa.us 39 :CBC 577 : PG_MODULE_MAGIC_EXT(
40 : : .name = "pgoutput",
41 : : .version = PG_VERSION
42 : : );
43 : :
44 : : static void pgoutput_startup(LogicalDecodingContext *ctx,
45 : : OutputPluginOptions *opt, bool is_init);
46 : : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
47 : : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
48 : : ReorderBufferTXN *txn);
49 : : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
50 : : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
51 : : static void pgoutput_change(LogicalDecodingContext *ctx,
52 : : ReorderBufferTXN *txn, Relation relation,
53 : : ReorderBufferChange *change);
54 : : static void pgoutput_truncate(LogicalDecodingContext *ctx,
55 : : ReorderBufferTXN *txn, int nrelations, Relation relations[],
56 : : ReorderBufferChange *change);
57 : : static void pgoutput_message(LogicalDecodingContext *ctx,
58 : : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
59 : : bool transactional, const char *prefix,
60 : : Size sz, const char *message);
61 : : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
62 : : ReplOriginId origin_id);
63 : : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
64 : : ReorderBufferTXN *txn);
65 : : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
66 : : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
67 : : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
68 : : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
69 : : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
70 : : ReorderBufferTXN *txn,
71 : : XLogRecPtr prepare_end_lsn,
72 : : TimestampTz prepare_time);
73 : : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
74 : : ReorderBufferTXN *txn);
75 : : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
76 : : ReorderBufferTXN *txn);
77 : : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
78 : : ReorderBufferTXN *txn,
79 : : XLogRecPtr abort_lsn);
80 : : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
81 : : ReorderBufferTXN *txn,
82 : : XLogRecPtr commit_lsn);
83 : : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
84 : : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
85 : :
86 : : static bool publications_valid;
87 : :
88 : : static List *LoadPublications(List *pubnames);
89 : : static void publication_invalidation_cb(Datum arg, SysCacheIdentifier cacheid,
90 : : uint32 hashvalue);
91 : : static void send_repl_origin(LogicalDecodingContext *ctx,
92 : : ReplOriginId origin_id, XLogRecPtr origin_lsn,
93 : : bool send_origin);
94 : :
95 : : /*
96 : : * Only 3 publication actions are used for row filtering ("insert", "update",
97 : : * "delete"). See RelationSyncEntry.exprstate[].
98 : : */
99 : : enum RowFilterPubAction
100 : : {
101 : : PUBACTION_INSERT,
102 : : PUBACTION_UPDATE,
103 : : PUBACTION_DELETE,
104 : : };
105 : :
106 : : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
107 : :
108 : : /*
109 : : * Entry in the map used to remember which relation schemas we sent.
110 : : *
111 : : * The schema_sent flag determines if the current schema record for the
112 : : * relation (and for its ancestor if publish_as_relid is set) was already
113 : : * sent to the subscriber (in which case we don't need to send it again).
114 : : *
115 : : * The schema cache on downstream is however updated only at commit time,
116 : : * and with streamed transactions the commit order may be different from
117 : : * the order the transactions are sent in. Also, the (sub) transactions
118 : : * might get aborted so we need to send the schema for each (sub) transaction
119 : : * so that we don't lose the schema information on abort. For handling this,
120 : : * we maintain the list of xids (streamed_txns) for those we have already sent
121 : : * the schema.
122 : : *
123 : : * For partitions, 'pubactions' considers not only the table's own
124 : : * publications, but also those of all of its ancestors.
125 : : */
126 : : typedef struct RelationSyncEntry
127 : : {
128 : : Oid relid; /* relation oid */
129 : :
130 : : bool replicate_valid; /* overall validity flag for entry */
131 : :
132 : : bool schema_sent;
133 : :
134 : : /*
135 : : * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
136 : : * columns and the 'publish_generated_columns' parameter is set to
137 : : * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
138 : : * indicating that no generated columns should be published, unless
139 : : * explicitly specified in the column list.
140 : : */
141 : : PublishGencolsType include_gencols_type;
142 : : List *streamed_txns; /* streamed toplevel transactions with this
143 : : * schema */
144 : :
145 : : /* are we publishing this rel? */
146 : : PublicationActions pubactions;
147 : :
148 : : /*
149 : : * ExprState array for row filter. Different publication actions don't
150 : : * allow multiple expressions to always be combined into one, because
151 : : * updates or deletes restrict the column in expression to be part of the
152 : : * replica identity index whereas inserts do not have this restriction, so
153 : : * there is one ExprState per publication action.
154 : : */
155 : : ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
156 : : EState *estate; /* executor state used for row filter */
157 : : TupleTableSlot *new_slot; /* slot for storing new tuple */
158 : : TupleTableSlot *old_slot; /* slot for storing old tuple */
159 : :
160 : : /*
161 : : * OID of the relation to publish changes as. For a partition, this may
162 : : * be set to one of its ancestors whose schema will be used when
163 : : * replicating changes, if publish_via_partition_root is set for the
164 : : * publication.
165 : : */
166 : : Oid publish_as_relid;
167 : :
168 : : /*
169 : : * Map used when replicating using an ancestor's schema to convert tuples
170 : : * from partition's type to the ancestor's; NULL if publish_as_relid is
171 : : * same as 'relid' or if unnecessary due to partition and the ancestor
172 : : * having identical TupleDesc.
173 : : */
174 : : AttrMap *attrmap;
175 : :
176 : : /*
177 : : * Columns included in the publication, or NULL if all columns are
178 : : * included implicitly. Note that the attnums in this bitmap are not
179 : : * shifted by FirstLowInvalidHeapAttributeNumber.
180 : : */
181 : : Bitmapset *columns;
182 : :
183 : : /*
184 : : * Private context to store additional data for this entry - state for the
185 : : * row filter expressions, column list, etc.
186 : : */
187 : : MemoryContext entry_cxt;
188 : : } RelationSyncEntry;
189 : :
190 : : /*
191 : : * Maintain a per-transaction level variable to track whether the transaction
192 : : * has sent BEGIN. BEGIN is only sent when the first change in a transaction
193 : : * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
194 : : * messages for empty transactions which saves network bandwidth.
195 : : *
196 : : * This optimization is not used for prepared transactions because if the
197 : : * WALSender restarts after prepare of a transaction and before commit prepared
198 : : * of the same transaction then we won't be able to figure out if we have
199 : : * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
200 : : * because we would have lost the in-memory txndata information that was
201 : : * present prior to the restart. This will result in sending a spurious
202 : : * COMMIT PREPARED without a corresponding prepared transaction at the
203 : : * downstream which would lead to an error when it tries to process it.
204 : : *
205 : : * XXX We could achieve this optimization by changing protocol to send
206 : : * additional information so that downstream can detect that the corresponding
207 : : * prepare has not been sent. However, adding such a check for every
208 : : * transaction in the downstream could be costly so we might want to do it
209 : : * optionally.
210 : : *
211 : : * We also don't have this optimization for streamed transactions because
212 : : * they can contain prepared transactions.
213 : : */
214 : : typedef struct PGOutputTxnData
215 : : {
216 : : bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
217 : : } PGOutputTxnData;
218 : :
219 : : /* Map used to remember which relation schemas we sent. */
220 : : static HTAB *RelationSyncCache = NULL;
221 : :
222 : : static void init_rel_sync_cache(MemoryContext cachectx);
223 : : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
224 : : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
225 : : Relation relation);
226 : : static void send_relation_and_attrs(Relation relation, TransactionId xid,
227 : : LogicalDecodingContext *ctx,
228 : : RelationSyncEntry *relentry);
229 : : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
230 : : static void rel_sync_cache_publication_cb(Datum arg, SysCacheIdentifier cacheid,
231 : : uint32 hashvalue);
232 : : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
233 : : TransactionId xid);
234 : : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
235 : : TransactionId xid);
236 : : static void init_tuple_slot(PGOutputData *data, Relation relation,
237 : : RelationSyncEntry *entry);
238 : : static void pgoutput_memory_context_reset(void *arg);
239 : :
240 : : /* row filter routines */
241 : : static EState *create_estate_for_relation(Relation rel);
242 : : static void pgoutput_row_filter_init(PGOutputData *data,
243 : : List *publications,
244 : : RelationSyncEntry *entry);
245 : : static bool pgoutput_row_filter_exec_expr(ExprState *state,
246 : : ExprContext *econtext);
247 : : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
248 : : TupleTableSlot **new_slot_ptr,
249 : : RelationSyncEntry *entry,
250 : : ReorderBufferChangeType *action);
251 : :
252 : : /* column list routines */
253 : : static void pgoutput_column_list_init(PGOutputData *data,
254 : : List *publications,
255 : : RelationSyncEntry *entry);
256 : :
257 : : /*
258 : : * Specify output plugin callbacks
259 : : */
260 : : void
3342 peter_e@gmx.net 261 : 775 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
262 : : {
263 : 775 : cb->startup_cb = pgoutput_startup;
264 : 775 : cb->begin_cb = pgoutput_begin_txn;
265 : 775 : cb->change_cb = pgoutput_change;
2899 266 : 775 : cb->truncate_cb = pgoutput_truncate;
1804 akapila@postgresql.o 267 : 775 : cb->message_cb = pgoutput_message;
3342 peter_e@gmx.net 268 : 775 : cb->commit_cb = pgoutput_commit_txn;
269 : :
1705 akapila@postgresql.o 270 : 775 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
271 : 775 : cb->prepare_cb = pgoutput_prepare_txn;
272 : 775 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
273 : 775 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
3342 peter_e@gmx.net 274 : 775 : cb->filter_by_origin_cb = pgoutput_origin_filter;
275 : 775 : cb->shutdown_cb = pgoutput_shutdown;
276 : :
277 : : /* transaction streaming */
2019 akapila@postgresql.o 278 : 775 : cb->stream_start_cb = pgoutput_stream_start;
279 : 775 : cb->stream_stop_cb = pgoutput_stream_stop;
280 : 775 : cb->stream_abort_cb = pgoutput_stream_abort;
281 : 775 : cb->stream_commit_cb = pgoutput_stream_commit;
282 : 775 : cb->stream_change_cb = pgoutput_change;
1804 283 : 775 : cb->stream_message_cb = pgoutput_message;
2019 284 : 775 : cb->stream_truncate_cb = pgoutput_truncate;
285 : : /* transaction streaming - two-phase commit */
1684 286 : 775 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
3342 peter_e@gmx.net 287 : 775 : }
288 : :
289 : : static void
1804 akapila@postgresql.o 290 : 426 : parse_output_parameters(List *options, PGOutputData *data)
291 : : {
292 : : ListCell *lc;
3342 peter_e@gmx.net 293 : 426 : bool protocol_version_given = false;
294 : 426 : bool publication_names_given = false;
2066 tgl@sss.pgh.pa.us 295 : 426 : bool binary_option_given = false;
1804 akapila@postgresql.o 296 : 426 : bool messages_option_given = false;
2019 297 : 426 : bool streaming_given = false;
1705 298 : 426 : bool two_phase_option_given = false;
1333 299 : 426 : bool origin_option_given = false;
300 : :
301 : : /* Initialize optional parameters to defaults */
1804 302 : 426 : data->binary = false;
1161 303 : 426 : data->streaming = LOGICALREP_STREAM_OFF;
1804 304 : 426 : data->messages = false;
1705 305 : 426 : data->two_phase = false;
242 fujii@postgresql.org 306 :GNC 426 : data->publish_no_origin = false;
307 : :
3342 peter_e@gmx.net 308 [ + - + + :CBC 2118 : foreach(lc, options)
+ + ]
309 : : {
310 : 1692 : DefElem *defel = (DefElem *) lfirst(lc);
311 : :
312 [ + - - + ]: 1692 : Assert(defel->arg == NULL || IsA(defel->arg, String));
313 : :
314 : : /* Check each param, whether or not we recognize it */
315 [ + + ]: 1692 : if (strcmp(defel->defname, "proto_version") == 0)
316 : : {
317 : : unsigned long parsed;
318 : : char *endptr;
319 : :
320 [ - + ]: 426 : if (protocol_version_given)
3342 peter_e@gmx.net 321 [ # # ]:UBC 0 : ereport(ERROR,
322 : : (errcode(ERRCODE_SYNTAX_ERROR),
323 : : errmsg("conflicting or redundant options")));
3342 peter_e@gmx.net 324 :CBC 426 : protocol_version_given = true;
325 : :
1490 peter@eisentraut.org 326 : 426 : errno = 0;
327 : 426 : parsed = strtoul(strVal(defel->arg), &endptr, 10);
328 [ + - - + ]: 426 : if (errno != 0 || *endptr != '\0')
3342 peter_e@gmx.net 329 [ # # ]:UBC 0 : ereport(ERROR,
330 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
331 : : errmsg("invalid proto_version")));
332 : :
1490 peter@eisentraut.org 333 [ - + ]:CBC 426 : if (parsed > PG_UINT32_MAX)
3342 peter_e@gmx.net 334 [ # # ]:UBC 0 : ereport(ERROR,
335 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
336 : : errmsg("proto_version \"%s\" out of range",
337 : : strVal(defel->arg))));
338 : :
1804 akapila@postgresql.o 339 :CBC 426 : data->protocol_version = (uint32) parsed;
340 : : }
3342 peter_e@gmx.net 341 [ + + ]: 1266 : else if (strcmp(defel->defname, "publication_names") == 0)
342 : : {
343 [ - + ]: 426 : if (publication_names_given)
3342 peter_e@gmx.net 344 [ # # ]:UBC 0 : ereport(ERROR,
345 : : (errcode(ERRCODE_SYNTAX_ERROR),
346 : : errmsg("conflicting or redundant options")));
3342 peter_e@gmx.net 347 :CBC 426 : publication_names_given = true;
348 : :
349 : : /*
350 : : * Pass a copy of the DefElem->arg since SplitIdentifierString
351 : : * modifies its input.
352 : : */
68 drowley@postgresql.o 353 [ - + ]: 426 : if (!SplitIdentifierString(pstrdup(strVal(defel->arg)), ',',
354 : : &data->publication_names))
3224 bruce@momjian.us 355 [ # # ]:UBC 0 : ereport(ERROR,
356 : : (errcode(ERRCODE_INVALID_NAME),
357 : : errmsg("invalid publication_names syntax")));
358 : : }
2066 tgl@sss.pgh.pa.us 359 [ + + ]:CBC 840 : else if (strcmp(defel->defname, "binary") == 0)
360 : : {
361 [ - + ]: 10 : if (binary_option_given)
2066 tgl@sss.pgh.pa.us 362 [ # # ]:UBC 0 : ereport(ERROR,
363 : : (errcode(ERRCODE_SYNTAX_ERROR),
364 : : errmsg("conflicting or redundant options")));
2066 tgl@sss.pgh.pa.us 365 :CBC 10 : binary_option_given = true;
366 : :
1804 akapila@postgresql.o 367 : 10 : data->binary = defGetBoolean(defel);
368 : : }
369 [ + + ]: 830 : else if (strcmp(defel->defname, "messages") == 0)
370 : : {
371 [ - + ]: 4 : if (messages_option_given)
1804 akapila@postgresql.o 372 [ # # ]:UBC 0 : ereport(ERROR,
373 : : (errcode(ERRCODE_SYNTAX_ERROR),
374 : : errmsg("conflicting or redundant options")));
1804 akapila@postgresql.o 375 :CBC 4 : messages_option_given = true;
376 : :
377 : 4 : data->messages = defGetBoolean(defel);
378 : : }
2019 379 [ + + ]: 826 : else if (strcmp(defel->defname, "streaming") == 0)
380 : : {
381 [ - + ]: 405 : if (streaming_given)
2019 akapila@postgresql.o 382 [ # # ]:UBC 0 : ereport(ERROR,
383 : : (errcode(ERRCODE_SYNTAX_ERROR),
384 : : errmsg("conflicting or redundant options")));
2019 akapila@postgresql.o 385 :CBC 405 : streaming_given = true;
386 : :
1161 387 : 405 : data->streaming = defGetStreamingMode(defel);
388 : : }
1705 389 [ + + ]: 421 : else if (strcmp(defel->defname, "two_phase") == 0)
390 : : {
391 [ - + ]: 7 : if (two_phase_option_given)
1705 akapila@postgresql.o 392 [ # # ]:UBC 0 : ereport(ERROR,
393 : : (errcode(ERRCODE_SYNTAX_ERROR),
394 : : errmsg("conflicting or redundant options")));
1705 akapila@postgresql.o 395 :CBC 7 : two_phase_option_given = true;
396 : :
397 : 7 : data->two_phase = defGetBoolean(defel);
398 : : }
1333 399 [ + - ]: 414 : else if (strcmp(defel->defname, "origin") == 0)
400 : : {
401 : : char *origin;
402 : :
403 [ - + ]: 414 : if (origin_option_given)
1333 akapila@postgresql.o 404 [ # # ]:UBC 0 : ereport(ERROR,
405 : : errcode(ERRCODE_SYNTAX_ERROR),
406 : : errmsg("conflicting or redundant options"));
1333 akapila@postgresql.o 407 :CBC 414 : origin_option_given = true;
408 : :
900 409 : 414 : origin = defGetString(defel);
410 [ + + ]: 414 : if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
411 : 27 : data->publish_no_origin = true;
412 [ + - ]: 387 : else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
413 : 387 : data->publish_no_origin = false;
414 : : else
1333 akapila@postgresql.o 415 [ # # ]:UBC 0 : ereport(ERROR,
416 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
417 : : errmsg("unrecognized origin value: \"%s\"", origin));
418 : : }
419 : : else
3342 peter_e@gmx.net 420 [ # # ]: 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
421 : : }
422 : :
423 : : /* Check required options */
817 akapila@postgresql.o 424 [ - + ]:CBC 426 : if (!protocol_version_given)
817 akapila@postgresql.o 425 [ # # ]:UBC 0 : ereport(ERROR,
426 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
427 : : errmsg("option \"%s\" missing", "proto_version"));
817 akapila@postgresql.o 428 [ - + ]:CBC 426 : if (!publication_names_given)
817 akapila@postgresql.o 429 [ # # ]:UBC 0 : ereport(ERROR,
430 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
431 : : errmsg("option \"%s\" missing", "publication_names"));
3342 peter_e@gmx.net 432 :CBC 426 : }
433 : :
434 : : /*
435 : : * Memory context reset callback of PGOutputData->context.
436 : : */
437 : : static void
157 msawada@postgresql.o 438 : 1108 : pgoutput_memory_context_reset(void *arg)
439 : : {
440 [ + + ]: 1108 : if (RelationSyncCache)
441 : : {
442 : 205 : hash_destroy(RelationSyncCache);
443 : 205 : RelationSyncCache = NULL;
444 : : }
445 : 1108 : }
446 : :
447 : : /*
448 : : * Initialize this plugin
449 : : */
450 : : static void
3224 bruce@momjian.us 451 : 775 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
452 : : bool is_init)
453 : : {
95 michael@paquier.xyz 454 :GNC 775 : PGOutputData *data = palloc0_object(PGOutputData);
455 : : static bool publication_callback_registered = false;
456 : : MemoryContextCallback *mcallback;
457 : :
458 : : /* Create our memory context for private allocations. */
3342 peter_e@gmx.net 459 :CBC 775 : data->context = AllocSetContextCreate(ctx->context,
460 : : "logical replication output context",
461 : : ALLOCSET_DEFAULT_SIZES);
462 : :
1482 akapila@postgresql.o 463 : 775 : data->cachectx = AllocSetContextCreate(ctx->context,
464 : : "logical replication cache context",
465 : : ALLOCSET_DEFAULT_SIZES);
466 : :
461 michael@paquier.xyz 467 : 775 : data->pubctx = AllocSetContextCreate(ctx->context,
468 : : "logical replication publication list context",
469 : : ALLOCSET_SMALL_SIZES);
470 : :
471 : : /*
472 : : * Ensure to cleanup RelationSyncCache even when logical decoding invoked
473 : : * via SQL interface ends up with an error.
474 : : */
95 michael@paquier.xyz 475 :GNC 775 : mcallback = palloc0_object(MemoryContextCallback);
157 msawada@postgresql.o 476 :CBC 775 : mcallback->func = pgoutput_memory_context_reset;
477 : 775 : MemoryContextRegisterResetCallback(ctx->context, mcallback);
478 : :
3342 peter_e@gmx.net 479 : 775 : ctx->output_plugin_private = data;
480 : :
481 : : /* This plugin uses binary protocol. */
482 : 775 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
483 : :
484 : : /*
485 : : * This is replication start and not slot initialization.
486 : : *
487 : : * Parse and validate options passed by the client.
488 : : */
489 [ + + ]: 775 : if (!is_init)
490 : : {
491 : : /* Parse the params and ERROR if we see any we don't recognize */
1804 akapila@postgresql.o 492 : 426 : parse_output_parameters(ctx->output_plugin_options, data);
493 : :
494 : : /* Check if we support requested protocol */
1996 495 [ - + ]: 426 : if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
3342 peter_e@gmx.net 496 [ # # ]:UBC 0 : ereport(ERROR,
497 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
498 : : errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
499 : : data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
500 : :
3342 peter_e@gmx.net 501 [ - + ]:CBC 426 : if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
3342 peter_e@gmx.net 502 [ # # ]:UBC 0 : ereport(ERROR,
503 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
504 : : errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
505 : : data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
506 : :
507 : : /*
508 : : * Decide whether to enable streaming. It is disabled by default, in
509 : : * which case we just update the flag in decoding context. Otherwise
510 : : * we only allow it with sufficient version of the protocol, and when
511 : : * the output plugin supports it.
512 : : */
1161 akapila@postgresql.o 513 [ + + ]:CBC 426 : if (data->streaming == LOGICALREP_STREAM_OFF)
2019 514 : 21 : ctx->streaming = false;
1161 515 [ + + ]: 405 : else if (data->streaming == LOGICALREP_STREAM_ON &&
516 [ - + ]: 26 : data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
2019 akapila@postgresql.o 517 [ # # ]:UBC 0 : ereport(ERROR,
518 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
519 : : errmsg("requested proto_version=%d does not support streaming, need %d or higher",
520 : : data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
1161 akapila@postgresql.o 521 [ + + ]:CBC 405 : else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
522 [ - + ]: 379 : data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
1161 akapila@postgresql.o 523 [ # # ]:UBC 0 : ereport(ERROR,
524 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
525 : : errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
526 : : data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
2019 akapila@postgresql.o 527 [ - + ]:CBC 405 : else if (!ctx->streaming)
2019 akapila@postgresql.o 528 [ # # ]:UBC 0 : ereport(ERROR,
529 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
530 : : errmsg("streaming requested, but not supported by output plugin")));
531 : :
532 : : /*
533 : : * Here, we just check whether the two-phase option is passed by
534 : : * plugin and decide whether to enable it at later point of time. It
535 : : * remains enabled if the previous start-up has done so. But we only
536 : : * allow the option to be passed in with sufficient version of the
537 : : * protocol, and when the output plugin supports it.
538 : : */
1705 akapila@postgresql.o 539 [ + + ]:CBC 426 : if (!data->two_phase)
540 : 419 : ctx->twophase_opt_given = false;
541 [ - + ]: 7 : else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
1705 akapila@postgresql.o 542 [ # # ]:UBC 0 : ereport(ERROR,
543 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
544 : : errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
545 : : data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
1705 akapila@postgresql.o 546 [ - + ]:CBC 7 : else if (!ctx->twophase)
1705 akapila@postgresql.o 547 [ # # ]:UBC 0 : ereport(ERROR,
548 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
549 : : errmsg("two-phase commit requested, but not supported by output plugin")));
550 : : else
1705 akapila@postgresql.o 551 :CBC 7 : ctx->twophase_opt_given = true;
552 : :
553 : : /* Init publication state. */
3342 peter_e@gmx.net 554 : 426 : data->publications = NIL;
555 : 426 : publications_valid = false;
556 : :
557 : : /*
558 : : * Register callback for pg_publication if we didn't already do that
559 : : * during some previous call in this process.
560 : : */
1116 tgl@sss.pgh.pa.us 561 [ + + ]: 426 : if (!publication_callback_registered)
562 : : {
563 : 424 : CacheRegisterSyscacheCallback(PUBLICATIONOID,
564 : : publication_invalidation_cb,
565 : : (Datum) 0);
367 akapila@postgresql.o 566 : 424 : CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
567 : : (Datum) 0);
1116 tgl@sss.pgh.pa.us 568 : 424 : publication_callback_registered = true;
569 : : }
570 : :
571 : : /* Initialize relation schema cache. */
3342 peter_e@gmx.net 572 : 426 : init_rel_sync_cache(CacheMemoryContext);
573 : : }
574 : : else
575 : : {
576 : : /*
577 : : * Disable the streaming and prepared transactions during the slot
578 : : * initialization mode.
579 : : */
2019 akapila@postgresql.o 580 : 349 : ctx->streaming = false;
1705 581 : 349 : ctx->twophase = false;
582 : : }
3342 peter_e@gmx.net 583 : 775 : }
584 : :
585 : : /*
586 : : * BEGIN callback.
587 : : *
588 : : * Don't send the BEGIN message here instead postpone it until the first
589 : : * change. In logical replication, a common scenario is to replicate a set of
590 : : * tables (instead of all tables) and transactions whose changes were on
591 : : * the table(s) that are not published will produce empty transactions. These
592 : : * empty transactions will send BEGIN and COMMIT messages to subscribers,
593 : : * using bandwidth on something with little/no use for logical replication.
594 : : */
595 : : static void
1403 tgl@sss.pgh.pa.us 596 : 1096 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
597 : : {
598 : 1096 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
599 : : sizeof(PGOutputTxnData));
600 : :
1446 akapila@postgresql.o 601 : 1096 : txn->output_plugin_private = txndata;
602 : 1096 : }
603 : :
604 : : /*
605 : : * Send BEGIN.
606 : : *
607 : : * This is called while processing the first change of the transaction.
608 : : */
609 : : static void
610 : 466 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
611 : : {
46 msawada@postgresql.o 612 :GNC 466 : bool send_replication_origin = txn->origin_id != InvalidReplOriginId;
1446 akapila@postgresql.o 613 :CBC 466 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
614 : :
615 [ - + ]: 466 : Assert(txndata);
616 [ - + ]: 466 : Assert(!txndata->sent_begin_txn);
617 : :
3342 peter_e@gmx.net 618 : 466 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
619 : 466 : logicalrep_write_begin(ctx->out, txn);
1446 akapila@postgresql.o 620 : 466 : txndata->sent_begin_txn = true;
621 : :
1705 622 : 466 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
623 : : send_replication_origin);
624 : :
3342 peter_e@gmx.net 625 : 466 : OutputPluginWrite(ctx, true);
626 : 465 : }
627 : :
628 : : /*
629 : : * COMMIT callback
630 : : */
631 : : static void
632 : 1094 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
633 : : XLogRecPtr commit_lsn)
634 : : {
1446 akapila@postgresql.o 635 : 1094 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
636 : : bool sent_begin_txn;
637 : :
638 [ - + ]: 1094 : Assert(txndata);
639 : :
640 : : /*
641 : : * We don't need to send the commit message unless some relevant change
642 : : * from this transaction has been sent to the downstream.
643 : : */
644 : 1094 : sent_begin_txn = txndata->sent_begin_txn;
1131 645 : 1094 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
1446 646 : 1094 : pfree(txndata);
647 : 1094 : txn->output_plugin_private = NULL;
648 : :
649 [ + + ]: 1094 : if (!sent_begin_txn)
650 : : {
651 [ + + ]: 629 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
652 : 629 : return;
653 : : }
654 : :
3342 peter_e@gmx.net 655 : 465 : OutputPluginPrepareWrite(ctx, true);
656 : 465 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
657 : 465 : OutputPluginWrite(ctx, true);
658 : : }
659 : :
660 : : /*
661 : : * BEGIN PREPARE callback
662 : : */
663 : : static void
1705 akapila@postgresql.o 664 : 17 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
665 : : {
46 msawada@postgresql.o 666 :GNC 17 : bool send_replication_origin = txn->origin_id != InvalidReplOriginId;
667 : :
1705 akapila@postgresql.o 668 :CBC 17 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
669 : 17 : logicalrep_write_begin_prepare(ctx->out, txn);
670 : :
671 : 17 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
672 : : send_replication_origin);
673 : :
674 : 17 : OutputPluginWrite(ctx, true);
675 : 17 : }
676 : :
677 : : /*
678 : : * PREPARE callback
679 : : */
680 : : static void
681 : 17 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
682 : : XLogRecPtr prepare_lsn)
683 : : {
1131 684 : 17 : OutputPluginUpdateProgress(ctx, false);
685 : :
1705 686 : 17 : OutputPluginPrepareWrite(ctx, true);
687 : 17 : logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
688 : 17 : OutputPluginWrite(ctx, true);
689 : 17 : }
690 : :
691 : : /*
692 : : * COMMIT PREPARED callback
693 : : */
694 : : static void
695 : 24 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
696 : : XLogRecPtr commit_lsn)
697 : : {
1131 698 : 24 : OutputPluginUpdateProgress(ctx, false);
699 : :
1705 700 : 24 : OutputPluginPrepareWrite(ctx, true);
701 : 24 : logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
702 : 24 : OutputPluginWrite(ctx, true);
703 : 24 : }
704 : :
705 : : /*
706 : : * ROLLBACK PREPARED callback
707 : : */
708 : : static void
709 : 5 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
710 : : ReorderBufferTXN *txn,
711 : : XLogRecPtr prepare_end_lsn,
712 : : TimestampTz prepare_time)
713 : : {
1131 714 : 5 : OutputPluginUpdateProgress(ctx, false);
715 : :
1705 716 : 5 : OutputPluginPrepareWrite(ctx, true);
717 : 5 : logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
718 : : prepare_time);
719 : 5 : OutputPluginWrite(ctx, true);
720 : 5 : }
721 : :
722 : : /*
723 : : * Write the current schema of the relation and its ancestor (if any) if not
724 : : * done yet.
725 : : */
726 : : static void
2899 peter_e@gmx.net 727 : 182267 : maybe_send_schema(LogicalDecodingContext *ctx,
728 : : ReorderBufferChange *change,
729 : : Relation relation, RelationSyncEntry *relentry)
730 : : {
899 michael@paquier.xyz 731 : 182267 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
732 : : bool schema_sent;
2019 akapila@postgresql.o 733 : 182267 : TransactionId xid = InvalidTransactionId;
734 : 182267 : TransactionId topxid = InvalidTransactionId;
735 : :
736 : : /*
737 : : * Remember XID of the (sub)transaction for the change. We don't care if
738 : : * it's top-level transaction or not (we have already sent that XID in
739 : : * start of the current streaming block).
740 : : *
741 : : * If we're not in a streaming block, just use InvalidTransactionId and
742 : : * the write methods will not include it.
743 : : */
899 michael@paquier.xyz 744 [ + + ]: 182267 : if (data->in_streaming)
2019 akapila@postgresql.o 745 : 175903 : xid = change->txn->xid;
746 : :
1094 747 [ + + ]: 182267 : if (rbtxn_is_subtxn(change->txn))
748 [ + - ]: 10169 : topxid = rbtxn_get_toptxn(change->txn)->xid;
749 : : else
2019 750 : 172098 : topxid = xid;
751 : :
752 : : /*
753 : : * Do we need to send the schema? We do track streamed transactions
754 : : * separately, because those may be applied later (and the regular
755 : : * transactions won't see their effects until then) and in an order that
756 : : * we don't know at this point.
757 : : *
758 : : * XXX There is a scope of optimization here. Currently, we always send
759 : : * the schema first time in a streaming transaction but we can probably
760 : : * avoid that by checking 'relentry->schema_sent' flag. However, before
761 : : * doing that we need to study its impact on the case where we have a mix
762 : : * of streaming and non-streaming transactions.
763 : : */
899 michael@paquier.xyz 764 [ + + ]: 182267 : if (data->in_streaming)
2019 akapila@postgresql.o 765 : 175903 : schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
766 : : else
767 : 6364 : schema_sent = relentry->schema_sent;
768 : :
769 : : /* Nothing to do if we already sent the schema. */
770 [ + + ]: 182267 : if (schema_sent)
2167 peter@eisentraut.org 771 : 181918 : return;
772 : :
773 : : /*
774 : : * Send the schema. If the changes will be published using an ancestor's
775 : : * schema, not the relation's own, send that ancestor's schema before
776 : : * sending relation's own (XXX - maybe sending only the former suffices?).
777 : : */
778 [ + + ]: 349 : if (relentry->publish_as_relid != RelationGetRelid(relation))
779 : : {
780 : 34 : Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
781 : :
493 akapila@postgresql.o 782 : 34 : send_relation_and_attrs(ancestor, xid, ctx, relentry);
2167 peter@eisentraut.org 783 : 34 : RelationClose(ancestor);
784 : : }
785 : :
493 akapila@postgresql.o 786 : 349 : send_relation_and_attrs(relation, xid, ctx, relentry);
787 : :
899 michael@paquier.xyz 788 [ + + ]: 349 : if (data->in_streaming)
2019 akapila@postgresql.o 789 : 66 : set_schema_sent_in_streamed_txn(relentry, topxid);
790 : : else
791 : 283 : relentry->schema_sent = true;
792 : : }
793 : :
794 : : /*
795 : : * Sends a relation
796 : : */
797 : : static void
798 : 383 : send_relation_and_attrs(Relation relation, TransactionId xid,
799 : : LogicalDecodingContext *ctx,
800 : : RelationSyncEntry *relentry)
801 : : {
2167 peter@eisentraut.org 802 : 383 : TupleDesc desc = RelationGetDescr(relation);
493 akapila@postgresql.o 803 : 383 : Bitmapset *columns = relentry->columns;
416 804 : 383 : PublishGencolsType include_gencols_type = relentry->include_gencols_type;
805 : : int i;
806 : :
807 : : /*
808 : : * Write out type info if needed. We do that only for user-created types.
809 : : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
810 : : * objects with hand-assigned OIDs to be "built in", not for instance any
811 : : * function or type defined in the information_schema. This is important
812 : : * because only hand-assigned OIDs can be expected to remain stable across
813 : : * major versions.
814 : : */
2167 peter@eisentraut.org 815 [ + + ]: 1179 : for (i = 0; i < desc->natts; i++)
816 : : {
817 : 796 : Form_pg_attribute att = TupleDescAttr(desc, i);
818 : :
416 akapila@postgresql.o 819 [ + + ]: 796 : if (!logicalrep_should_publish_column(att, columns,
820 : : include_gencols_type))
2167 peter@eisentraut.org 821 : 71 : continue;
822 : :
823 [ + + ]: 725 : if (att->atttypid < FirstGenbkiObjectId)
824 : 707 : continue;
825 : :
2899 peter_e@gmx.net 826 : 18 : OutputPluginPrepareWrite(ctx, false);
2019 akapila@postgresql.o 827 : 18 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
2899 peter_e@gmx.net 828 : 18 : OutputPluginWrite(ctx, false);
829 : : }
830 : :
2167 peter@eisentraut.org 831 : 383 : OutputPluginPrepareWrite(ctx, false);
416 akapila@postgresql.o 832 : 383 : logicalrep_write_rel(ctx->out, xid, relation, columns,
833 : : include_gencols_type);
2167 peter@eisentraut.org 834 : 383 : OutputPluginWrite(ctx, false);
2899 peter_e@gmx.net 835 : 383 : }
836 : :
837 : : /*
838 : : * Executor state preparation for evaluation of row filter expressions for the
839 : : * specified relation.
840 : : */
841 : : static EState *
1482 akapila@postgresql.o 842 : 17 : create_estate_for_relation(Relation rel)
843 : : {
844 : : EState *estate;
845 : : RangeTblEntry *rte;
1105 tgl@sss.pgh.pa.us 846 : 17 : List *perminfos = NIL;
847 : :
1482 akapila@postgresql.o 848 : 17 : estate = CreateExecutorState();
849 : :
850 : 17 : rte = makeNode(RangeTblEntry);
851 : 17 : rte->rtekind = RTE_RELATION;
852 : 17 : rte->relid = RelationGetRelid(rel);
853 : 17 : rte->relkind = rel->rd_rel->relkind;
854 : 17 : rte->rellockmode = AccessShareLock;
855 : :
1105 tgl@sss.pgh.pa.us 856 : 17 : addRTEPermissionInfo(&perminfos, rte);
857 : :
401 amitlan@postgresql.o 858 : 17 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
859 : : bms_make_singleton(1));
860 : :
1482 akapila@postgresql.o 861 : 17 : estate->es_output_cid = GetCurrentCommandId(false);
862 : :
863 : 17 : return estate;
864 : : }
865 : :
866 : : /*
867 : : * Evaluates row filter.
868 : : *
869 : : * If the row filter evaluates to NULL, it is taken as false i.e. the change
870 : : * isn't replicated.
871 : : */
872 : : static bool
873 : 38 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
874 : : {
875 : : Datum ret;
876 : : bool isnull;
877 : :
878 [ - + ]: 38 : Assert(state != NULL);
879 : :
880 : 38 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
881 : :
882 [ - + - - : 38 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
- - - - ]
883 : : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
884 : : isnull ? "true" : "false");
885 : :
886 [ + + ]: 38 : if (isnull)
887 : 1 : return false;
888 : :
889 : 37 : return DatumGetBool(ret);
890 : : }
891 : :
892 : : /*
893 : : * Make sure the per-entry memory context exists.
894 : : */
895 : : static void
1450 tomas.vondra@postgre 896 : 335 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
897 : : {
898 : : Relation relation;
899 : :
900 : : /* The context may already exist, in which case bail out. */
901 [ + + ]: 335 : if (entry->entry_cxt)
902 : 17 : return;
903 : :
904 : 318 : relation = RelationIdGetRelation(entry->publish_as_relid);
905 : :
906 : 318 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
907 : : "entry private context",
908 : : ALLOCSET_SMALL_SIZES);
909 : :
910 : 318 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
911 : : RelationGetRelationName(relation));
912 : : }
913 : :
914 : : /*
915 : : * Initialize the row filter.
916 : : */
917 : : static void
1482 akapila@postgresql.o 918 : 318 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
919 : : RelationSyncEntry *entry)
920 : : {
921 : : ListCell *lc;
922 : 318 : List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
923 : 318 : bool no_filter[] = {false, false, false}; /* One per pubaction */
924 : : MemoryContext oldctx;
925 : : int idx;
926 : 318 : bool has_filter = true;
1269 927 : 318 : Oid schemaid = get_rel_namespace(entry->publish_as_relid);
928 : :
929 : : /*
930 : : * Find if there are any row filters for this relation. If there are, then
931 : : * prepare the necessary ExprState and cache it in entry->exprstate. To
932 : : * build an expression state, we need to ensure the following:
933 : : *
934 : : * All the given publication-table mappings must be checked.
935 : : *
936 : : * Multiple publications might have multiple row filters for this
937 : : * relation. Since row filter usage depends on the DML operation, there
938 : : * are multiple lists (one for each operation) to which row filters will
939 : : * be appended.
940 : : *
941 : : * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
942 : : * expression" so it takes precedence.
943 : : */
1482 944 [ + - + + : 339 : foreach(lc, publications)
+ + ]
945 : : {
946 : 322 : Publication *pub = lfirst(lc);
947 : 322 : HeapTuple rftuple = NULL;
948 : 322 : Datum rfdatum = 0;
1269 949 : 322 : bool pub_no_filter = true;
950 : :
951 : : /*
952 : : * If the publication is FOR ALL TABLES, or the publication includes a
953 : : * FOR TABLES IN SCHEMA where the table belongs to the referred
954 : : * schema, then it is treated the same as if there are no row filters
955 : : * (even if other publications have a row filter).
956 : : */
957 [ + + ]: 322 : if (!pub->alltables &&
958 [ + + ]: 238 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
959 : : ObjectIdGetDatum(schemaid),
960 : : ObjectIdGetDatum(pub->oid)))
961 : : {
962 : : /*
963 : : * Check for the presence of a row filter in this publication.
964 : : */
1482 965 : 231 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
966 : : ObjectIdGetDatum(entry->publish_as_relid),
967 : : ObjectIdGetDatum(pub->oid));
968 : :
969 [ + + ]: 231 : if (HeapTupleIsValid(rftuple))
970 : : {
971 : : /* Null indicates no filter. */
972 : 219 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
973 : : Anum_pg_publication_rel_prqual,
974 : : &pub_no_filter);
975 : : }
976 : : }
977 : :
978 [ + + ]: 322 : if (pub_no_filter)
979 : : {
980 [ + + ]: 308 : if (rftuple)
981 : 205 : ReleaseSysCache(rftuple);
982 : :
983 : 308 : no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
984 : 308 : no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
985 : 308 : no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
986 : :
987 : : /*
988 : : * Quick exit if all the DML actions are publicized via this
989 : : * publication.
990 : : */
991 [ + - ]: 308 : if (no_filter[PUBACTION_INSERT] &&
992 [ + + ]: 308 : no_filter[PUBACTION_UPDATE] &&
993 [ + - ]: 301 : no_filter[PUBACTION_DELETE])
994 : : {
995 : 301 : has_filter = false;
996 : 301 : break;
997 : : }
998 : :
999 : : /* No additional work for this publication. Next one. */
1000 : 7 : continue;
1001 : : }
1002 : :
1003 : : /* Form the per pubaction row filter lists. */
1004 [ + - + - ]: 14 : if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
1005 : 14 : rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
1006 : 14 : TextDatumGetCString(rfdatum));
1007 [ + - + - ]: 14 : if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
1008 : 14 : rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
1009 : 14 : TextDatumGetCString(rfdatum));
1010 [ + - + - ]: 14 : if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
1011 : 14 : rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
1012 : 14 : TextDatumGetCString(rfdatum));
1013 : :
1014 : 14 : ReleaseSysCache(rftuple);
1015 : : } /* loop all subscribed publications */
1016 : :
1017 : : /* Clean the row filter */
1018 [ + + ]: 1272 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1019 : : {
1020 [ + + ]: 954 : if (no_filter[idx])
1021 : : {
1022 : 912 : list_free_deep(rfnodes[idx]);
1023 : 912 : rfnodes[idx] = NIL;
1024 : : }
1025 : : }
1026 : :
1027 [ + + ]: 318 : if (has_filter)
1028 : : {
1029 : 17 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1030 : :
1450 tomas.vondra@postgre 1031 : 17 : pgoutput_ensure_entry_cxt(data, entry);
1032 : :
1033 : : /*
1034 : : * Now all the filters for all pubactions are known. Combine them when
1035 : : * their pubactions are the same.
1036 : : */
1037 : 17 : oldctx = MemoryContextSwitchTo(entry->entry_cxt);
1482 akapila@postgresql.o 1038 : 17 : entry->estate = create_estate_for_relation(relation);
1039 [ + + ]: 68 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1040 : : {
1041 : 51 : List *filters = NIL;
1042 : : Expr *rfnode;
1043 : :
1044 [ + + ]: 51 : if (rfnodes[idx] == NIL)
1045 : 21 : continue;
1046 : :
1047 [ + - + + : 63 : foreach(lc, rfnodes[idx])
+ + ]
401 peter@eisentraut.org 1048 : 33 : filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
1049 : :
1050 : : /* combine the row filter and cache the ExprState */
1482 akapila@postgresql.o 1051 : 30 : rfnode = make_orclause(filters);
1052 : 30 : entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1053 : : } /* for each pubaction */
1054 : 17 : MemoryContextSwitchTo(oldctx);
1055 : :
1056 : 17 : RelationClose(relation);
1057 : : }
1058 : 318 : }
1059 : :
1060 : : /*
1061 : : * If the table contains a generated column, check for any conflicting
1062 : : * values of 'publish_generated_columns' parameter in the publications.
1063 : : */
1064 : : static void
493 1065 : 318 : check_and_init_gencol(PGOutputData *data, List *publications,
1066 : : RelationSyncEntry *entry)
1067 : : {
1068 : 318 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1069 : 318 : TupleDesc desc = RelationGetDescr(relation);
1070 : 318 : bool gencolpresent = false;
1071 : 318 : bool first = true;
1072 : :
1073 : : /* Check if there is any generated column present. */
1074 [ + + ]: 967 : for (int i = 0; i < desc->natts; i++)
1075 : : {
144 drowley@postgresql.o 1076 :GNC 656 : CompactAttribute *att = TupleDescCompactAttr(desc, i);
1077 : :
493 akapila@postgresql.o 1078 [ + + ]:CBC 656 : if (att->attgenerated)
1079 : : {
1080 : 7 : gencolpresent = true;
1081 : 7 : break;
1082 : : }
1083 : : }
1084 : :
1085 : : /* There are no generated columns to be published. */
1086 [ + + ]: 318 : if (!gencolpresent)
1087 : : {
416 1088 : 311 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
493 1089 : 311 : return;
1090 : : }
1091 : :
1092 : : /*
1093 : : * There may be a conflicting value for 'publish_generated_columns'
1094 : : * parameter in the publications.
1095 : : */
1096 [ + - + + : 22 : foreach_ptr(Publication, pub, publications)
+ + ]
1097 : : {
1098 : : /*
1099 : : * The column list takes precedence over the
1100 : : * 'publish_generated_columns' parameter. Those will be checked later,
1101 : : * see pgoutput_column_list_init.
1102 : : */
1103 [ + + ]: 8 : if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1104 : 3 : continue;
1105 : :
1106 [ + - ]: 5 : if (first)
1107 : : {
416 1108 : 5 : entry->include_gencols_type = pub->pubgencols_type;
493 1109 : 5 : first = false;
1110 : : }
416 akapila@postgresql.o 1111 [ # # ]:UBC 0 : else if (entry->include_gencols_type != pub->pubgencols_type)
493 1112 [ # # ]: 0 : ereport(ERROR,
1113 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1114 : : errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1115 : : get_namespace_name(RelationGetNamespace(relation)),
1116 : : RelationGetRelationName(relation)));
1117 : : }
1118 : : }
1119 : :
1120 : : /*
1121 : : * Initialize the column list.
1122 : : */
1123 : : static void
1450 tomas.vondra@postgre 1124 :CBC 318 : pgoutput_column_list_init(PGOutputData *data, List *publications,
1125 : : RelationSyncEntry *entry)
1126 : : {
1127 : : ListCell *lc;
1382 akapila@postgresql.o 1128 : 318 : bool first = true;
1129 : 318 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
493 1130 : 318 : bool found_pub_collist = false;
1131 : 318 : Bitmapset *relcols = NULL;
1132 : :
1133 : 318 : pgoutput_ensure_entry_cxt(data, entry);
1134 : :
1135 : : /*
1136 : : * Find if there are any column lists for this relation. If there are,
1137 : : * build a bitmap using the column lists.
1138 : : *
1139 : : * Multiple publications might have multiple column lists for this
1140 : : * relation.
1141 : : *
1142 : : * Note that we don't support the case where the column list is different
1143 : : * for the same table when combining publications. See comments atop
1144 : : * fetch_relation_list. But one can later change the publication so we
1145 : : * still need to check all the given publication-table mappings and report
1146 : : * an error if any publications have a different column list.
1147 : : */
1450 tomas.vondra@postgre 1148 [ + - + + : 646 : foreach(lc, publications)
+ + ]
1149 : : {
1150 : 329 : Publication *pub = lfirst(lc);
1382 akapila@postgresql.o 1151 : 329 : Bitmapset *cols = NULL;
1152 : :
1153 : : /* Retrieve the bitmap of columns for a column list publication. */
493 1154 : 329 : found_pub_collist |= check_and_fetch_column_list(pub,
1155 : : entry->publish_as_relid,
1156 : : entry->entry_cxt, &cols);
1157 : :
1158 : : /*
1159 : : * For non-column list publications — e.g. TABLE (without a column
1160 : : * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1161 : : * of the table (including generated columns when
1162 : : * 'publish_generated_columns' parameter is true).
1163 : : */
1164 [ + + ]: 329 : if (!cols)
1165 : : {
1166 : : /*
1167 : : * Cache the table columns for the first publication with no
1168 : : * specified column list to detect publication with a different
1169 : : * column list.
1170 : : */
1171 [ + + + + ]: 290 : if (!relcols && (list_length(publications) > 1))
1172 : : {
1173 : 9 : MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
1174 : :
416 1175 : 9 : relcols = pub_form_cols_map(relation,
1176 : : entry->include_gencols_type);
493 1177 : 9 : MemoryContextSwitchTo(oldcxt);
1178 : : }
1179 : :
1180 : 290 : cols = relcols;
1181 : : }
1182 : :
1382 1183 [ + + ]: 329 : if (first)
1184 : : {
1185 : 318 : entry->columns = cols;
1186 : 318 : first = false;
1187 : : }
1188 [ + + ]: 11 : else if (!bms_equal(entry->columns, cols))
1189 [ + - ]: 1 : ereport(ERROR,
1190 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1191 : : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1192 : : get_namespace_name(RelationGetNamespace(relation)),
1193 : : RelationGetRelationName(relation)));
1194 : : } /* loop all subscribed publications */
1195 : :
1196 : : /*
1197 : : * If no column list publications exist, columns to be published will be
1198 : : * computed later according to the 'publish_generated_columns' parameter.
1199 : : */
493 1200 [ + + ]: 317 : if (!found_pub_collist)
1201 : 281 : entry->columns = NULL;
1202 : :
1382 1203 : 317 : RelationClose(relation);
1450 tomas.vondra@postgre 1204 : 317 : }
1205 : :
1206 : : /*
1207 : : * Initialize the slot for storing new and old tuples, and build the map that
1208 : : * will be used to convert the relation's tuples into the ancestor's format.
1209 : : */
1210 : : static void
1482 akapila@postgresql.o 1211 : 318 : init_tuple_slot(PGOutputData *data, Relation relation,
1212 : : RelationSyncEntry *entry)
1213 : : {
1214 : : MemoryContext oldctx;
1215 : : TupleDesc oldtupdesc;
1216 : : TupleDesc newtupdesc;
1217 : :
1218 : 318 : oldctx = MemoryContextSwitchTo(data->cachectx);
1219 : :
1220 : : /*
1221 : : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1222 : : * live as long as the cache remains.
1223 : : */
839 1224 : 318 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1225 : 318 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1226 : :
1482 1227 : 318 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1228 : 318 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1229 : :
1230 : 318 : MemoryContextSwitchTo(oldctx);
1231 : :
1232 : : /*
1233 : : * Cache the map that will be used to convert the relation's tuples into
1234 : : * the ancestor's format, if needed.
1235 : : */
1236 [ + + ]: 318 : if (entry->publish_as_relid != RelationGetRelid(relation))
1237 : : {
1238 : 39 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1239 : 39 : TupleDesc indesc = RelationGetDescr(relation);
1240 : 39 : TupleDesc outdesc = RelationGetDescr(ancestor);
1241 : :
1242 : : /* Map must live as long as the logical decoding context. */
440 michael@paquier.xyz 1243 : 39 : oldctx = MemoryContextSwitchTo(data->cachectx);
1244 : :
1202 alvherre@alvh.no-ip. 1245 : 39 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1246 : :
1482 akapila@postgresql.o 1247 : 39 : MemoryContextSwitchTo(oldctx);
1248 : 39 : RelationClose(ancestor);
1249 : : }
1250 : 318 : }
1251 : :
1252 : : /*
1253 : : * Change is checked against the row filter if any.
1254 : : *
1255 : : * Returns true if the change is to be replicated, else false.
1256 : : *
1257 : : * For inserts, evaluate the row filter for new tuple.
1258 : : * For deletes, evaluate the row filter for old tuple.
1259 : : * For updates, evaluate the row filter for old and new tuple.
1260 : : *
1261 : : * For updates, if both evaluations are true, we allow sending the UPDATE and
1262 : : * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
1263 : : * only one of the tuples matches the row filter expression, we transform
1264 : : * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1265 : : * following rules:
1266 : : *
1267 : : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1268 : : * Case 2: old-row (no match) new row (match) -> INSERT
1269 : : * Case 3: old-row (match) new-row (no match) -> DELETE
1270 : : * Case 4: old-row (match) new row (match) -> UPDATE
1271 : : *
1272 : : * The new action is updated in the action parameter.
1273 : : *
1274 : : * The new slot could be updated when transforming the UPDATE into INSERT,
1275 : : * because the original new tuple might not have column values from the replica
1276 : : * identity.
1277 : : *
1278 : : * Examples:
1279 : : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1280 : : * Since the old tuple satisfies, the initial table synchronization copied this
1281 : : * row (or another method was used to guarantee that there is data
1282 : : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1283 : : * row filter, so from a data consistency perspective, that row should be
1284 : : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1285 : : * statement and be sent to the subscriber. Keeping this row on the subscriber
1286 : : * is undesirable because it doesn't reflect what was defined in the row filter
1287 : : * expression on the publisher. This row on the subscriber would likely not be
1288 : : * modified by replication again. If someone inserted a new row with the same
1289 : : * old identifier, replication could stop due to a constraint violation.
1290 : : *
1291 : : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1292 : : * Since the old tuple doesn't satisfy, the initial table synchronization
1293 : : * probably didn't copy this row. However, after the UPDATE the new tuple does
1294 : : * satisfy the row filter, so from a data consistency perspective, that row
1295 : : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1296 : : * statements have no effect (it matches no row -- see
1297 : : * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1298 : : * INSERT statement and be sent to the subscriber. However, this might surprise
1299 : : * someone who expects the data set to satisfy the row filter expression on the
1300 : : * provider.
1301 : : */
1302 : : static bool
1303 : 182262 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1304 : : TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1305 : : ReorderBufferChangeType *action)
1306 : : {
1307 : : TupleDesc desc;
1308 : : int i;
1309 : : bool old_matched,
1310 : : new_matched,
1311 : : result;
1312 : : TupleTableSlot *tmp_new_slot;
1313 : 182262 : TupleTableSlot *new_slot = *new_slot_ptr;
1314 : : ExprContext *ecxt;
1315 : : ExprState *filter_exprstate;
1316 : :
1317 : : /*
1318 : : * We need this map to avoid relying on ReorderBufferChangeType enums
1319 : : * having specific values.
1320 : : */
1321 : : static const int map_changetype_pubaction[] = {
1322 : : [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1323 : : [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1324 : : [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1325 : : };
1326 : :
1327 [ + + + + : 182262 : Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
- + ]
1328 : : *action == REORDER_BUFFER_CHANGE_UPDATE ||
1329 : : *action == REORDER_BUFFER_CHANGE_DELETE);
1330 : :
1331 [ + + - + ]: 182262 : Assert(new_slot || old_slot);
1332 : :
1333 : : /* Get the corresponding row filter */
1334 : 182262 : filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1335 : :
1336 : : /* Bail out if there is no row filter */
1337 [ + + ]: 182262 : if (!filter_exprstate)
1338 : 182228 : return true;
1339 : :
1340 [ - + ]: 34 : elog(DEBUG3, "table \"%s.%s\" has row filter",
1341 : : get_namespace_name(RelationGetNamespace(relation)),
1342 : : RelationGetRelationName(relation));
1343 : :
1344 [ + + ]: 34 : ResetPerTupleExprContext(entry->estate);
1345 : :
1346 [ + + ]: 34 : ecxt = GetPerTupleExprContext(entry->estate);
1347 : :
1348 : : /*
1349 : : * For the following occasions where there is only one tuple, we can
1350 : : * evaluate the row filter for that tuple and return.
1351 : : *
1352 : : * For inserts, we only have the new tuple.
1353 : : *
1354 : : * For updates, we can have only a new tuple when none of the replica
1355 : : * identity columns changed and none of those columns have external data
1356 : : * but we still need to evaluate the row filter for the new tuple as the
1357 : : * existing values of those columns might not match the filter. Also,
1358 : : * users can use constant expressions in the row filter, so we anyway need
1359 : : * to evaluate it for the new tuple.
1360 : : *
1361 : : * For deletes, we only have the old tuple.
1362 : : */
1363 [ + + + + ]: 34 : if (!new_slot || !old_slot)
1364 : : {
1365 [ + + ]: 30 : ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1366 : 30 : result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1367 : :
1368 : 30 : return result;
1369 : : }
1370 : :
1371 : : /*
1372 : : * Both the old and new tuples must be valid only for updates and need to
1373 : : * be checked against the row filter.
1374 : : */
1375 [ - + ]: 4 : Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1376 : :
1377 : 4 : slot_getallattrs(new_slot);
1378 : 4 : slot_getallattrs(old_slot);
1379 : :
1380 : 4 : tmp_new_slot = NULL;
1381 : 4 : desc = RelationGetDescr(relation);
1382 : :
1383 : : /*
1384 : : * The new tuple might not have all the replica identity columns, in which
1385 : : * case it needs to be copied over from the old tuple.
1386 : : */
1387 [ + + ]: 12 : for (i = 0; i < desc->natts; i++)
1388 : : {
450 drowley@postgresql.o 1389 : 8 : CompactAttribute *att = TupleDescCompactAttr(desc, i);
1390 : :
1391 : : /*
1392 : : * if the column in the new tuple or old tuple is null, nothing to do
1393 : : */
1482 akapila@postgresql.o 1394 [ + + - + ]: 8 : if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1395 : 1 : continue;
1396 : :
1397 : : /*
1398 : : * Unchanged toasted replica identity columns are only logged in the
1399 : : * old tuple. Copy this over to the new tuple. The changed (or WAL
1400 : : * Logged) toast values are always assembled in memory and set as
1401 : : * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1402 : : */
1403 [ + + + + ]: 11 : if (att->attlen == -1 &&
222 peter@eisentraut.org 1404 :GNC 4 : VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(new_slot->tts_values[i])) &&
1405 [ + - ]: 1 : !VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(old_slot->tts_values[i])))
1406 : : {
1482 akapila@postgresql.o 1407 [ + - ]:CBC 1 : if (!tmp_new_slot)
1408 : : {
1409 : 1 : tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1410 : 1 : ExecClearTuple(tmp_new_slot);
1411 : :
1412 : 1 : memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1413 : 1 : desc->natts * sizeof(Datum));
1414 : 1 : memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1415 : 1 : desc->natts * sizeof(bool));
1416 : : }
1417 : :
1418 : 1 : tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1419 : 1 : tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1420 : : }
1421 : : }
1422 : :
1423 : 4 : ecxt->ecxt_scantuple = old_slot;
1424 : 4 : old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1425 : :
1426 [ + + ]: 4 : if (tmp_new_slot)
1427 : : {
1428 : 1 : ExecStoreVirtualTuple(tmp_new_slot);
1429 : 1 : ecxt->ecxt_scantuple = tmp_new_slot;
1430 : : }
1431 : : else
1432 : 3 : ecxt->ecxt_scantuple = new_slot;
1433 : :
1434 : 4 : new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1435 : :
1436 : : /*
1437 : : * Case 1: if both tuples don't match the row filter, bailout. Send
1438 : : * nothing.
1439 : : */
1440 [ + + - + ]: 4 : if (!old_matched && !new_matched)
1482 akapila@postgresql.o 1441 :UBC 0 : return false;
1442 : :
1443 : : /*
1444 : : * Case 2: if the old tuple doesn't satisfy the row filter but the new
1445 : : * tuple does, transform the UPDATE into INSERT.
1446 : : *
1447 : : * Use the newly transformed tuple that must contain the column values for
1448 : : * all the replica identity columns. This is required to ensure that the
1449 : : * while inserting the tuple in the downstream node, we have all the
1450 : : * required column values.
1451 : : */
1482 akapila@postgresql.o 1452 [ + + + - ]:CBC 4 : if (!old_matched && new_matched)
1453 : : {
1454 : 2 : *action = REORDER_BUFFER_CHANGE_INSERT;
1455 : :
1456 [ + + ]: 2 : if (tmp_new_slot)
1457 : 1 : *new_slot_ptr = tmp_new_slot;
1458 : : }
1459 : :
1460 : : /*
1461 : : * Case 3: if the old tuple satisfies the row filter but the new tuple
1462 : : * doesn't, transform the UPDATE into DELETE.
1463 : : *
1464 : : * This transformation does not require another tuple. The Old tuple will
1465 : : * be used for DELETE.
1466 : : */
1467 [ + - + + ]: 2 : else if (old_matched && !new_matched)
1468 : 1 : *action = REORDER_BUFFER_CHANGE_DELETE;
1469 : :
1470 : : /*
1471 : : * Case 4: if both tuples match the row filter, transformation isn't
1472 : : * required. (*action is default UPDATE).
1473 : : */
1474 : :
1475 : 4 : return true;
1476 : : }
1477 : :
1478 : : /*
1479 : : * Sends the decoded DML over wire.
1480 : : *
1481 : : * This is called both in streaming and non-streaming modes.
1482 : : */
1483 : : static void
3342 peter_e@gmx.net 1484 : 186517 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1485 : : Relation relation, ReorderBufferChange *change)
1486 : : {
3224 bruce@momjian.us 1487 : 186517 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1446 akapila@postgresql.o 1488 : 186517 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1489 : : MemoryContext old;
1490 : : RelationSyncEntry *relentry;
2019 1491 : 186517 : TransactionId xid = InvalidTransactionId;
1888 1492 : 186517 : Relation ancestor = NULL;
1482 1493 : 186517 : Relation targetrel = relation;
1494 : 186517 : ReorderBufferChangeType action = change->action;
1495 : 186517 : TupleTableSlot *old_slot = NULL;
1496 : 186517 : TupleTableSlot *new_slot = NULL;
1497 : :
2942 peter_e@gmx.net 1498 [ - + ]: 186517 : if (!is_publishable_relation(relation))
1499 : 4254 : return;
1500 : :
1501 : : /*
1502 : : * Remember the xid for the change in streaming mode. We need to send xid
1503 : : * with each change in the streaming mode so that subscriber can make
1504 : : * their association and on aborts, it can discard the corresponding
1505 : : * changes.
1506 : : */
899 michael@paquier.xyz 1507 [ + + ]: 186517 : if (data->in_streaming)
2019 akapila@postgresql.o 1508 : 175903 : xid = change->txn->xid;
1509 : :
1482 1510 : 186517 : relentry = get_rel_sync_entry(data, relation);
1511 : :
1512 : : /* First check the table filter */
1513 [ + + + - ]: 186516 : switch (action)
1514 : : {
3342 peter_e@gmx.net 1515 : 109091 : case REORDER_BUFFER_CHANGE_INSERT:
1516 [ + + ]: 109091 : if (!relentry->pubactions.pubinsert)
1517 : 3144 : return;
1518 : 105947 : break;
1519 : 34483 : case REORDER_BUFFER_CHANGE_UPDATE:
1520 [ + + ]: 34483 : if (!relentry->pubactions.pubupdate)
1521 : 44 : return;
1522 : 34439 : break;
1523 : 42942 : case REORDER_BUFFER_CHANGE_DELETE:
1524 [ + + ]: 42942 : if (!relentry->pubactions.pubdelete)
1525 : 1066 : return;
1526 : :
1527 : : /*
1528 : : * This is only possible if deletes are allowed even when replica
1529 : : * identity is not defined for a table. Since the DELETE action
1530 : : * can't be published, we simply return.
1531 : : */
1081 akapila@postgresql.o 1532 [ - + ]: 41876 : if (!change->data.tp.oldtuple)
1533 : : {
1081 akapila@postgresql.o 1534 [ # # ]:UBC 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1535 : 0 : return;
1536 : : }
3342 peter_e@gmx.net 1537 :CBC 41876 : break;
3342 peter_e@gmx.net 1538 :UBC 0 : default:
1539 : 0 : Assert(false);
1540 : : }
1541 : :
1542 : : /* Avoid leaking memory by using and resetting our own context */
3342 peter_e@gmx.net 1543 :CBC 182262 : old = MemoryContextSwitchTo(data->context);
1544 : :
1545 : : /* Switch relation if publishing via root. */
1081 akapila@postgresql.o 1546 [ + + ]: 182262 : if (relentry->publish_as_relid != RelationGetRelid(relation))
1547 : : {
1548 [ - + ]: 70 : Assert(relation->rd_rel->relispartition);
1549 : 70 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1550 : 70 : targetrel = ancestor;
1551 : : }
1552 : :
1553 [ + + ]: 182262 : if (change->data.tp.oldtuple)
1554 : : {
1555 : 42014 : old_slot = relentry->old_slot;
776 msawada@postgresql.o 1556 : 42014 : ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1557 : :
1558 : : /* Convert tuple if needed. */
1081 akapila@postgresql.o 1559 [ + + ]: 42014 : if (relentry->attrmap)
1560 : : {
1561 : 5 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1562 : : &TTSOpsVirtual);
1563 : :
1564 : 5 : old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1565 : : }
1566 : : }
1567 : :
1568 [ + + ]: 182262 : if (change->data.tp.newtuple)
1569 : : {
1570 : 140386 : new_slot = relentry->new_slot;
776 msawada@postgresql.o 1571 : 140386 : ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1572 : :
1573 : : /* Convert tuple if needed. */
1081 akapila@postgresql.o 1574 [ + + ]: 140386 : if (relentry->attrmap)
1575 : : {
1576 : 20 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1577 : : &TTSOpsVirtual);
1578 : :
1579 : 20 : new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1580 : : }
1581 : : }
1582 : :
1583 : : /*
1584 : : * Check row filter.
1585 : : *
1586 : : * Updates could be transformed to inserts or deletes based on the results
1587 : : * of the row filter for old and new tuple.
1588 : : */
1589 [ + + ]: 182262 : if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1590 : 12 : goto cleanup;
1591 : :
1592 : : /*
1593 : : * Send BEGIN if we haven't yet.
1594 : : *
1595 : : * We send the BEGIN message after ensuring that we will actually send the
1596 : : * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1597 : : * transactions.
1598 : : */
1599 [ + + + + ]: 182250 : if (txndata && !txndata->sent_begin_txn)
1600 : 452 : pgoutput_send_begin(ctx, txn);
1601 : :
1602 : : /*
1603 : : * Schema should be sent using the original relation because it also sends
1604 : : * the ancestor's relation.
1605 : : */
1606 : 182249 : maybe_send_schema(ctx, change, relation, relentry);
1607 : :
1608 : 182249 : OutputPluginPrepareWrite(ctx, true);
1609 : :
1610 : : /* Send the data */
1611 [ + + + - ]: 182249 : switch (action)
1612 : : {
1613 : 105936 : case REORDER_BUFFER_CHANGE_INSERT:
1614 : 105936 : logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
493 1615 : 105936 : data->binary, relentry->columns,
1616 : : relentry->include_gencols_type);
1081 1617 : 105936 : break;
1618 : 34436 : case REORDER_BUFFER_CHANGE_UPDATE:
1619 : 34436 : logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
493 1620 : 34436 : new_slot, data->binary, relentry->columns,
1621 : : relentry->include_gencols_type);
1482 1622 : 34436 : break;
3342 peter_e@gmx.net 1623 : 41877 : case REORDER_BUFFER_CHANGE_DELETE:
1081 akapila@postgresql.o 1624 : 41877 : logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
493 1625 : 41877 : data->binary, relentry->columns,
1626 : : relentry->include_gencols_type);
3342 peter_e@gmx.net 1627 : 41877 : break;
3342 peter_e@gmx.net 1628 :UBC 0 : default:
1629 : 0 : Assert(false);
1630 : : }
1631 : :
1081 akapila@postgresql.o 1632 :CBC 182249 : OutputPluginWrite(ctx, true);
1633 : :
1634 : 182261 : cleanup:
1888 1635 [ + + ]: 182261 : if (RelationIsValid(ancestor))
1636 : : {
1637 : 69 : RelationClose(ancestor);
1638 : 69 : ancestor = NULL;
1639 : : }
1640 : :
1641 : : /* Drop the new slots that were used to store the converted tuples. */
626 1642 [ + + ]: 182261 : if (relentry->attrmap)
1643 : : {
1644 [ + + ]: 25 : if (old_slot)
1645 : 5 : ExecDropSingleTupleTableSlot(old_slot);
1646 : :
1647 [ + + ]: 25 : if (new_slot)
1648 : 20 : ExecDropSingleTupleTableSlot(new_slot);
1649 : : }
1650 : :
3342 peter_e@gmx.net 1651 : 182261 : MemoryContextSwitchTo(old);
1652 : 182261 : MemoryContextReset(data->context);
1653 : : }
1654 : :
1655 : : static void
2899 1656 : 22 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1657 : : int nrelations, Relation relations[], ReorderBufferChange *change)
1658 : : {
1659 : 22 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1446 akapila@postgresql.o 1660 : 22 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1661 : : MemoryContext old;
1662 : : RelationSyncEntry *relentry;
1663 : : int i;
1664 : : int nrelids;
1665 : : Oid *relids;
2019 1666 : 22 : TransactionId xid = InvalidTransactionId;
1667 : :
1668 : : /* Remember the xid for the change in streaming mode. See pgoutput_change. */
899 michael@paquier.xyz 1669 [ - + ]: 22 : if (data->in_streaming)
2019 akapila@postgresql.o 1670 :UBC 0 : xid = change->txn->xid;
1671 : :
2899 peter_e@gmx.net 1672 :CBC 22 : old = MemoryContextSwitchTo(data->context);
1673 : :
1674 : 22 : relids = palloc0(nrelations * sizeof(Oid));
1675 : 22 : nrelids = 0;
1676 : :
1677 [ + + ]: 63 : for (i = 0; i < nrelations; i++)
1678 : : {
1679 : 41 : Relation relation = relations[i];
1680 : 41 : Oid relid = RelationGetRelid(relation);
1681 : :
1682 [ - + ]: 41 : if (!is_publishable_relation(relation))
2899 peter_e@gmx.net 1683 :UBC 0 : continue;
1684 : :
1482 akapila@postgresql.o 1685 :CBC 41 : relentry = get_rel_sync_entry(data, relation);
1686 : :
2899 peter_e@gmx.net 1687 [ + + ]: 41 : if (!relentry->pubactions.pubtruncate)
1688 : 20 : continue;
1689 : :
1690 : : /*
1691 : : * Don't send partitions if the publication wants to send only the
1692 : : * root tables through it.
1693 : : */
2167 peter@eisentraut.org 1694 [ + + ]: 21 : if (relation->rd_rel->relispartition &&
1695 [ + + ]: 15 : relentry->publish_as_relid != relid)
2196 1696 : 3 : continue;
1697 : :
2899 peter_e@gmx.net 1698 : 18 : relids[nrelids++] = relid;
1699 : :
1700 : : /* Send BEGIN if we haven't yet */
1446 akapila@postgresql.o 1701 [ + - + + ]: 18 : if (txndata && !txndata->sent_begin_txn)
1702 : 12 : pgoutput_send_begin(ctx, txn);
1703 : :
1683 fujii@postgresql.org 1704 : 18 : maybe_send_schema(ctx, change, relation, relentry);
1705 : : }
1706 : :
2876 peter_e@gmx.net 1707 [ + + ]: 22 : if (nrelids > 0)
1708 : : {
1709 : 12 : OutputPluginPrepareWrite(ctx, true);
1710 : 12 : logicalrep_write_truncate(ctx->out,
1711 : : xid,
1712 : : nrelids,
1713 : : relids,
1714 : 12 : change->data.truncate.cascade,
1715 : 12 : change->data.truncate.restart_seqs);
1716 : 12 : OutputPluginWrite(ctx, true);
1717 : : }
1718 : :
2899 1719 : 22 : MemoryContextSwitchTo(old);
1720 : 22 : MemoryContextReset(data->context);
1721 : 22 : }
1722 : :
1723 : : static void
1804 akapila@postgresql.o 1724 : 7 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1725 : : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1726 : : const char *message)
1727 : : {
1728 : 7 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1729 : 7 : TransactionId xid = InvalidTransactionId;
1730 : :
1731 [ + + ]: 7 : if (!data->messages)
1732 : 2 : return;
1733 : :
1734 : : /*
1735 : : * Remember the xid for the message in streaming mode. See
1736 : : * pgoutput_change.
1737 : : */
899 michael@paquier.xyz 1738 [ - + ]: 5 : if (data->in_streaming)
1804 akapila@postgresql.o 1739 :UBC 0 : xid = txn->xid;
1740 : :
1741 : : /*
1742 : : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1743 : : */
1446 akapila@postgresql.o 1744 [ + + ]:CBC 5 : if (transactional)
1745 : : {
1746 : 2 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1747 : :
1748 : : /* Send BEGIN if we haven't yet */
1749 [ + - + - ]: 2 : if (txndata && !txndata->sent_begin_txn)
1750 : 2 : pgoutput_send_begin(ctx, txn);
1751 : : }
1752 : :
1804 1753 : 5 : OutputPluginPrepareWrite(ctx, true);
1754 : 5 : logicalrep_write_message(ctx->out,
1755 : : xid,
1756 : : message_lsn,
1757 : : transactional,
1758 : : prefix,
1759 : : sz,
1760 : : message);
1761 : 5 : OutputPluginWrite(ctx, true);
1762 : : }
1763 : :
1764 : : /*
1765 : : * Return true if the data is associated with an origin and the user has
1766 : : * requested the changes that don't have an origin, false otherwise.
1767 : : */
1768 : : static bool
3342 peter_e@gmx.net 1769 : 366300 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1770 : : ReplOriginId origin_id)
1771 : : {
900 akapila@postgresql.o 1772 : 366300 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1773 : :
46 msawada@postgresql.o 1774 [ + + + + ]:GNC 366300 : if (data->publish_no_origin && origin_id != InvalidReplOriginId)
1333 akapila@postgresql.o 1775 :CBC 182 : return true;
1776 : :
3342 peter_e@gmx.net 1777 : 366118 : return false;
1778 : : }
1779 : :
1780 : : /*
1781 : : * Shutdown the output plugin.
1782 : : *
1783 : : * Note, we don't need to clean the data->context, data->cachectx, and
1784 : : * data->pubctx as they are child contexts of the ctx->context so they
1785 : : * will be cleaned up by logical decoding machinery.
1786 : : */
1787 : : static void
3224 bruce@momjian.us 1788 : 554 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1789 : : {
157 msawada@postgresql.o 1790 : 554 : pgoutput_memory_context_reset(NULL);
3342 peter_e@gmx.net 1791 : 554 : }
1792 : :
1793 : : /*
1794 : : * Load publications from the list of publication names.
1795 : : *
1796 : : * Here, we skip the publications that don't exist yet. This will allow us
1797 : : * to silently continue the replication in the absence of a missing publication.
1798 : : * This is required because we allow the users to create publications after they
1799 : : * have specified the required publications at the time of replication start.
1800 : : */
1801 : : static List *
1802 : 226 : LoadPublications(List *pubnames)
1803 : : {
1804 : 226 : List *result = NIL;
1805 : : ListCell *lc;
1806 : :
3224 bruce@momjian.us 1807 [ + - + + : 507 : foreach(lc, pubnames)
+ + ]
1808 : : {
1809 : 281 : char *pubname = (char *) lfirst(lc);
366 akapila@postgresql.o 1810 : 281 : Publication *pub = GetPublicationByName(pubname, true);
1811 : :
1812 [ + + ]: 281 : if (pub)
1813 : 278 : result = lappend(result, pub);
1814 : : else
1815 [ + - ]: 3 : ereport(WARNING,
1816 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1817 : : errmsg("skipped loading publication \"%s\"", pubname),
1818 : : errdetail("The publication does not exist at this point in the WAL."),
1819 : : errhint("Create the publication if it does not exist."));
1820 : : }
1821 : :
3342 peter_e@gmx.net 1822 : 226 : return result;
1823 : : }
1824 : :
1825 : : /*
1826 : : * Publication syscache invalidation callback.
1827 : : *
1828 : : * Called for invalidations on pg_publication.
1829 : : */
1830 : : static void
25 michael@paquier.xyz 1831 :GNC 361 : publication_invalidation_cb(Datum arg, SysCacheIdentifier cacheid,
1832 : : uint32 hashvalue)
1833 : : {
3342 peter_e@gmx.net 1834 :CBC 361 : publications_valid = false;
1835 : 361 : }
1836 : :
1837 : : /*
1838 : : * START STREAM callback
1839 : : */
1840 : : static void
2019 akapila@postgresql.o 1841 : 600 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1842 : : ReorderBufferTXN *txn)
1843 : : {
899 michael@paquier.xyz 1844 : 600 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
46 msawada@postgresql.o 1845 :GNC 600 : bool send_replication_origin = txn->origin_id != InvalidReplOriginId;
1846 : :
1847 : : /* we can't nest streaming of transactions */
899 michael@paquier.xyz 1848 [ - + ]:CBC 600 : Assert(!data->in_streaming);
1849 : :
1850 : : /*
1851 : : * If we already sent the first stream for this transaction then don't
1852 : : * send the origin id in the subsequent streams.
1853 : : */
2019 akapila@postgresql.o 1854 [ + + ]: 600 : if (rbtxn_is_streamed(txn))
1855 : 543 : send_replication_origin = false;
1856 : :
1857 : 600 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1858 : 600 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1859 : :
1705 1860 : 600 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1861 : : send_replication_origin);
1862 : :
2019 1863 : 600 : OutputPluginWrite(ctx, true);
1864 : :
1865 : : /* we're streaming a chunk of transaction now */
899 michael@paquier.xyz 1866 : 600 : data->in_streaming = true;
2019 akapila@postgresql.o 1867 : 600 : }
1868 : :
1869 : : /*
1870 : : * STOP STREAM callback
1871 : : */
1872 : : static void
1873 : 600 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1874 : : ReorderBufferTXN *txn)
1875 : : {
899 michael@paquier.xyz 1876 : 600 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1877 : :
1878 : : /* we should be streaming a transaction */
1879 [ - + ]: 600 : Assert(data->in_streaming);
1880 : :
2019 akapila@postgresql.o 1881 : 600 : OutputPluginPrepareWrite(ctx, true);
1882 : 600 : logicalrep_write_stream_stop(ctx->out);
1883 : 600 : OutputPluginWrite(ctx, true);
1884 : :
1885 : : /* we've stopped streaming a transaction */
899 michael@paquier.xyz 1886 : 600 : data->in_streaming = false;
2019 akapila@postgresql.o 1887 : 600 : }
1888 : :
1889 : : /*
1890 : : * Notify downstream to discard the streamed transaction (along with all
1891 : : * its subtransactions, if it's a toplevel transaction).
1892 : : */
1893 : : static void
1894 : 26 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1895 : : ReorderBufferTXN *txn,
1896 : : XLogRecPtr abort_lsn)
1897 : : {
1898 : : ReorderBufferTXN *toptxn;
1161 1899 : 26 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1900 : 26 : bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1901 : :
1902 : : /*
1903 : : * The abort should happen outside streaming block, even for streamed
1904 : : * transactions. The transaction has to be marked as streamed, though.
1905 : : */
899 michael@paquier.xyz 1906 [ - + ]: 26 : Assert(!data->in_streaming);
1907 : :
1908 : : /* determine the toplevel transaction */
1094 akapila@postgresql.o 1909 [ + + ]: 26 : toptxn = rbtxn_get_toptxn(txn);
1910 : :
2019 1911 [ - + ]: 26 : Assert(rbtxn_is_streamed(toptxn));
1912 : :
1913 : 26 : OutputPluginPrepareWrite(ctx, true);
1161 1914 : 26 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1915 : : txn->abort_time, write_abort_info);
1916 : :
2019 1917 : 26 : OutputPluginWrite(ctx, true);
1918 : :
1919 : 26 : cleanup_rel_sync_cache(toptxn->xid, false);
1920 : 26 : }
1921 : :
1922 : : /*
1923 : : * Notify downstream to apply the streamed transaction (along with all
1924 : : * its subtransactions).
1925 : : */
1926 : : static void
1927 : 44 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1928 : : ReorderBufferTXN *txn,
1929 : : XLogRecPtr commit_lsn)
1930 : : {
899 michael@paquier.xyz 1931 : 44 : PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1932 : :
1933 : : /*
1934 : : * The commit should happen outside streaming block, even for streamed
1935 : : * transactions. The transaction has to be marked as streamed, though.
1936 : : */
1937 [ - + ]: 44 : Assert(!data->in_streaming);
2019 akapila@postgresql.o 1938 [ - + ]: 44 : Assert(rbtxn_is_streamed(txn));
1939 : :
1131 1940 : 44 : OutputPluginUpdateProgress(ctx, false);
1941 : :
2019 1942 : 44 : OutputPluginPrepareWrite(ctx, true);
1943 : 44 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1944 : 44 : OutputPluginWrite(ctx, true);
1945 : :
1946 : 44 : cleanup_rel_sync_cache(txn->xid, true);
1947 : 44 : }
1948 : :
1949 : : /*
1950 : : * PREPARE callback (for streaming two-phase commit).
1951 : : *
1952 : : * Notify the downstream to prepare the transaction.
1953 : : */
1954 : : static void
1684 1955 : 10 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1956 : : ReorderBufferTXN *txn,
1957 : : XLogRecPtr prepare_lsn)
1958 : : {
1959 [ - + ]: 10 : Assert(rbtxn_is_streamed(txn));
1960 : :
1131 1961 : 10 : OutputPluginUpdateProgress(ctx, false);
1684 1962 : 10 : OutputPluginPrepareWrite(ctx, true);
1963 : 10 : logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1964 : 10 : OutputPluginWrite(ctx, true);
1965 : 10 : }
1966 : :
1967 : : /*
1968 : : * Initialize the relation schema sync cache for a decoding session.
1969 : : *
1970 : : * The hash table is destroyed at the end of a decoding session. While
1971 : : * relcache invalidations still exist and will still be invoked, they
1972 : : * will just see the null hash table global and take no action.
1973 : : */
1974 : : static void
3342 peter_e@gmx.net 1975 : 426 : init_rel_sync_cache(MemoryContext cachectx)
1976 : : {
1977 : : HASHCTL ctl;
1978 : : static bool relation_callbacks_registered = false;
1979 : :
1980 : : /* Nothing to do if hash table already exists */
1981 [ - + ]: 426 : if (RelationSyncCache != NULL)
1982 : 2 : return;
1983 : :
1984 : : /* Make a new hash table for the cache */
1985 : 426 : ctl.keysize = sizeof(Oid);
1986 : 426 : ctl.entrysize = sizeof(RelationSyncEntry);
1987 : 426 : ctl.hcxt = cachectx;
1988 : :
1989 : 426 : RelationSyncCache = hash_create("logical replication output relation cache",
1990 : : 128, &ctl,
1991 : : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1992 : :
1993 [ - + ]: 426 : Assert(RelationSyncCache != NULL);
1994 : :
1995 : : /* No more to do if we already registered callbacks */
1116 tgl@sss.pgh.pa.us 1996 [ + + ]: 426 : if (relation_callbacks_registered)
1997 : 2 : return;
1998 : :
1999 : : /* We must update the cache entry for a relation after a relcache flush */
3342 peter_e@gmx.net 2000 : 424 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
2001 : :
2002 : : /*
2003 : : * Flush all cache entries after a pg_namespace change, in case it was a
2004 : : * schema rename affecting a relation being replicated.
2005 : : *
2006 : : * XXX: It is not a good idea to invalidate all the relation entries in
2007 : : * RelationSyncCache on schema rename. We can optimize it to invalidate
2008 : : * only the required relations by either having a specific invalidation
2009 : : * message containing impacted relations or by having schema information
2010 : : * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
2011 : : * passed to the callback.
2012 : : */
1164 tgl@sss.pgh.pa.us 2013 : 424 : CacheRegisterSyscacheCallback(NAMESPACEOID,
2014 : : rel_sync_cache_publication_cb,
2015 : : (Datum) 0);
2016 : :
1116 2017 : 424 : relation_callbacks_registered = true;
2018 : : }
2019 : :
2020 : : /*
2021 : : * We expect relatively small number of streamed transactions.
2022 : : */
2023 : : static bool
2019 akapila@postgresql.o 2024 : 175903 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2025 : : {
1350 alvherre@alvh.no-ip. 2026 : 175903 : return list_member_xid(entry->streamed_txns, xid);
2027 : : }
2028 : :
2029 : : /*
2030 : : * Add the xid in the rel sync entry for which we have already sent the schema
2031 : : * of the relation.
2032 : : */
2033 : : static void
2019 akapila@postgresql.o 2034 : 66 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2035 : : {
2036 : : MemoryContext oldctx;
2037 : :
2038 : 66 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2039 : :
1350 alvherre@alvh.no-ip. 2040 : 66 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2041 : :
2019 akapila@postgresql.o 2042 : 66 : MemoryContextSwitchTo(oldctx);
2043 : 66 : }
2044 : :
2045 : : /*
2046 : : * Find or create entry in the relation schema cache.
2047 : : *
2048 : : * This looks up publications that the given relation is directly or
2049 : : * indirectly part of (the latter if it's really the relation's ancestor that
2050 : : * is part of a publication) and fills up the found entry with the information
2051 : : * about which operations to publish and whether to use an ancestor's schema
2052 : : * when publishing.
2053 : : */
2054 : : static RelationSyncEntry *
1482 2055 : 186558 : get_rel_sync_entry(PGOutputData *data, Relation relation)
2056 : : {
2057 : : RelationSyncEntry *entry;
2058 : : bool found;
2059 : : MemoryContext oldctx;
2060 : 186558 : Oid relid = RelationGetRelid(relation);
2061 : :
3342 peter_e@gmx.net 2062 [ - + ]: 186558 : Assert(RelationSyncCache != NULL);
2063 : :
2064 : : /* Find cached relation info, creating if not found */
2065 : 186558 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
2066 : : &relid,
2067 : : HASH_ENTER, &found);
2068 [ - + ]: 186558 : Assert(entry != NULL);
2069 : :
2070 : : /* initialize entry, if it's new */
2006 akapila@postgresql.o 2071 [ + + ]: 186558 : if (!found)
2072 : : {
1500 2073 : 327 : entry->replicate_valid = false;
2006 2074 : 327 : entry->schema_sent = false;
416 2075 : 327 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2006 2076 : 327 : entry->streamed_txns = NIL;
2077 : 327 : entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1438 tomas.vondra@postgre 2078 : 327 : entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1482 akapila@postgresql.o 2079 : 327 : entry->new_slot = NULL;
2080 : 327 : entry->old_slot = NULL;
2081 : 327 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
1450 tomas.vondra@postgre 2082 : 327 : entry->entry_cxt = NULL;
2006 akapila@postgresql.o 2083 : 327 : entry->publish_as_relid = InvalidOid;
1450 tomas.vondra@postgre 2084 : 327 : entry->columns = NULL;
1482 akapila@postgresql.o 2085 : 327 : entry->attrmap = NULL;
2086 : : }
2087 : :
2088 : : /* Validate the entry */
2006 2089 [ + + ]: 186558 : if (!entry->replicate_valid)
2090 : : {
1600 2091 : 423 : Oid schemaId = get_rel_namespace(relid);
11 akapila@postgresql.o 2092 :GNC 423 : List *pubids = GetRelationIncludedPublications(relid);
2093 : :
2094 : : /*
2095 : : * We don't acquire a lock on the namespace system table as we build
2096 : : * the cache entry using a historic snapshot and all the later changes
2097 : : * are absorbed while decoding WAL.
2098 : : */
1438 tomas.vondra@postgre 2099 :CBC 423 : List *schemaPubids = GetSchemaPublications(schemaId);
2100 : : ListCell *lc;
2167 peter@eisentraut.org 2101 : 423 : Oid publish_as_relid = relid;
1460 tomas.vondra@postgre 2102 : 423 : int publish_ancestor_level = 0;
1530 michael@paquier.xyz 2103 : 423 : bool am_partition = get_rel_relispartition(relid);
1438 tomas.vondra@postgre 2104 : 423 : char relkind = get_rel_relkind(relid);
1482 akapila@postgresql.o 2105 : 423 : List *rel_publications = NIL;
2106 : :
2107 : : /* Reload publications if needed before use. */
3342 peter_e@gmx.net 2108 [ + + ]: 423 : if (!publications_valid)
2109 : : {
461 michael@paquier.xyz 2110 : 226 : MemoryContextReset(data->pubctx);
2111 : :
2112 : 226 : oldctx = MemoryContextSwitchTo(data->pubctx);
3342 peter_e@gmx.net 2113 : 226 : data->publications = LoadPublications(data->publication_names);
2114 : 226 : MemoryContextSwitchTo(oldctx);
2115 : 226 : publications_valid = true;
2116 : : }
2117 : :
2118 : : /*
2119 : : * Reset schema_sent status as the relation definition may have
2120 : : * changed. Also reset pubactions to empty in case rel was dropped
2121 : : * from a publication. Also free any objects that depended on the
2122 : : * earlier definition.
2123 : : */
1500 akapila@postgresql.o 2124 : 423 : entry->schema_sent = false;
416 2125 : 423 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1500 2126 : 423 : list_free(entry->streamed_txns);
2127 : 423 : entry->streamed_txns = NIL;
1450 tomas.vondra@postgre 2128 : 423 : bms_free(entry->columns);
2129 : 423 : entry->columns = NULL;
1500 akapila@postgresql.o 2130 : 423 : entry->pubactions.pubinsert = false;
2131 : 423 : entry->pubactions.pubupdate = false;
2132 : 423 : entry->pubactions.pubdelete = false;
2133 : 423 : entry->pubactions.pubtruncate = false;
2134 : :
2135 : : /*
2136 : : * Tuple slots cleanups. (Will be rebuilt later if needed).
2137 : : */
1482 2138 [ + + ]: 423 : if (entry->old_slot)
2139 : : {
479 michael@paquier.xyz 2140 : 63 : TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2141 : :
2142 [ - + ]: 63 : Assert(desc->tdrefcount == -1);
2143 : :
1482 akapila@postgresql.o 2144 : 63 : ExecDropSingleTupleTableSlot(entry->old_slot);
2145 : :
2146 : : /*
2147 : : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2148 : : * do it now to avoid any leaks.
2149 : : */
479 michael@paquier.xyz 2150 : 63 : FreeTupleDesc(desc);
2151 : : }
1482 akapila@postgresql.o 2152 [ + + ]: 423 : if (entry->new_slot)
2153 : : {
479 michael@paquier.xyz 2154 : 63 : TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2155 : :
2156 [ - + ]: 63 : Assert(desc->tdrefcount == -1);
2157 : :
1482 akapila@postgresql.o 2158 : 63 : ExecDropSingleTupleTableSlot(entry->new_slot);
2159 : :
2160 : : /*
2161 : : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2162 : : * do it now to avoid any leaks.
2163 : : */
479 michael@paquier.xyz 2164 : 63 : FreeTupleDesc(desc);
2165 : : }
2166 : :
1482 akapila@postgresql.o 2167 : 423 : entry->old_slot = NULL;
2168 : 423 : entry->new_slot = NULL;
2169 : :
2170 [ + + ]: 423 : if (entry->attrmap)
2171 : 3 : free_attrmap(entry->attrmap);
2172 : 423 : entry->attrmap = NULL;
2173 : :
2174 : : /*
2175 : : * Row filter cache cleanups.
2176 : : */
1450 tomas.vondra@postgre 2177 [ + + ]: 423 : if (entry->entry_cxt)
2178 : 63 : MemoryContextDelete(entry->entry_cxt);
2179 : :
2180 : 423 : entry->entry_cxt = NULL;
1482 akapila@postgresql.o 2181 : 423 : entry->estate = NULL;
2182 : 423 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2183 : :
2184 : : /*
2185 : : * Build publication cache. We can't use one provided by relcache as
2186 : : * relcache considers all publications that the given relation is in,
2187 : : * but here we only need to consider ones that the subscriber
2188 : : * requested.
2189 : : */
3342 peter_e@gmx.net 2190 [ + + + + : 1021 : foreach(lc, data->publications)
+ + ]
2191 : : {
2192 : 598 : Publication *pub = lfirst(lc);
2167 peter@eisentraut.org 2193 : 598 : bool publish = false;
2194 : :
2195 : : /*
2196 : : * Under what relid should we publish changes in this publication?
2197 : : * We'll use the top-most relid across all publications. Also
2198 : : * track the ancestor level for this publication.
2199 : : */
1403 tgl@sss.pgh.pa.us 2200 : 598 : Oid pub_relid = relid;
2201 : 598 : int ancestor_level = 0;
2202 : :
2203 : : /*
2204 : : * If this is a FOR ALL TABLES publication, pick the partition
2205 : : * root and set the ancestor level accordingly.
2206 : : */
1438 tomas.vondra@postgre 2207 [ + + ]: 598 : if (pub->alltables)
2208 : : {
11 akapila@postgresql.o 2209 :GNC 101 : List *exceptpubids = NIL;
2210 : :
2211 [ + + ]: 101 : if (am_partition)
2212 : : {
1460 tomas.vondra@postgre 2213 :CBC 33 : List *ancestors = get_partition_ancestors(relid);
11 akapila@postgresql.o 2214 :GNC 33 : Oid last_ancestor_relid = llast_oid(ancestors);
2215 : :
2216 : : /*
2217 : : * For a partition, changes are published via top-most
2218 : : * ancestor when pubviaroot is true, so populate pub_relid
2219 : : * accordingly.
2220 : : */
2221 [ + + ]: 33 : if (pub->pubviaroot)
2222 : : {
2223 : 19 : pub_relid = last_ancestor_relid;
2224 : 19 : ancestor_level = list_length(ancestors);
2225 : : }
2226 : :
2227 : : /*
2228 : : * Only the top-most ancestor can appear in the EXCEPT
2229 : : * clause. Therefore, for a partition, exclusion must be
2230 : : * evaluated at the top-most ancestor.
2231 : : */
2232 : 33 : exceptpubids = GetRelationExcludedPublications(last_ancestor_relid);
2233 : : }
2234 : : else
2235 : : {
2236 : : /*
2237 : : * For a regular table or a root partitioned table, check
2238 : : * exclusion on table itself.
2239 : : */
2240 : 68 : exceptpubids = GetRelationExcludedPublications(pub_relid);
2241 : : }
2242 : :
2243 [ + + ]: 101 : if (!list_member_oid(exceptpubids, pub->oid))
2244 : 86 : publish = true;
2245 : :
2246 : 101 : list_free(exceptpubids);
2247 : :
2248 [ + + ]: 101 : if (!publish)
2249 : 15 : continue;
2250 : : }
2251 : :
2167 peter@eisentraut.org 2252 [ + + ]:CBC 583 : if (!publish)
2253 : : {
2131 tgl@sss.pgh.pa.us 2254 : 497 : bool ancestor_published = false;
2255 : :
2256 : : /*
2257 : : * For a partition, check if any of the ancestors are
2258 : : * published. If so, note down the topmost ancestor that is
2259 : : * published via this publication, which will be used as the
2260 : : * relation via which to publish the partition's changes.
2261 : : */
2167 peter@eisentraut.org 2262 [ + + ]: 497 : if (am_partition)
2263 : : {
2264 : : Oid ancestor;
2265 : : int level;
2131 tgl@sss.pgh.pa.us 2266 : 121 : List *ancestors = get_partition_ancestors(relid);
2267 : :
1482 akapila@postgresql.o 2268 : 121 : ancestor = GetTopMostAncestorInPublication(pub->oid,
2269 : : ancestors,
2270 : : &level);
2271 : :
2272 [ + + ]: 121 : if (ancestor != InvalidOid)
2273 : : {
2274 : 48 : ancestor_published = true;
2275 [ + + ]: 48 : if (pub->pubviaroot)
2276 : : {
1460 tomas.vondra@postgre 2277 : 25 : pub_relid = ancestor;
2278 : 25 : ancestor_level = level;
2279 : : }
2280 : : }
2281 : : }
2282 : :
1600 akapila@postgresql.o 2283 [ + + + + ]: 780 : if (list_member_oid(pubids, pub->oid) ||
2284 [ + + ]: 559 : list_member_oid(schemaPubids, pub->oid) ||
2285 : : ancestor_published)
2167 peter@eisentraut.org 2286 : 249 : publish = true;
2287 : : }
2288 : :
2289 : : /*
2290 : : * If the relation is to be published, determine actions to
2291 : : * publish, and list of columns, if appropriate.
2292 : : *
2293 : : * Don't publish changes for partitioned tables, because
2294 : : * publishing those of its partitions suffices, unless partition
2295 : : * changes won't be published due to pubviaroot being set.
2296 : : */
2297 [ + + + + ]: 583 : if (publish &&
2298 [ + + ]: 4 : (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2299 : : {
3342 peter_e@gmx.net 2300 : 332 : entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2301 : 332 : entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2302 : 332 : entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2899 2303 : 332 : entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2304 : :
2305 : : /*
2306 : : * We want to publish the changes as the top-most ancestor
2307 : : * across all publications. So we need to check if the already
2308 : : * calculated level is higher than the new one. If yes, we can
2309 : : * ignore the new value (as it's a child). Otherwise the new
2310 : : * value is an ancestor, so we keep it.
2311 : : */
1460 tomas.vondra@postgre 2312 [ + + ]: 332 : if (publish_ancestor_level > ancestor_level)
2313 : 1 : continue;
2314 : :
2315 : : /*
2316 : : * If we found an ancestor higher up in the tree, discard the
2317 : : * list of publications through which we replicate it, and use
2318 : : * the new ancestor.
2319 : : */
1459 2320 [ + + ]: 331 : if (publish_ancestor_level < ancestor_level)
2321 : : {
2322 : 40 : publish_as_relid = pub_relid;
2323 : 40 : publish_ancestor_level = ancestor_level;
2324 : :
2325 : : /* reset the publication list for this relation */
2326 : 40 : rel_publications = NIL;
2327 : : }
2328 : : else
2329 : : {
2330 : : /* Same ancestor level, has to be the same OID. */
2331 [ - + ]: 291 : Assert(publish_as_relid == pub_relid);
2332 : : }
2333 : :
2334 : : /* Track publications for this ancestor. */
2335 : 331 : rel_publications = lappend(rel_publications, pub);
2336 : : }
2337 : : }
2338 : :
1482 akapila@postgresql.o 2339 : 423 : entry->publish_as_relid = publish_as_relid;
2340 : :
2341 : : /*
2342 : : * Initialize the tuple slot, map, and row filter. These are only used
2343 : : * when publishing inserts, updates, or deletes.
2344 : : */
2345 [ + + + - ]: 423 : if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2346 [ - + ]: 105 : entry->pubactions.pubdelete)
2347 : : {
2348 : : /* Initialize the tuple slot and map */
2349 : 318 : init_tuple_slot(data, relation, entry);
2350 : :
2351 : : /* Initialize the row filter */
2352 : 318 : pgoutput_row_filter_init(data, rel_publications, entry);
2353 : :
2354 : : /* Check whether to publish generated columns. */
493 2355 : 318 : check_and_init_gencol(data, rel_publications, entry);
2356 : :
2357 : : /* Initialize the column list */
1450 tomas.vondra@postgre 2358 : 318 : pgoutput_column_list_init(data, rel_publications, entry);
2359 : : }
2360 : :
3342 peter_e@gmx.net 2361 : 422 : list_free(pubids);
1500 akapila@postgresql.o 2362 : 422 : list_free(schemaPubids);
1482 2363 : 422 : list_free(rel_publications);
2364 : :
3342 peter_e@gmx.net 2365 : 422 : entry->replicate_valid = true;
2366 : : }
2367 : :
2368 : 186557 : return entry;
2369 : : }
2370 : :
2371 : : /*
2372 : : * Cleanup list of streamed transactions and update the schema_sent flag.
2373 : : *
2374 : : * When a streamed transaction commits or aborts, we need to remove the
2375 : : * toplevel XID from the schema cache. If the transaction aborted, the
2376 : : * subscriber will simply throw away the schema records we streamed, so
2377 : : * we don't need to do anything else.
2378 : : *
2379 : : * If the transaction is committed, the subscriber will update the relation
2380 : : * cache - so tweak the schema_sent flag accordingly.
2381 : : */
2382 : : static void
2019 akapila@postgresql.o 2383 : 70 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2384 : : {
2385 : : HASH_SEQ_STATUS hash_seq;
2386 : : RelationSyncEntry *entry;
2387 : :
2388 [ - + ]: 70 : Assert(RelationSyncCache != NULL);
2389 : :
2390 : 70 : hash_seq_init(&hash_seq, RelationSyncCache);
2391 [ + + ]: 143 : while ((entry = hash_seq_search(&hash_seq)) != NULL)
2392 : : {
2393 : : /*
2394 : : * We can set the schema_sent flag for an entry that has committed xid
2395 : : * in the list as that ensures that the subscriber would have the
2396 : : * corresponding schema and we don't need to send it unless there is
2397 : : * any invalidation for that relation.
2398 : : */
801 nathan@postgresql.or 2399 [ + + + - : 164 : foreach_xid(streamed_txn, entry->streamed_txns)
+ + ]
2400 : : {
2401 [ + + ]: 70 : if (xid == streamed_txn)
2402 : : {
2019 akapila@postgresql.o 2403 [ + + ]: 52 : if (is_commit)
2404 : 41 : entry->schema_sent = true;
2405 : :
2406 : 52 : entry->streamed_txns =
801 nathan@postgresql.or 2407 : 52 : foreach_delete_current(entry->streamed_txns, streamed_txn);
2019 akapila@postgresql.o 2408 : 52 : break;
2409 : : }
2410 : : }
2411 : : }
2412 : 70 : }
2413 : :
2414 : : /*
2415 : : * Relcache invalidation callback
2416 : : */
2417 : : static void
3342 peter_e@gmx.net 2418 : 4174 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2419 : : {
2420 : : RelationSyncEntry *entry;
2421 : :
2422 : : /*
2423 : : * We can get here if the plugin was used in SQL interface as the
2424 : : * RelationSyncCache is destroyed when the decoding finishes, but there is
2425 : : * no way to unregister the relcache invalidation callback.
2426 : : */
2427 [ + + ]: 4174 : if (RelationSyncCache == NULL)
2428 : 32 : return;
2429 : :
2430 : : /*
2431 : : * Nobody keeps pointers to entries in this hash table around outside
2432 : : * logical decoding callback calls - but invalidation events can come in
2433 : : * *during* a callback if we do any syscache access in the callback.
2434 : : * Because of that we must mark the cache entry as invalid but not damage
2435 : : * any of its substructure here. The next get_rel_sync_entry() call will
2436 : : * rebuild it all.
2437 : : */
1500 akapila@postgresql.o 2438 [ + + ]: 4142 : if (OidIsValid(relid))
2439 : : {
2440 : : /*
2441 : : * Getting invalidations for relations that aren't in the table is
2442 : : * entirely normal. So we don't care if it's found or not.
2443 : : */
2444 : 4073 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2445 : : HASH_FIND, NULL);
2446 [ + + ]: 4073 : if (entry != NULL)
2447 : 715 : entry->replicate_valid = false;
2448 : : }
2449 : : else
2450 : : {
2451 : : /* Whole cache must be flushed. */
2452 : : HASH_SEQ_STATUS status;
2453 : :
2454 : 69 : hash_seq_init(&status, RelationSyncCache);
2455 [ + + ]: 138 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2456 : : {
2457 : 69 : entry->replicate_valid = false;
2458 : : }
2459 : : }
2460 : : }
2461 : :
2462 : : /*
2463 : : * Publication relation/schema map syscache invalidation callback
2464 : : *
2465 : : * Called for invalidations on pg_namespace.
2466 : : */
2467 : : static void
25 michael@paquier.xyz 2468 :GNC 41 : rel_sync_cache_publication_cb(Datum arg, SysCacheIdentifier cacheid,
2469 : : uint32 hashvalue)
2470 : : {
2471 : : HASH_SEQ_STATUS status;
2472 : : RelationSyncEntry *entry;
2473 : :
2474 : : /*
2475 : : * We can get here if the plugin was used in SQL interface as the
2476 : : * RelationSyncCache is destroyed when the decoding finishes, but there is
2477 : : * no way to unregister the invalidation callbacks.
2478 : : */
3342 peter_e@gmx.net 2479 [ + + ]:CBC 41 : if (RelationSyncCache == NULL)
2480 : 13 : return;
2481 : :
2482 : : /*
2483 : : * We have no easy way to identify which cache entries this invalidation
2484 : : * event might have affected, so just mark them all invalid.
2485 : : */
2486 : 28 : hash_seq_init(&status, RelationSyncCache);
2487 [ + + ]: 49 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2488 : : {
2489 : 21 : entry->replicate_valid = false;
2490 : : }
2491 : : }
2492 : :
2493 : : /* Send Replication origin */
2494 : : static void
46 msawada@postgresql.o 2495 :GNC 1083 : send_repl_origin(LogicalDecodingContext *ctx, ReplOriginId origin_id,
2496 : : XLogRecPtr origin_lsn, bool send_origin)
2497 : : {
1705 akapila@postgresql.o 2498 [ + + ]:CBC 1083 : if (send_origin)
2499 : : {
2500 : : char *origin;
2501 : :
2502 : : /*----------
2503 : : * XXX: which behaviour do we want here?
2504 : : *
2505 : : * Alternatives:
2506 : : * - don't send origin message if origin name not found
2507 : : * (that's what we do now)
2508 : : * - throw error - that will break replication, not good
2509 : : * - send some special "unknown" origin
2510 : : *----------
2511 : : */
2512 [ + - ]: 9 : if (replorigin_by_oid(origin_id, true, &origin))
2513 : : {
2514 : : /* Message boundary */
2515 : 9 : OutputPluginWrite(ctx, false);
2516 : 9 : OutputPluginPrepareWrite(ctx, true);
2517 : :
2518 : 9 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2519 : : }
2520 : : }
2521 : 1083 : }
|