Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * execReplication.c
4 : : * miscellaneous executor routines for logical replication
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/executor/execReplication.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/amapi.h"
18 : : #include "access/commit_ts.h"
19 : : #include "access/genam.h"
20 : : #include "access/gist.h"
21 : : #include "access/relscan.h"
22 : : #include "access/tableam.h"
23 : : #include "access/transam.h"
24 : : #include "access/xact.h"
25 : : #include "access/heapam.h"
26 : : #include "catalog/pg_am_d.h"
27 : : #include "commands/trigger.h"
28 : : #include "executor/executor.h"
29 : : #include "executor/nodeModifyTable.h"
30 : : #include "replication/conflict.h"
31 : : #include "replication/logicalrelation.h"
32 : : #include "storage/lmgr.h"
33 : : #include "utils/builtins.h"
34 : : #include "utils/lsyscache.h"
35 : : #include "utils/rel.h"
36 : : #include "utils/snapmgr.h"
37 : : #include "utils/syscache.h"
38 : : #include "utils/typcache.h"
39 : :
40 : :
41 : : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
42 : : TypeCacheEntry **eq, Bitmapset *columns);
43 : :
44 : : /*
45 : : * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
46 : : * is setup to match 'rel' (*NOT* idxrel!).
47 : : *
48 : : * Returns how many columns to use for the index scan.
49 : : *
50 : : * This is not a generic routine, idxrel must be PK, RI, or an index that can be
51 : : * used for a REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
52 : : * for details.
53 : : *
54 : : * By definition, replication identity of a rel meets all limitations associated
55 : : * with that. Note that any other index could also meet these limitations.
56 : : */
57 : : static int
3342 peter_e@gmx.net 58 :CBC 72106 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
59 : : TupleTableSlot *searchslot)
60 : : {
61 : : int index_attoff;
1096 akapila@postgresql.o 62 : 72106 : int skey_attoff = 0;
63 : : Datum indclassDatum;
64 : : oidvector *opclass;
3342 peter_e@gmx.net 65 : 72106 : int2vector *indkey = &idxrel->rd_index->indkey;
66 : :
1086 dgustafsson@postgres 67 : 72106 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
68 : : Anum_pg_index_indclass);
3342 peter_e@gmx.net 69 : 72106 : opclass = (oidvector *) DatumGetPointer(indclassDatum);
70 : :
71 : : /* Build scankey for every non-expression attribute in the index. */
1096 akapila@postgresql.o 72 [ + + ]: 144235 : for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
73 : 72129 : index_attoff++)
74 : : {
75 : : Oid operator;
76 : : Oid optype;
77 : : Oid opfamily;
78 : : RegProcedure regop;
79 : 72129 : int table_attno = indkey->values[index_attoff];
80 : : StrategyNumber eq_strategy;
81 : :
82 [ + + ]: 72129 : if (!AttributeNumberIsValid(table_attno))
83 : : {
84 : : /*
85 : : * XXX: Currently, we don't support expressions in the scan key,
86 : : * see code below.
87 : : */
88 : 2 : continue;
89 : : }
90 : :
91 : : /*
92 : : * Load the operator info. We need this to get the equality operator
93 : : * function for the scan key.
94 : : */
95 : 72127 : optype = get_opclass_input_type(opclass->values[index_attoff]);
96 : 72127 : opfamily = get_opclass_family(opclass->values[index_attoff]);
387 peter@eisentraut.org 97 : 72127 : eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
3342 peter_e@gmx.net 98 : 72127 : operator = get_opfamily_member(opfamily, optype,
99 : : optype,
100 : : eq_strategy);
101 : :
102 [ - + ]: 72127 : if (!OidIsValid(operator))
3156 tgl@sss.pgh.pa.us 103 [ # # ]:UBC 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
104 : : eq_strategy, optype, optype, opfamily);
105 : :
3342 peter_e@gmx.net 106 :CBC 72127 : regop = get_opcode(operator);
107 : :
108 : : /* Initialize the scankey. */
1096 akapila@postgresql.o 109 : 72127 : ScanKeyInit(&skey[skey_attoff],
110 : 72127 : index_attoff + 1,
111 : : eq_strategy,
112 : : regop,
113 : 72127 : searchslot->tts_values[table_attno - 1]);
114 : :
115 : 72127 : skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
116 : :
117 : : /* Check for null value. */
118 [ + + ]: 72127 : if (searchslot->tts_isnull[table_attno - 1])
119 : 1 : skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
120 : :
121 : 72127 : skey_attoff++;
122 : : }
123 : :
124 : : /* There must always be at least one attribute for the index scan. */
125 [ - + ]: 72106 : Assert(skey_attoff > 0);
126 : :
127 : 72106 : return skey_attoff;
128 : : }
129 : :
130 : :
131 : : /*
132 : : * Helper function to check if it is necessary to re-fetch and lock the tuple
133 : : * due to concurrent modifications. This function should be called after
134 : : * invoking table_tuple_lock.
135 : : */
136 : : static bool
572 137 : 72296 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
138 : : {
139 : 72296 : bool refetch = false;
140 : :
141 [ + - - - : 72296 : switch (res)
- ]
142 : : {
143 : 72296 : case TM_Ok:
144 : 72296 : break;
572 akapila@postgresql.o 145 :UBC 0 : case TM_Updated:
146 : : /* XXX: Improve handling here */
147 [ # # ]: 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
148 [ # # ]: 0 : ereport(LOG,
149 : : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
150 : : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
151 : : else
152 [ # # ]: 0 : ereport(LOG,
153 : : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
154 : : errmsg("concurrent update, retrying")));
155 : 0 : refetch = true;
156 : 0 : break;
157 : 0 : case TM_Deleted:
158 : : /* XXX: Improve handling here */
159 [ # # ]: 0 : ereport(LOG,
160 : : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
161 : : errmsg("concurrent delete, retrying")));
162 : 0 : refetch = true;
163 : 0 : break;
164 : 0 : case TM_Invisible:
165 [ # # ]: 0 : elog(ERROR, "attempted to lock invisible tuple");
166 : : break;
167 : 0 : default:
168 [ # # ]: 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
169 : : break;
170 : : }
171 : :
572 akapila@postgresql.o 172 :CBC 72296 : return refetch;
173 : : }
174 : :
175 : : /*
176 : : * Search the relation 'rel' for tuple using the index.
177 : : *
178 : : * If a matching tuple is found, lock it with lockmode, fill the slot with its
179 : : * contents, and return true. Return false otherwise.
180 : : */
181 : : bool
3342 peter_e@gmx.net 182 : 72105 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
183 : : LockTupleMode lockmode,
184 : : TupleTableSlot *searchslot,
185 : : TupleTableSlot *outslot)
186 : : {
187 : : ScanKeyData skey[INDEX_MAX_KEYS];
188 : : int skey_attoff;
189 : : IndexScanDesc scan;
190 : : SnapshotData snap;
191 : : TransactionId xwait;
192 : : Relation idxrel;
193 : : bool found;
1096 akapila@postgresql.o 194 : 72105 : TypeCacheEntry **eq = NULL;
195 : : bool isIdxSafeToSkipDuplicates;
196 : :
197 : : /* Open the index. */
3342 peter_e@gmx.net 198 : 72105 : idxrel = index_open(idxoid, RowExclusiveLock);
199 : :
1096 akapila@postgresql.o 200 : 72105 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
201 : :
3342 peter_e@gmx.net 202 : 72105 : InitDirtySnapshot(snap);
203 : :
204 : : /* Build scan key. */
1096 akapila@postgresql.o 205 : 72105 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
206 : :
207 : : /* Start an index scan. */
369 pg@bowt.ie 208 : 72105 : scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
209 : :
3342 peter_e@gmx.net 210 :UBC 0 : retry:
3342 peter_e@gmx.net 211 :CBC 72105 : found = false;
212 : :
1096 akapila@postgresql.o 213 : 72105 : index_rescan(scan, skey, skey_attoff, NULL, 0);
214 : :
215 : : /* Try to find the tuple */
216 [ + + ]: 72105 : while (index_getnext_slot(scan, ForwardScanDirection, outslot))
217 : : {
218 : : /*
219 : : * Avoid expensive equality check if the index is primary key or
220 : : * replica identity index.
221 : : */
222 [ + + ]: 72089 : if (!isIdxSafeToSkipDuplicates)
223 : : {
224 [ + - ]: 17 : if (eq == NULL)
95 michael@paquier.xyz 225 :GNC 17 : eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
226 : :
223 akapila@postgresql.o 227 [ - + ]: 17 : if (!tuples_equal(outslot, searchslot, eq, NULL))
1096 akapila@postgresql.o 228 :UBC 0 : continue;
229 : : }
230 : :
3342 peter_e@gmx.net 231 :CBC 72089 : ExecMaterializeSlot(outslot);
232 : :
233 : 144178 : xwait = TransactionIdIsValid(snap.xmin) ?
234 [ - + ]: 72089 : snap.xmin : snap.xmax;
235 : :
236 : : /*
237 : : * If the tuple is locked, wait for locking transaction to finish and
238 : : * retry.
239 : : */
240 [ - + ]: 72089 : if (TransactionIdIsValid(xwait))
241 : : {
3342 peter_e@gmx.net 242 :UBC 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
243 : 0 : goto retry;
244 : : }
245 : :
246 : : /* Found our tuple and it's not locked */
1096 akapila@postgresql.o 247 :CBC 72089 : found = true;
248 : 72089 : break;
249 : : }
250 : :
251 : : /* Found tuple, try to lock it in the lockmode. */
3342 peter_e@gmx.net 252 [ + + ]: 72105 : if (found)
253 : : {
254 : : TM_FailureData tmfd;
255 : : TM_Result res;
256 : :
257 : 72089 : PushActiveSnapshot(GetLatestSnapshot());
258 : :
370 heikki.linnakangas@i 259 : 72089 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
260 : : outslot,
261 : : GetCurrentCommandId(false),
262 : : lockmode,
263 : : LockWaitBlock,
264 : : 0 /* don't follow updates */ ,
265 : : &tmfd);
266 : :
3342 peter_e@gmx.net 267 : 72089 : PopActiveSnapshot();
268 : :
572 akapila@postgresql.o 269 [ - + ]: 72089 : if (should_refetch_tuple(res, &tmfd))
572 akapila@postgresql.o 270 :UBC 0 : goto retry;
271 : : }
272 : :
3342 peter_e@gmx.net 273 :CBC 72105 : index_endscan(scan);
274 : :
275 : : /* Don't release lock until commit. */
276 : 72105 : index_close(idxrel, NoLock);
277 : :
278 : 72105 : return found;
279 : : }
280 : :
281 : : /*
282 : : * Compare the tuples in the slots by checking if they have equal values.
283 : : *
284 : : * If 'columns' is not null, only the columns specified within it will be
285 : : * considered for the equality check, ignoring all other columns.
286 : : */
287 : : static bool
2164 noah@leadboat.com 288 : 105326 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
289 : : TypeCacheEntry **eq, Bitmapset *columns)
290 : : {
291 : : int attrnum;
292 : :
2561 andres@anarazel.de 293 [ - + ]: 105326 : Assert(slot1->tts_tupleDescriptor->natts ==
294 : : slot2->tts_tupleDescriptor->natts);
295 : :
296 : 105326 : slot_getallattrs(slot1);
297 : 105326 : slot_getallattrs(slot2);
298 : :
299 : : /* Check equality of the attributes. */
300 [ + + ]: 105530 : for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
301 : : {
302 : : Form_pg_attribute att;
303 : : TypeCacheEntry *typentry;
304 : :
1090 akapila@postgresql.o 305 : 105364 : att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
306 : :
307 : : /*
308 : : * Ignore dropped and generated columns as the publisher doesn't send
309 : : * those
310 : : */
1088 311 [ + + - + ]: 105364 : if (att->attisdropped || att->attgenerated)
1090 312 : 1 : continue;
313 : :
314 : : /*
315 : : * Ignore columns that are not listed for checking.
316 : : */
223 akapila@postgresql.o 317 [ - + ]:GNC 105363 : if (columns &&
223 akapila@postgresql.o 318 [ # # ]:UNC 0 : !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
319 : : columns))
320 : 0 : continue;
321 : :
322 : : /*
323 : : * If one value is NULL and other is not, then they are certainly not
324 : : * equal
325 : : */
2561 andres@anarazel.de 326 [ - + ]:CBC 105363 : if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
3342 peter_e@gmx.net 327 :UBC 0 : return false;
328 : :
329 : : /*
330 : : * If both are NULL, they can be considered equal.
331 : : */
2561 andres@anarazel.de 332 [ + + - + ]:CBC 105363 : if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
3342 peter_e@gmx.net 333 : 1 : continue;
334 : :
2164 noah@leadboat.com 335 : 105362 : typentry = eq[attrnum];
336 [ + + ]: 105362 : if (typentry == NULL)
337 : : {
338 : 204 : typentry = lookup_type_cache(att->atttypid,
339 : : TYPECACHE_EQ_OPR_FINFO);
340 [ - + ]: 204 : if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
2164 noah@leadboat.com 341 [ # # ]:UBC 0 : ereport(ERROR,
342 : : (errcode(ERRCODE_UNDEFINED_FUNCTION),
343 : : errmsg("could not identify an equality operator for type %s",
344 : : format_type_be(att->atttypid))));
2164 noah@leadboat.com 345 :CBC 204 : eq[attrnum] = typentry;
346 : : }
347 : :
2550 peter@eisentraut.org 348 [ + + ]: 105362 : if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
349 : : att->attcollation,
2489 tgl@sss.pgh.pa.us 350 : 105362 : slot1->tts_values[attrnum],
351 : 105362 : slot2->tts_values[attrnum])))
3342 peter_e@gmx.net 352 : 105160 : return false;
353 : : }
354 : :
355 : 166 : return true;
356 : : }
357 : :
358 : : /*
359 : : * Search the relation 'rel' for tuple using the sequential scan.
360 : : *
361 : : * If a matching tuple is found, lock it with lockmode, fill the slot with its
362 : : * contents, and return true. Return false otherwise.
363 : : *
364 : : * Note that this stops on the first matching tuple.
365 : : *
366 : : * This can obviously be quite slow on tables that have more than few rows.
367 : : */
368 : : bool
369 : 151 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
370 : : TupleTableSlot *searchslot, TupleTableSlot *outslot)
371 : : {
372 : : TupleTableSlot *scanslot;
373 : : TableScanDesc scan;
374 : : SnapshotData snap;
375 : : TypeCacheEntry **eq;
376 : : TransactionId xwait;
377 : : bool found;
2561 andres@anarazel.de 378 : 151 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
379 : :
3342 peter_e@gmx.net 380 [ - + ]: 151 : Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
381 : :
95 michael@paquier.xyz 382 :GNC 151 : eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
383 : :
384 : : /* Start a heap scan. */
3342 peter_e@gmx.net 385 :CBC 151 : InitDirtySnapshot(snap);
2561 andres@anarazel.de 386 : 151 : scan = table_beginscan(rel, &snap, 0, NULL);
387 : 151 : scanslot = table_slot_create(rel, NULL);
388 : :
3342 peter_e@gmx.net 389 :UBC 0 : retry:
3342 peter_e@gmx.net 390 :CBC 151 : found = false;
391 : :
2561 andres@anarazel.de 392 : 151 : table_rescan(scan, NULL);
393 : :
394 : : /* Try to find the tuple */
395 [ + + ]: 105310 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
396 : : {
223 akapila@postgresql.o 397 [ + + ]:GNC 105306 : if (!tuples_equal(scanslot, searchslot, eq, NULL))
3342 peter_e@gmx.net 398 :CBC 105159 : continue;
399 : :
400 : 147 : found = true;
2561 andres@anarazel.de 401 : 147 : ExecCopySlot(outslot, scanslot);
402 : :
3342 peter_e@gmx.net 403 : 294 : xwait = TransactionIdIsValid(snap.xmin) ?
404 [ - + ]: 147 : snap.xmin : snap.xmax;
405 : :
406 : : /*
407 : : * If the tuple is locked, wait for locking transaction to finish and
408 : : * retry.
409 : : */
410 [ - + ]: 147 : if (TransactionIdIsValid(xwait))
411 : : {
3342 peter_e@gmx.net 412 :UBC 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
413 : 0 : goto retry;
414 : : }
415 : :
416 : : /* Found our tuple and it's not locked */
2232 alvherre@alvh.no-ip. 417 :CBC 147 : break;
418 : : }
419 : :
420 : : /* Found tuple, try to lock it in the lockmode. */
3342 peter_e@gmx.net 421 [ + + ]: 151 : if (found)
422 : : {
423 : : TM_FailureData tmfd;
424 : : TM_Result res;
425 : :
426 : 147 : PushActiveSnapshot(GetLatestSnapshot());
427 : :
370 heikki.linnakangas@i 428 : 147 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
429 : : outslot,
430 : : GetCurrentCommandId(false),
431 : : lockmode,
432 : : LockWaitBlock,
433 : : 0 /* don't follow updates */ ,
434 : : &tmfd);
435 : :
3342 peter_e@gmx.net 436 : 147 : PopActiveSnapshot();
437 : :
572 akapila@postgresql.o 438 [ - + ]: 147 : if (should_refetch_tuple(res, &tmfd))
572 akapila@postgresql.o 439 :UBC 0 : goto retry;
440 : : }
441 : :
2561 andres@anarazel.de 442 :CBC 151 : table_endscan(scan);
443 : 151 : ExecDropSingleTupleTableSlot(scanslot);
444 : :
3342 peter_e@gmx.net 445 : 151 : return found;
446 : : }
447 : :
448 : : /*
449 : : * Build additional index information necessary for conflict detection.
450 : : */
451 : : static void
341 akapila@postgresql.o 452 : 62 : BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
453 : : {
454 [ + + ]: 186 : for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
455 : : {
456 : 124 : Relation indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
457 : 124 : IndexInfo *indexRelationInfo = resultRelInfo->ri_IndexRelationInfo[i];
458 : :
459 [ + + ]: 124 : if (conflictindex != RelationGetRelid(indexRelation))
460 : 62 : continue;
461 : :
462 : : /*
463 : : * This Assert will fail if BuildSpeculativeIndexInfo() is called
464 : : * twice for the given index.
465 : : */
466 [ - + ]: 62 : Assert(indexRelationInfo->ii_UniqueOps == NULL);
467 : :
468 : 62 : BuildSpeculativeIndexInfo(indexRelation, indexRelationInfo);
469 : : }
470 : 62 : }
471 : :
472 : : /*
473 : : * If the tuple is recently dead and was deleted by a transaction with a newer
474 : : * commit timestamp than previously recorded, update the associated transaction
475 : : * ID, commit time, and origin. This helps ensure that conflict detection uses
476 : : * the most recent and relevant deletion metadata.
477 : : */
478 : : static void
223 akapila@postgresql.o 479 :GNC 3 : update_most_recent_deletion_info(TupleTableSlot *scanslot,
480 : : TransactionId oldestxmin,
481 : : TransactionId *delete_xid,
482 : : TimestampTz *delete_time,
483 : : ReplOriginId *delete_origin)
484 : : {
485 : : BufferHeapTupleTableSlot *hslot;
486 : : HeapTuple tuple;
487 : : Buffer buf;
488 : 3 : bool recently_dead = false;
489 : : TransactionId xmax;
490 : : TimestampTz localts;
491 : : ReplOriginId localorigin;
492 : :
493 : 3 : hslot = (BufferHeapTupleTableSlot *) scanslot;
494 : :
495 : 3 : tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
496 : 3 : buf = hslot->buffer;
497 : :
498 : 3 : LockBuffer(buf, BUFFER_LOCK_SHARE);
499 : :
500 : : /*
501 : : * We do not consider HEAPTUPLE_DEAD status because it indicates either
502 : : * tuples whose inserting transaction was aborted (meaning there is no
503 : : * commit timestamp or origin), or tuples deleted by a transaction older
504 : : * than oldestxmin, making it safe to ignore them during conflict
505 : : * detection (See comments atop worker.c for details).
506 : : */
507 [ + - ]: 3 : if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
508 : 3 : recently_dead = true;
509 : :
510 : 3 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
511 : :
512 [ - + ]: 3 : if (!recently_dead)
223 akapila@postgresql.o 513 :UNC 0 : return;
514 : :
223 akapila@postgresql.o 515 :GNC 3 : xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
516 [ - + ]: 3 : if (!TransactionIdIsValid(xmax))
223 akapila@postgresql.o 517 :UNC 0 : return;
518 : :
519 : : /* Select the dead tuple with the most recent commit timestamp */
223 akapila@postgresql.o 520 [ + - + - ]:GNC 6 : if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
521 : 3 : TimestampDifferenceExceeds(*delete_time, localts, 0))
522 : : {
523 : 3 : *delete_xid = xmax;
524 : 3 : *delete_time = localts;
525 : 3 : *delete_origin = localorigin;
526 : : }
527 : : }
528 : :
529 : : /*
530 : : * Searches the relation 'rel' for the most recently deleted tuple that matches
531 : : * the values in 'searchslot' and is not yet removable by VACUUM. The function
532 : : * returns the transaction ID, origin, and commit timestamp of the transaction
533 : : * that deleted this tuple.
534 : : *
535 : : * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
536 : : * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
537 : : * conflict detection.
538 : : *
539 : : * Instead of stopping at the first match, we scan all matching dead tuples to
540 : : * identify most recent deletion. This is crucial because only the latest
541 : : * deletion is relevant for resolving conflicts.
542 : : *
543 : : * For example, consider a scenario on the subscriber where a row is deleted,
544 : : * re-inserted, and then deleted again only on the subscriber:
545 : : *
546 : : * - (pk, 1) - deleted at 9:00,
547 : : * - (pk, 1) - deleted at 9:02,
548 : : *
549 : : * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
550 : : *
551 : : * If we mistakenly return the older deletion (9:00), the system may wrongly
552 : : * apply the remote update using a last-update-wins strategy. Instead, we must
553 : : * recognize the more recent deletion at 9:02 and skip the update. See
554 : : * comments atop worker.c for details. Note, as of now, conflict resolution
555 : : * is not implemented. Consequently, the system may incorrectly report the
556 : : * older tuple as the conflicted one, leading to misleading results.
557 : : *
558 : : * The commit timestamp of the deleting transaction is used to determine which
559 : : * tuple was deleted most recently.
560 : : */
561 : : bool
562 : 2 : RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
563 : : TransactionId oldestxmin,
564 : : TransactionId *delete_xid,
565 : : ReplOriginId *delete_origin,
566 : : TimestampTz *delete_time)
567 : : {
568 : : TupleTableSlot *scanslot;
569 : : TableScanDesc scan;
570 : : TypeCacheEntry **eq;
571 : : Bitmapset *indexbitmap;
572 : 2 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
573 : :
574 [ - + ]: 2 : Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
575 : :
576 : 2 : *delete_xid = InvalidTransactionId;
46 msawada@postgresql.o 577 : 2 : *delete_origin = InvalidReplOriginId;
223 akapila@postgresql.o 578 : 2 : *delete_time = 0;
579 : :
580 : : /*
581 : : * If the relation has a replica identity key or a primary key that is
582 : : * unusable for locating deleted tuples (see
583 : : * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
584 : : * necessary. In such cases, comparing the entire tuple is not required,
585 : : * since the remote tuple might not include all column values. Instead,
586 : : * the indexed columns alone are sufficient to identify the target tuple
587 : : * (see logicalrep_rel_mark_updatable).
588 : : */
589 : 2 : indexbitmap = RelationGetIndexAttrBitmap(rel,
590 : : INDEX_ATTR_BITMAP_IDENTITY_KEY);
591 : :
592 : : /* fallback to PK if no replica identity */
593 [ + - ]: 2 : if (!indexbitmap)
594 : 2 : indexbitmap = RelationGetIndexAttrBitmap(rel,
595 : : INDEX_ATTR_BITMAP_PRIMARY_KEY);
596 : :
95 michael@paquier.xyz 597 : 2 : eq = palloc0_array(TypeCacheEntry *, searchslot->tts_tupleDescriptor->natts);
598 : :
599 : : /*
600 : : * Start a heap scan using SnapshotAny to identify dead tuples that are
601 : : * not visible under a standard MVCC snapshot. Tuples from transactions
602 : : * not yet committed or those just committed prior to the scan are
603 : : * excluded in update_most_recent_deletion_info().
604 : : */
223 akapila@postgresql.o 605 : 2 : scan = table_beginscan(rel, SnapshotAny, 0, NULL);
606 : 2 : scanslot = table_slot_create(rel, NULL);
607 : :
608 : 2 : table_rescan(scan, NULL);
609 : :
610 : : /* Try to find the tuple */
611 [ + + ]: 5 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
612 : : {
613 [ + + ]: 3 : if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
614 : 1 : continue;
615 : :
616 : 2 : update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
617 : : delete_time, delete_origin);
618 : : }
619 : :
620 : 2 : table_endscan(scan);
621 : 2 : ExecDropSingleTupleTableSlot(scanslot);
622 : :
623 : 2 : return *delete_time != 0;
624 : : }
625 : :
626 : : /*
627 : : * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
628 : : * the deleted tuple.
629 : : */
630 : : bool
631 : 1 : RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
632 : : TupleTableSlot *searchslot,
633 : : TransactionId oldestxmin,
634 : : TransactionId *delete_xid,
635 : : ReplOriginId *delete_origin,
636 : : TimestampTz *delete_time)
637 : : {
638 : : Relation idxrel;
639 : : ScanKeyData skey[INDEX_MAX_KEYS];
640 : : int skey_attoff;
641 : : IndexScanDesc scan;
642 : : TupleTableSlot *scanslot;
643 : 1 : TypeCacheEntry **eq = NULL;
644 : : bool isIdxSafeToSkipDuplicates;
645 : 1 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
646 : :
647 [ - + ]: 1 : Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
648 [ - + ]: 1 : Assert(OidIsValid(idxoid));
649 : :
650 : 1 : *delete_xid = InvalidTransactionId;
651 : 1 : *delete_time = 0;
46 msawada@postgresql.o 652 : 1 : *delete_origin = InvalidReplOriginId;
653 : :
223 akapila@postgresql.o 654 : 1 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
655 : :
656 : 1 : scanslot = table_slot_create(rel, NULL);
657 : :
658 : 1 : idxrel = index_open(idxoid, RowExclusiveLock);
659 : :
660 : : /* Build scan key. */
661 : 1 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
662 : :
663 : : /*
664 : : * Start an index scan using SnapshotAny to identify dead tuples that are
665 : : * not visible under a standard MVCC snapshot. Tuples from transactions
666 : : * not yet committed or those just committed prior to the scan are
667 : : * excluded in update_most_recent_deletion_info().
668 : : */
669 : 1 : scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
670 : :
671 : 1 : index_rescan(scan, skey, skey_attoff, NULL, 0);
672 : :
673 : : /* Try to find the tuple */
674 [ + + ]: 2 : while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
675 : : {
676 : : /*
677 : : * Avoid expensive equality check if the index is primary key or
678 : : * replica identity index.
679 : : */
680 [ - + ]: 1 : if (!isIdxSafeToSkipDuplicates)
681 : : {
223 akapila@postgresql.o 682 [ # # ]:UNC 0 : if (eq == NULL)
95 michael@paquier.xyz 683 : 0 : eq = palloc0_array(TypeCacheEntry *, scanslot->tts_tupleDescriptor->natts);
684 : :
223 akapila@postgresql.o 685 [ # # ]: 0 : if (!tuples_equal(scanslot, searchslot, eq, NULL))
686 : 0 : continue;
687 : : }
688 : :
223 akapila@postgresql.o 689 :GNC 1 : update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
690 : : delete_time, delete_origin);
691 : : }
692 : :
693 : 1 : index_endscan(scan);
694 : :
695 : 1 : index_close(idxrel, NoLock);
696 : :
697 : 1 : ExecDropSingleTupleTableSlot(scanslot);
698 : :
699 : 1 : return *delete_time != 0;
700 : : }
701 : :
702 : : /*
703 : : * Find the tuple that violates the passed unique index (conflictindex).
704 : : *
705 : : * If the conflicting tuple is found return true, otherwise false.
706 : : *
707 : : * We lock the tuple to avoid getting it deleted before the caller can fetch
708 : : * the required information. Note that if the tuple is deleted before a lock
709 : : * is acquired, we will retry to find the conflicting tuple again.
710 : : */
711 : : static bool
572 akapila@postgresql.o 712 :CBC 62 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
713 : : Oid conflictindex, TupleTableSlot *slot,
714 : : TupleTableSlot **conflictslot)
715 : : {
716 : 62 : Relation rel = resultRelInfo->ri_RelationDesc;
717 : : ItemPointerData conflictTid;
718 : : TM_FailureData tmfd;
719 : : TM_Result res;
720 : :
721 : 62 : *conflictslot = NULL;
722 : :
723 : : /*
724 : : * Build additional information required to check constraints violations.
725 : : * See check_exclusion_or_unique_constraint().
726 : : */
341 727 : 62 : BuildConflictIndexInfo(resultRelInfo, conflictindex);
728 : :
572 729 : 62 : retry:
730 [ + + ]: 61 : if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
572 akapila@postgresql.o 731 :GIC 62 : &conflictTid, &slot->tts_tid,
572 akapila@postgresql.o 732 :CBC 62 : list_make1_oid(conflictindex)))
733 : : {
734 [ - + ]: 1 : if (*conflictslot)
572 akapila@postgresql.o 735 :UBC 0 : ExecDropSingleTupleTableSlot(*conflictslot);
736 : :
572 akapila@postgresql.o 737 :CBC 1 : *conflictslot = NULL;
738 : 1 : return false;
739 : : }
740 : :
741 : 60 : *conflictslot = table_slot_create(rel, NULL);
742 : :
743 : 60 : PushActiveSnapshot(GetLatestSnapshot());
744 : :
370 heikki.linnakangas@i 745 : 60 : res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
746 : : *conflictslot,
747 : : GetCurrentCommandId(false),
748 : : LockTupleShare,
749 : : LockWaitBlock,
750 : : 0 /* don't follow updates */ ,
751 : : &tmfd);
752 : :
572 akapila@postgresql.o 753 : 60 : PopActiveSnapshot();
754 : :
755 [ - + ]: 60 : if (should_refetch_tuple(res, &tmfd))
572 akapila@postgresql.o 756 :UBC 0 : goto retry;
757 : :
572 akapila@postgresql.o 758 :CBC 60 : return true;
759 : : }
760 : :
761 : : /*
762 : : * Check all the unique indexes in 'recheckIndexes' for conflict with the
763 : : * tuple in 'remoteslot' and report if found.
764 : : */
765 : : static void
766 : 34 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
767 : : ConflictType type, List *recheckIndexes,
768 : : TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
769 : : {
356 770 : 34 : List *conflicttuples = NIL;
771 : : TupleTableSlot *conflictslot;
772 : :
773 : : /* Check all the unique indexes for conflicts */
572 774 [ + - + + : 128 : foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
+ + ]
775 : : {
776 [ + - + + ]: 123 : if (list_member_oid(recheckIndexes, uniqueidx) &&
777 : 62 : FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
778 : : &conflictslot))
779 : : {
356 780 : 60 : ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
781 : :
782 : 60 : conflicttuple->slot = conflictslot;
783 : 60 : conflicttuple->indexoid = uniqueidx;
784 : :
785 : 60 : GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
786 : : &conflicttuple->origin, &conflicttuple->ts);
787 : :
788 : 60 : conflicttuples = lappend(conflicttuples, conflicttuple);
789 : : }
790 : : }
791 : :
792 : : /* Report the conflict, if found */
793 [ + + ]: 33 : if (conflicttuples)
794 [ + + ]: 32 : ReportApplyConflict(estate, resultRelInfo, ERROR,
795 : 32 : list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
796 : : searchslot, remoteslot, conflicttuples);
572 797 : 1 : }
798 : :
799 : : /*
800 : : * Insert tuple represented in the slot to the relation, update the indexes,
801 : : * and execute any constraints and per-row triggers.
802 : : *
803 : : * Caller is responsible for opening the indexes.
804 : : */
805 : : void
1978 heikki.linnakangas@i 806 : 76014 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
807 : : EState *estate, TupleTableSlot *slot)
808 : : {
3224 bruce@momjian.us 809 : 76014 : bool skip_tuple = false;
810 : 76014 : Relation rel = resultRelInfo->ri_RelationDesc;
811 : :
812 : : /* For now we support only tables. */
3342 peter_e@gmx.net 813 [ - + ]: 76014 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
814 : :
815 : 76014 : CheckCmdReplicaIdentity(rel, CMD_INSERT);
816 : :
817 : : /* BEFORE ROW INSERT Triggers */
818 [ + + ]: 76014 : if (resultRelInfo->ri_TrigDesc &&
819 [ + + ]: 20 : resultRelInfo->ri_TrigDesc->trig_insert_before_row)
820 : : {
2574 andres@anarazel.de 821 [ + + ]: 3 : if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
2489 tgl@sss.pgh.pa.us 822 : 1 : skip_tuple = true; /* "do nothing" */
823 : : }
824 : :
3342 peter_e@gmx.net 825 [ + + ]: 76014 : if (!skip_tuple)
826 : : {
827 : 76013 : List *recheckIndexes = NIL;
828 : : List *conflictindexes;
572 akapila@postgresql.o 829 : 76013 : bool conflict = false;
830 : :
831 : : /* Compute stored generated columns */
2542 peter@eisentraut.org 832 [ + + ]: 76013 : if (rel->rd_att->constr &&
833 [ + + ]: 45503 : rel->rd_att->constr->has_generated_stored)
1978 heikki.linnakangas@i 834 : 4 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
835 : : CMD_INSERT);
836 : :
837 : : /* Check the constraints of the tuple */
3342 peter_e@gmx.net 838 [ + + ]: 76013 : if (rel->rd_att->constr)
2834 alvherre@alvh.no-ip. 839 : 45503 : ExecConstraints(resultRelInfo, slot, estate);
2006 tgl@sss.pgh.pa.us 840 [ + + ]: 76013 : if (rel->rd_rel->relispartition)
2834 alvherre@alvh.no-ip. 841 : 82 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
842 : :
843 : : /* OK, store the tuple and create index entries for it */
703 akorotkov@postgresql 844 : 76013 : simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
845 : :
572 akapila@postgresql.o 846 : 76013 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
847 : :
703 akorotkov@postgresql 848 [ + + ]: 76013 : if (resultRelInfo->ri_NumIndices > 0)
849 : : {
850 : : bits32 flags;
851 : :
26 alvherre@kurilemu.de 852 [ + + ]:GNC 55609 : if (conflictindexes != NIL)
853 : 55605 : flags = EIIT_NO_DUPE_ERROR;
854 : : else
855 : 4 : flags = 0;
1978 heikki.linnakangas@i 856 :CBC 55609 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
857 : : estate, flags,
858 : : slot, conflictindexes,
859 : : &conflict);
860 : : }
861 : :
862 : : /*
863 : : * Checks the conflict indexes to fetch the conflicting local row and
864 : : * reports the conflict. We perform this check here, instead of
865 : : * performing an additional index scan before the actual insertion and
866 : : * reporting the conflict if any conflicting rows are found. This is
867 : : * to avoid the overhead of executing the extra scan for each INSERT
868 : : * operation, even when no conflict arises, which could introduce
869 : : * significant overhead to replication, particularly in cases where
870 : : * conflicts are rare.
871 : : *
872 : : * XXX OTOH, this could lead to clean-up effort for dead tuples added
873 : : * in heap and index in case of conflicts. But as conflicts shouldn't
874 : : * be a frequent thing so we preferred to save the performance
875 : : * overhead of extra scan before each insertion.
876 : : */
572 akapila@postgresql.o 877 [ + + ]: 76013 : if (conflict)
878 : 32 : CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
879 : : recheckIndexes, NULL, slot);
880 : :
881 : : /* AFTER ROW INSERT Triggers */
2574 andres@anarazel.de 882 : 75982 : ExecARInsertTriggers(estate, resultRelInfo, slot,
883 : : recheckIndexes, NULL);
884 : :
885 : : /*
886 : : * XXX we should in theory pass a TransitionCaptureState object to the
887 : : * above to capture transition tuples, but after statement triggers
888 : : * don't actually get fired by replication yet anyway
889 : : */
890 : :
3342 peter_e@gmx.net 891 : 75982 : list_free(recheckIndexes);
892 : : }
893 : 75983 : }
894 : :
895 : : /*
896 : : * Find the searchslot tuple and update it with data in the slot,
897 : : * update the indexes, and execute any constraints and per-row triggers.
898 : : *
899 : : * Caller is responsible for opening the indexes.
900 : : */
901 : : void
1978 heikki.linnakangas@i 902 : 31923 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
903 : : EState *estate, EPQState *epqstate,
904 : : TupleTableSlot *searchslot, TupleTableSlot *slot)
905 : : {
3224 bruce@momjian.us 906 : 31923 : bool skip_tuple = false;
907 : 31923 : Relation rel = resultRelInfo->ri_RelationDesc;
2549 andres@anarazel.de 908 : 31923 : ItemPointer tid = &(searchslot->tts_tid);
909 : :
910 : : /*
911 : : * We support only non-system tables, with
912 : : * check_publication_add_relation() accountable.
913 : : */
3342 peter_e@gmx.net 914 [ - + ]: 31923 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
537 noah@leadboat.com 915 [ - + ]: 31923 : Assert(!IsCatalogRelation(rel));
916 : :
3342 peter_e@gmx.net 917 : 31923 : CheckCmdReplicaIdentity(rel, CMD_UPDATE);
918 : :
919 : : /* BEFORE ROW UPDATE Triggers */
920 [ + + ]: 31923 : if (resultRelInfo->ri_TrigDesc &&
921 [ + + ]: 10 : resultRelInfo->ri_TrigDesc->trig_update_before_row)
922 : : {
2574 andres@anarazel.de 923 [ + + ]: 3 : if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
924 : : tid, NULL, slot, NULL, NULL, false))
2489 tgl@sss.pgh.pa.us 925 : 2 : skip_tuple = true; /* "do nothing" */
926 : : }
927 : :
3342 peter_e@gmx.net 928 [ + + ]: 31923 : if (!skip_tuple)
929 : : {
930 : 31921 : List *recheckIndexes = NIL;
931 : : TU_UpdateIndexes update_indexes;
932 : : List *conflictindexes;
572 akapila@postgresql.o 933 : 31921 : bool conflict = false;
934 : :
935 : : /* Compute stored generated columns */
2542 peter@eisentraut.org 936 [ + + ]: 31921 : if (rel->rd_att->constr &&
937 [ + + ]: 31876 : rel->rd_att->constr->has_generated_stored)
1978 heikki.linnakangas@i 938 : 2 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
939 : : CMD_UPDATE);
940 : :
941 : : /* Check the constraints of the tuple */
3342 peter_e@gmx.net 942 [ + + ]: 31921 : if (rel->rd_att->constr)
2834 alvherre@alvh.no-ip. 943 : 31876 : ExecConstraints(resultRelInfo, slot, estate);
2006 tgl@sss.pgh.pa.us 944 [ + + ]: 31921 : if (rel->rd_rel->relispartition)
2834 alvherre@alvh.no-ip. 945 : 12 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
946 : :
2488 andres@anarazel.de 947 : 31921 : simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
948 : : &update_indexes);
949 : :
572 akapila@postgresql.o 950 : 31921 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
951 : :
1091 tomas.vondra@postgre 952 [ + + + + ]: 31921 : if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
953 : : {
26 alvherre@kurilemu.de 954 :GNC 20221 : bits32 flags = EIIT_IS_UPDATE;
955 : :
956 [ + + ]: 20221 : if (conflictindexes != NIL)
957 : 20212 : flags |= EIIT_NO_DUPE_ERROR;
958 [ - + ]: 20221 : if (update_indexes == TU_Summarizing)
26 alvherre@kurilemu.de 959 :UNC 0 : flags |= EIIT_ONLY_SUMMARIZING;
1978 heikki.linnakangas@i 960 :CBC 20221 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
961 : : estate, flags,
962 : : slot, conflictindexes,
963 : : &conflict);
964 : : }
965 : :
966 : : /*
967 : : * Refer to the comments above the call to CheckAndReportConflict() in
968 : : * ExecSimpleRelationInsert to understand why this check is done at
969 : : * this point.
970 : : */
572 akapila@postgresql.o 971 [ + + ]: 31921 : if (conflict)
972 : 2 : CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
973 : : recheckIndexes, searchslot, slot);
974 : :
975 : : /* AFTER ROW UPDATE Triggers */
3342 peter_e@gmx.net 976 : 31919 : ExecARUpdateTriggers(estate, resultRelInfo,
977 : : NULL, NULL,
978 : : tid, NULL, slot,
979 : : recheckIndexes, NULL, false);
980 : :
981 : 31919 : list_free(recheckIndexes);
982 : : }
983 : 31921 : }
984 : :
985 : : /*
986 : : * Find the searchslot tuple and delete it, and execute any constraints
987 : : * and per-row triggers.
988 : : *
989 : : * Caller is responsible for opening the indexes.
990 : : */
991 : : void
1978 heikki.linnakangas@i 992 : 40313 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
993 : : EState *estate, EPQState *epqstate,
994 : : TupleTableSlot *searchslot)
995 : : {
3224 bruce@momjian.us 996 : 40313 : bool skip_tuple = false;
997 : 40313 : Relation rel = resultRelInfo->ri_RelationDesc;
2549 andres@anarazel.de 998 : 40313 : ItemPointer tid = &searchslot->tts_tid;
999 : :
3342 peter_e@gmx.net 1000 : 40313 : CheckCmdReplicaIdentity(rel, CMD_DELETE);
1001 : :
1002 : : /* BEFORE ROW DELETE Triggers */
1003 [ + + ]: 40313 : if (resultRelInfo->ri_TrigDesc &&
3076 rhaas@postgresql.org 1004 [ - + ]: 10 : resultRelInfo->ri_TrigDesc->trig_delete_before_row)
1005 : : {
3342 peter_e@gmx.net 1006 :UBC 0 : skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
240 dean.a.rasheed@gmail 1007 : 0 : tid, NULL, NULL, NULL, NULL, false);
1008 : : }
1009 : :
3342 peter_e@gmx.net 1010 [ + - ]:CBC 40313 : if (!skip_tuple)
1011 : : {
1012 : : /* OK, delete the tuple */
703 akorotkov@postgresql 1013 : 40313 : simple_table_tuple_delete(rel, tid, estate->es_snapshot);
1014 : :
1015 : : /* AFTER ROW DELETE Triggers */
3342 peter_e@gmx.net 1016 : 40313 : ExecARDeleteTriggers(estate, resultRelInfo,
1017 : : tid, NULL, NULL, false);
1018 : : }
1019 : 40313 : }
1020 : :
1021 : : /*
1022 : : * Check if command can be executed with current replica identity.
1023 : : */
1024 : : void
1025 : 215267 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
1026 : : {
1027 : : PublicationDesc pubdesc;
1028 : :
1029 : : /*
1030 : : * Skip checking the replica identity for partitioned tables, because the
1031 : : * operations are actually performed on the leaf partitions.
1032 : : */
1307 akapila@postgresql.o 1033 [ + + ]: 215267 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1034 : 202987 : return;
1035 : :
1036 : : /* We only need to do checks for UPDATE and DELETE. */
3342 peter_e@gmx.net 1037 [ + + + + ]: 212795 : if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
1038 : 124438 : return;
1039 : :
1040 : : /*
1041 : : * It is only safe to execute UPDATE/DELETE if the relation does not
1042 : : * publish UPDATEs or DELETEs, or all the following conditions are
1043 : : * satisfied:
1044 : : *
1045 : : * 1. All columns, referenced in the row filters from publications which
1046 : : * the relation is in, are valid - i.e. when all referenced columns are
1047 : : * part of REPLICA IDENTITY.
1048 : : *
1049 : : * 2. All columns, referenced in the column lists are valid - i.e. when
1050 : : * all columns referenced in the REPLICA IDENTITY are covered by the
1051 : : * column list.
1052 : : *
1053 : : * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
1054 : : * - i.e. when all these generated columns are published.
1055 : : *
1056 : : * XXX We could optimize it by first checking whether any of the
1057 : : * publications have a row filter or column list for this relation, or if
1058 : : * the relation contains a generated column. If none of these exist and
1059 : : * the relation has replica identity then we can avoid building the
1060 : : * descriptor but as this happens only one time it doesn't seem worth the
1061 : : * additional complexity.
1062 : : */
1482 akapila@postgresql.o 1063 : 88357 : RelationBuildPublicationDesc(rel, &pubdesc);
1064 [ + + + + ]: 88357 : if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
1065 [ + - ]: 30 : ereport(ERROR,
1066 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1067 : : errmsg("cannot update table \"%s\"",
1068 : : RelationGetRelationName(rel)),
1069 : : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1450 tomas.vondra@postgre 1070 [ + + + + ]: 88327 : else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
1071 [ + - ]: 54 : ereport(ERROR,
1072 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1073 : : errmsg("cannot update table \"%s\"",
1074 : : RelationGetRelationName(rel)),
1075 : : errdetail("Column list used by the publication does not cover the replica identity.")));
466 akapila@postgresql.o 1076 [ + + + + ]: 88273 : else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
1077 [ + - ]: 12 : ereport(ERROR,
1078 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1079 : : errmsg("cannot update table \"%s\"",
1080 : : RelationGetRelationName(rel)),
1081 : : errdetail("Replica identity must not contain unpublished generated columns.")));
1482 1082 [ + + - + ]: 88261 : else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
1482 akapila@postgresql.o 1083 [ # # ]:UBC 0 : ereport(ERROR,
1084 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1085 : : errmsg("cannot delete from table \"%s\"",
1086 : : RelationGetRelationName(rel)),
1087 : : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1450 tomas.vondra@postgre 1088 [ + + - + ]:CBC 88261 : else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
1450 tomas.vondra@postgre 1089 [ # # ]:UBC 0 : ereport(ERROR,
1090 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1091 : : errmsg("cannot delete from table \"%s\"",
1092 : : RelationGetRelationName(rel)),
1093 : : errdetail("Column list used by the publication does not cover the replica identity.")));
466 akapila@postgresql.o 1094 [ + + - + ]:CBC 88261 : else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
466 akapila@postgresql.o 1095 [ # # ]:UBC 0 : ereport(ERROR,
1096 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1097 : : errmsg("cannot delete from table \"%s\"",
1098 : : RelationGetRelationName(rel)),
1099 : : errdetail("Replica identity must not contain unpublished generated columns.")));
1100 : :
1101 : : /* If relation has replica identity we are always good. */
1482 akapila@postgresql.o 1102 [ + + ]:CBC 88261 : if (OidIsValid(RelationGetReplicaIndex(rel)))
3342 peter_e@gmx.net 1103 : 75846 : return;
1104 : :
1105 : : /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
1450 tomas.vondra@postgre 1106 [ + + ]: 12415 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
1107 : 231 : return;
1108 : :
1109 : : /*
1110 : : * This is UPDATE/DELETE and there is no replica identity.
1111 : : *
1112 : : * Check if the table publishes UPDATES or DELETES.
1113 : : */
1482 akapila@postgresql.o 1114 [ + + + + ]: 12184 : if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
3342 peter_e@gmx.net 1115 [ + - ]: 62 : ereport(ERROR,
1116 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1117 : : errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
1118 : : RelationGetRelationName(rel)),
1119 : : errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
1482 akapila@postgresql.o 1120 [ + + + + ]: 12122 : else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
3342 peter_e@gmx.net 1121 [ + - ]: 8 : ereport(ERROR,
1122 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1123 : : errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
1124 : : RelationGetRelationName(rel)),
1125 : : errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
1126 : : }
1127 : :
1128 : :
1129 : : /*
1130 : : * Check if we support writing into specific relkind of local relation and check
1131 : : * if it aligns with the relkind of the relation on the publisher.
1132 : : *
1133 : : * The nspname and relname are only needed for error reporting.
1134 : : */
1135 : : void
143 akapila@postgresql.o 1136 :GNC 983 : CheckSubscriptionRelkind(char localrelkind, char remoterelkind,
1137 : : const char *nspname, const char *relname)
1138 : : {
1139 [ + + + + ]: 983 : if (localrelkind != RELKIND_RELATION &&
1140 [ - + ]: 17 : localrelkind != RELKIND_PARTITIONED_TABLE &&
1141 : : localrelkind != RELKIND_SEQUENCE)
3225 peter_e@gmx.net 1142 [ # # ]:UBC 0 : ereport(ERROR,
1143 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1144 : : errmsg("cannot use relation \"%s.%s\" as logical replication target",
1145 : : nspname, relname),
1146 : : errdetail_relkind_not_supported(localrelkind)));
1147 : :
1148 : : /*
1149 : : * Allow RELKIND_RELATION and RELKIND_PARTITIONED_TABLE to be treated
1150 : : * interchangeably, but ensure that sequences (RELKIND_SEQUENCE) match
1151 : : * exactly on both publisher and subscriber.
1152 : : */
143 akapila@postgresql.o 1153 [ + + + - :GNC 983 : if ((localrelkind == RELKIND_SEQUENCE && remoterelkind != RELKIND_SEQUENCE) ||
+ + ]
1154 [ - + ]: 966 : (localrelkind != RELKIND_SEQUENCE && remoterelkind == RELKIND_SEQUENCE))
143 akapila@postgresql.o 1155 [ # # # # :UNC 0 : ereport(ERROR,
# # ]
1156 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1157 : : /* translator: 3rd and 4th %s are "sequence" or "table" */
1158 : : errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"",
1159 : : nspname, relname,
1160 : : remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table",
1161 : : localrelkind == RELKIND_SEQUENCE ? "sequence" : "table"));
3225 peter_e@gmx.net 1162 :CBC 983 : }
|