Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * execReplication.c
4 : : * miscellaneous executor routines for logical replication
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/executor/execReplication.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/commit_ts.h"
18 : : #include "access/genam.h"
19 : : #include "access/gist.h"
20 : : #include "access/relscan.h"
21 : : #include "access/tableam.h"
22 : : #include "access/transam.h"
23 : : #include "access/xact.h"
24 : : #include "access/heapam.h"
25 : : #include "catalog/pg_am_d.h"
26 : : #include "commands/trigger.h"
27 : : #include "executor/executor.h"
28 : : #include "executor/nodeModifyTable.h"
29 : : #include "replication/conflict.h"
30 : : #include "replication/logicalrelation.h"
31 : : #include "storage/lmgr.h"
32 : : #include "utils/builtins.h"
33 : : #include "utils/lsyscache.h"
34 : : #include "utils/rel.h"
35 : : #include "utils/snapmgr.h"
36 : : #include "utils/syscache.h"
37 : : #include "utils/typcache.h"
38 : :
39 : :
40 : : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
41 : : TypeCacheEntry **eq, Bitmapset *columns);
42 : :
43 : : /*
44 : : * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
45 : : * is setup to match 'rel' (*NOT* idxrel!).
46 : : *
47 : : * Returns how many columns to use for the index scan.
48 : : *
49 : : * This is not generic routine, idxrel must be PK, RI, or an index that can be
50 : : * used for REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
51 : : * for details.
52 : : *
53 : : * By definition, replication identity of a rel meets all limitations associated
54 : : * with that. Note that any other index could also meet these limitations.
55 : : */
56 : : static int
3152 peter_e@gmx.net 57 :CBC 72106 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
58 : : TupleTableSlot *searchslot)
59 : : {
60 : : int index_attoff;
906 akapila@postgresql.o 61 : 72106 : int skey_attoff = 0;
62 : : Datum indclassDatum;
63 : : oidvector *opclass;
3152 peter_e@gmx.net 64 : 72106 : int2vector *indkey = &idxrel->rd_index->indkey;
65 : :
896 dgustafsson@postgres 66 : 72106 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
67 : : Anum_pg_index_indclass);
3152 peter_e@gmx.net 68 : 72106 : opclass = (oidvector *) DatumGetPointer(indclassDatum);
69 : :
70 : : /* Build scankey for every non-expression attribute in the index. */
906 akapila@postgresql.o 71 [ + + ]: 144235 : for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
72 : 72129 : index_attoff++)
73 : : {
74 : : Oid operator;
75 : : Oid optype;
76 : : Oid opfamily;
77 : : RegProcedure regop;
78 : 72129 : int table_attno = indkey->values[index_attoff];
79 : : StrategyNumber eq_strategy;
80 : :
81 [ + + ]: 72129 : if (!AttributeNumberIsValid(table_attno))
82 : : {
83 : : /*
84 : : * XXX: Currently, we don't support expressions in the scan key,
85 : : * see code below.
86 : : */
87 : 2 : continue;
88 : : }
89 : :
90 : : /*
91 : : * Load the operator info. We need this to get the equality operator
92 : : * function for the scan key.
93 : : */
94 : 72127 : optype = get_opclass_input_type(opclass->values[index_attoff]);
95 : 72127 : opfamily = get_opclass_family(opclass->values[index_attoff]);
197 peter@eisentraut.org 96 : 72127 : eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
3152 peter_e@gmx.net 97 : 72127 : operator = get_opfamily_member(opfamily, optype,
98 : : optype,
99 : : eq_strategy);
100 : :
101 [ - + ]: 72127 : if (!OidIsValid(operator))
2966 tgl@sss.pgh.pa.us 102 [ # # ]:UBC 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
103 : : eq_strategy, optype, optype, opfamily);
104 : :
3152 peter_e@gmx.net 105 :CBC 72127 : regop = get_opcode(operator);
106 : :
107 : : /* Initialize the scankey. */
906 akapila@postgresql.o 108 : 72127 : ScanKeyInit(&skey[skey_attoff],
109 : 72127 : index_attoff + 1,
110 : : eq_strategy,
111 : : regop,
112 : 72127 : searchslot->tts_values[table_attno - 1]);
113 : :
114 : 72127 : skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
115 : :
116 : : /* Check for null value. */
117 [ + + ]: 72127 : if (searchslot->tts_isnull[table_attno - 1])
118 : 1 : skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
119 : :
120 : 72127 : skey_attoff++;
121 : : }
122 : :
123 : : /* There must always be at least one attribute for the index scan. */
124 [ - + ]: 72106 : Assert(skey_attoff > 0);
125 : :
126 : 72106 : return skey_attoff;
127 : : }
128 : :
129 : :
130 : : /*
131 : : * Helper function to check if it is necessary to re-fetch and lock the tuple
132 : : * due to concurrent modifications. This function should be called after
133 : : * invoking table_tuple_lock.
134 : : */
135 : : static bool
382 136 : 72279 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
137 : : {
138 : 72279 : bool refetch = false;
139 : :
140 [ + - - - : 72279 : switch (res)
- ]
141 : : {
142 : 72279 : case TM_Ok:
143 : 72279 : break;
382 akapila@postgresql.o 144 :UBC 0 : case TM_Updated:
145 : : /* XXX: Improve handling here */
146 [ # # ]: 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
147 [ # # ]: 0 : ereport(LOG,
148 : : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
149 : : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
150 : : else
151 [ # # ]: 0 : ereport(LOG,
152 : : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
153 : : errmsg("concurrent update, retrying")));
154 : 0 : refetch = true;
155 : 0 : break;
156 : 0 : case TM_Deleted:
157 : : /* XXX: Improve handling here */
158 [ # # ]: 0 : ereport(LOG,
159 : : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
160 : : errmsg("concurrent delete, retrying")));
161 : 0 : refetch = true;
162 : 0 : break;
163 : 0 : case TM_Invisible:
164 [ # # ]: 0 : elog(ERROR, "attempted to lock invisible tuple");
165 : : break;
166 : 0 : default:
167 [ # # ]: 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
168 : : break;
169 : : }
170 : :
382 akapila@postgresql.o 171 :CBC 72279 : return refetch;
172 : : }
173 : :
174 : : /*
175 : : * Search the relation 'rel' for tuple using the index.
176 : : *
177 : : * If a matching tuple is found, lock it with lockmode, fill the slot with its
178 : : * contents, and return true. Return false otherwise.
179 : : */
180 : : bool
3152 peter_e@gmx.net 181 : 72105 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
182 : : LockTupleMode lockmode,
183 : : TupleTableSlot *searchslot,
184 : : TupleTableSlot *outslot)
185 : : {
186 : : ScanKeyData skey[INDEX_MAX_KEYS];
187 : : int skey_attoff;
188 : : IndexScanDesc scan;
189 : : SnapshotData snap;
190 : : TransactionId xwait;
191 : : Relation idxrel;
192 : : bool found;
906 akapila@postgresql.o 193 : 72105 : TypeCacheEntry **eq = NULL;
194 : : bool isIdxSafeToSkipDuplicates;
195 : :
196 : : /* Open the index. */
3152 peter_e@gmx.net 197 : 72105 : idxrel = index_open(idxoid, RowExclusiveLock);
198 : :
906 akapila@postgresql.o 199 : 72105 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
200 : :
3152 peter_e@gmx.net 201 : 72105 : InitDirtySnapshot(snap);
202 : :
203 : : /* Build scan key. */
906 akapila@postgresql.o 204 : 72105 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
205 : :
206 : : /* Start an index scan. */
179 pg@bowt.ie 207 : 72105 : scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
208 : :
3152 peter_e@gmx.net 209 :UBC 0 : retry:
3152 peter_e@gmx.net 210 :CBC 72105 : found = false;
211 : :
906 akapila@postgresql.o 212 : 72105 : index_rescan(scan, skey, skey_attoff, NULL, 0);
213 : :
214 : : /* Try to find the tuple */
215 [ + + ]: 72105 : while (index_getnext_slot(scan, ForwardScanDirection, outslot))
216 : : {
217 : : /*
218 : : * Avoid expensive equality check if the index is primary key or
219 : : * replica identity index.
220 : : */
221 [ + + ]: 72088 : if (!isIdxSafeToSkipDuplicates)
222 : : {
223 [ + - ]: 17 : if (eq == NULL)
224 : 17 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
225 : :
33 akapila@postgresql.o 226 [ - + ]:GNC 17 : if (!tuples_equal(outslot, searchslot, eq, NULL))
906 akapila@postgresql.o 227 :UBC 0 : continue;
228 : : }
229 : :
3152 peter_e@gmx.net 230 :CBC 72088 : ExecMaterializeSlot(outslot);
231 : :
232 : 144176 : xwait = TransactionIdIsValid(snap.xmin) ?
233 [ - + ]: 72088 : snap.xmin : snap.xmax;
234 : :
235 : : /*
236 : : * If the tuple is locked, wait for locking transaction to finish and
237 : : * retry.
238 : : */
239 [ - + ]: 72088 : if (TransactionIdIsValid(xwait))
240 : : {
3152 peter_e@gmx.net 241 :UBC 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
242 : 0 : goto retry;
243 : : }
244 : :
245 : : /* Found our tuple and it's not locked */
906 akapila@postgresql.o 246 :CBC 72088 : found = true;
247 : 72088 : break;
248 : : }
249 : :
250 : : /* Found tuple, try to lock it in the lockmode. */
3152 peter_e@gmx.net 251 [ + + ]: 72105 : if (found)
252 : : {
253 : : TM_FailureData tmfd;
254 : : TM_Result res;
255 : :
256 : 72088 : PushActiveSnapshot(GetLatestSnapshot());
257 : :
180 heikki.linnakangas@i 258 : 72088 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
259 : : outslot,
260 : : GetCurrentCommandId(false),
261 : : lockmode,
262 : : LockWaitBlock,
263 : : 0 /* don't follow updates */ ,
264 : : &tmfd);
265 : :
3152 peter_e@gmx.net 266 : 72088 : PopActiveSnapshot();
267 : :
382 akapila@postgresql.o 268 [ - + ]: 72088 : if (should_refetch_tuple(res, &tmfd))
382 akapila@postgresql.o 269 :UBC 0 : goto retry;
270 : : }
271 : :
3152 peter_e@gmx.net 272 :CBC 72105 : index_endscan(scan);
273 : :
274 : : /* Don't release lock until commit. */
275 : 72105 : index_close(idxrel, NoLock);
276 : :
277 : 72105 : return found;
278 : : }
279 : :
280 : : /*
281 : : * Compare the tuples in the slots by checking if they have equal values.
282 : : *
283 : : * If 'columns' is not null, only the columns specified within it will be
284 : : * considered for the equality check, ignoring all other columns.
285 : : */
286 : : static bool
1974 noah@leadboat.com 287 : 105324 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
288 : : TypeCacheEntry **eq, Bitmapset *columns)
289 : : {
290 : : int attrnum;
291 : :
2371 andres@anarazel.de 292 [ - + ]: 105324 : Assert(slot1->tts_tupleDescriptor->natts ==
293 : : slot2->tts_tupleDescriptor->natts);
294 : :
295 : 105324 : slot_getallattrs(slot1);
296 : 105324 : slot_getallattrs(slot2);
297 : :
298 : : /* Check equality of the attributes. */
299 [ + + ]: 105526 : for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
300 : : {
301 : : Form_pg_attribute att;
302 : : TypeCacheEntry *typentry;
303 : :
900 akapila@postgresql.o 304 : 105361 : att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
305 : :
306 : : /*
307 : : * Ignore dropped and generated columns as the publisher doesn't send
308 : : * those
309 : : */
898 310 [ + + - + ]: 105361 : if (att->attisdropped || att->attgenerated)
900 311 : 1 : continue;
312 : :
313 : : /*
314 : : * Ignore columns that are not listed for checking.
315 : : */
33 akapila@postgresql.o 316 [ - + ]:GNC 105360 : if (columns &&
33 akapila@postgresql.o 317 [ # # ]:UNC 0 : !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
318 : : columns))
319 : 0 : continue;
320 : :
321 : : /*
322 : : * If one value is NULL and other is not, then they are certainly not
323 : : * equal
324 : : */
2371 andres@anarazel.de 325 [ - + ]:CBC 105360 : if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
3152 peter_e@gmx.net 326 :UBC 0 : return false;
327 : :
328 : : /*
329 : : * If both are NULL, they can be considered equal.
330 : : */
2371 andres@anarazel.de 331 [ + + - + ]:CBC 105360 : if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
3152 peter_e@gmx.net 332 : 1 : continue;
333 : :
1974 noah@leadboat.com 334 : 105359 : typentry = eq[attrnum];
335 [ + + ]: 105359 : if (typentry == NULL)
336 : : {
337 : 202 : typentry = lookup_type_cache(att->atttypid,
338 : : TYPECACHE_EQ_OPR_FINFO);
339 [ - + ]: 202 : if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
1974 noah@leadboat.com 340 [ # # ]:UBC 0 : ereport(ERROR,
341 : : (errcode(ERRCODE_UNDEFINED_FUNCTION),
342 : : errmsg("could not identify an equality operator for type %s",
343 : : format_type_be(att->atttypid))));
1974 noah@leadboat.com 344 :CBC 202 : eq[attrnum] = typentry;
345 : : }
346 : :
2360 peter@eisentraut.org 347 [ + + ]: 105359 : if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
348 : : att->attcollation,
2299 tgl@sss.pgh.pa.us 349 : 105359 : slot1->tts_values[attrnum],
350 : 105359 : slot2->tts_values[attrnum])))
3152 peter_e@gmx.net 351 : 105159 : return false;
352 : : }
353 : :
354 : 165 : return true;
355 : : }
356 : :
357 : : /*
358 : : * Search the relation 'rel' for tuple using the sequential scan.
359 : : *
360 : : * If a matching tuple is found, lock it with lockmode, fill the slot with its
361 : : * contents, and return true. Return false otherwise.
362 : : *
363 : : * Note that this stops on the first matching tuple.
364 : : *
365 : : * This can obviously be quite slow on tables that have more than few rows.
366 : : */
367 : : bool
368 : 150 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
369 : : TupleTableSlot *searchslot, TupleTableSlot *outslot)
370 : : {
371 : : TupleTableSlot *scanslot;
372 : : TableScanDesc scan;
373 : : SnapshotData snap;
374 : : TypeCacheEntry **eq;
375 : : TransactionId xwait;
376 : : bool found;
2371 andres@anarazel.de 377 : 150 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
378 : :
3152 peter_e@gmx.net 379 [ - + ]: 150 : Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
380 : :
1974 noah@leadboat.com 381 : 150 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
382 : :
383 : : /* Start a heap scan. */
3152 peter_e@gmx.net 384 : 150 : InitDirtySnapshot(snap);
2371 andres@anarazel.de 385 : 150 : scan = table_beginscan(rel, &snap, 0, NULL);
386 : 150 : scanslot = table_slot_create(rel, NULL);
387 : :
3152 peter_e@gmx.net 388 :UBC 0 : retry:
3152 peter_e@gmx.net 389 :CBC 150 : found = false;
390 : :
2371 andres@anarazel.de 391 : 150 : table_rescan(scan, NULL);
392 : :
393 : : /* Try to find the tuple */
394 [ + + ]: 105309 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
395 : : {
33 akapila@postgresql.o 396 [ + + ]:GNC 105306 : if (!tuples_equal(scanslot, searchslot, eq, NULL))
3152 peter_e@gmx.net 397 :CBC 105159 : continue;
398 : :
399 : 147 : found = true;
2371 andres@anarazel.de 400 : 147 : ExecCopySlot(outslot, scanslot);
401 : :
3152 peter_e@gmx.net 402 : 294 : xwait = TransactionIdIsValid(snap.xmin) ?
403 [ - + ]: 147 : snap.xmin : snap.xmax;
404 : :
405 : : /*
406 : : * If the tuple is locked, wait for locking transaction to finish and
407 : : * retry.
408 : : */
409 [ - + ]: 147 : if (TransactionIdIsValid(xwait))
410 : : {
3152 peter_e@gmx.net 411 :UBC 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
412 : 0 : goto retry;
413 : : }
414 : :
415 : : /* Found our tuple and it's not locked */
2042 alvherre@alvh.no-ip. 416 :CBC 147 : break;
417 : : }
418 : :
419 : : /* Found tuple, try to lock it in the lockmode. */
3152 peter_e@gmx.net 420 [ + + ]: 150 : if (found)
421 : : {
422 : : TM_FailureData tmfd;
423 : : TM_Result res;
424 : :
425 : 147 : PushActiveSnapshot(GetLatestSnapshot());
426 : :
180 heikki.linnakangas@i 427 : 147 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
428 : : outslot,
429 : : GetCurrentCommandId(false),
430 : : lockmode,
431 : : LockWaitBlock,
432 : : 0 /* don't follow updates */ ,
433 : : &tmfd);
434 : :
3152 peter_e@gmx.net 435 : 147 : PopActiveSnapshot();
436 : :
382 akapila@postgresql.o 437 [ - + ]: 147 : if (should_refetch_tuple(res, &tmfd))
382 akapila@postgresql.o 438 :UBC 0 : goto retry;
439 : : }
440 : :
2371 andres@anarazel.de 441 :CBC 150 : table_endscan(scan);
442 : 150 : ExecDropSingleTupleTableSlot(scanslot);
443 : :
3152 peter_e@gmx.net 444 : 150 : return found;
445 : : }
446 : :
447 : : /*
448 : : * Build additional index information necessary for conflict detection.
449 : : */
450 : : static void
151 akapila@postgresql.o 451 : 46 : BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
452 : : {
453 [ + + ]: 138 : for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
454 : : {
455 : 92 : Relation indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
456 : 92 : IndexInfo *indexRelationInfo = resultRelInfo->ri_IndexRelationInfo[i];
457 : :
458 [ + + ]: 92 : if (conflictindex != RelationGetRelid(indexRelation))
459 : 46 : continue;
460 : :
461 : : /*
462 : : * This Assert will fail if BuildSpeculativeIndexInfo() is called
463 : : * twice for the given index.
464 : : */
465 [ - + ]: 46 : Assert(indexRelationInfo->ii_UniqueOps == NULL);
466 : :
467 : 46 : BuildSpeculativeIndexInfo(indexRelation, indexRelationInfo);
468 : : }
469 : 46 : }
470 : :
471 : : /*
472 : : * If the tuple is recently dead and was deleted by a transaction with a newer
473 : : * commit timestamp than previously recorded, update the associated transaction
474 : : * ID, commit time, and origin. This helps ensure that conflict detection uses
475 : : * the most recent and relevant deletion metadata.
476 : : */
477 : : static void
33 akapila@postgresql.o 478 :GNC 2 : update_most_recent_deletion_info(TupleTableSlot *scanslot,
479 : : TransactionId oldestxmin,
480 : : TransactionId *delete_xid,
481 : : TimestampTz *delete_time,
482 : : RepOriginId *delete_origin)
483 : : {
484 : : BufferHeapTupleTableSlot *hslot;
485 : : HeapTuple tuple;
486 : : Buffer buf;
487 : 2 : bool recently_dead = false;
488 : : TransactionId xmax;
489 : : TimestampTz localts;
490 : : RepOriginId localorigin;
491 : :
492 : 2 : hslot = (BufferHeapTupleTableSlot *) scanslot;
493 : :
494 : 2 : tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
495 : 2 : buf = hslot->buffer;
496 : :
497 : 2 : LockBuffer(buf, BUFFER_LOCK_SHARE);
498 : :
499 : : /*
500 : : * We do not consider HEAPTUPLE_DEAD status because it indicates either
501 : : * tuples whose inserting transaction was aborted (meaning there is no
502 : : * commit timestamp or origin), or tuples deleted by a transaction older
503 : : * than oldestxmin, making it safe to ignore them during conflict
504 : : * detection (See comments atop worker.c for details).
505 : : */
506 [ + - ]: 2 : if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
507 : 2 : recently_dead = true;
508 : :
509 : 2 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
510 : :
511 [ - + ]: 2 : if (!recently_dead)
33 akapila@postgresql.o 512 :UNC 0 : return;
513 : :
33 akapila@postgresql.o 514 :GNC 2 : xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
515 [ - + ]: 2 : if (!TransactionIdIsValid(xmax))
33 akapila@postgresql.o 516 :UNC 0 : return;
517 : :
518 : : /* Select the dead tuple with the most recent commit timestamp */
33 akapila@postgresql.o 519 [ + - + - ]:GNC 4 : if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
520 : 2 : TimestampDifferenceExceeds(*delete_time, localts, 0))
521 : : {
522 : 2 : *delete_xid = xmax;
523 : 2 : *delete_time = localts;
524 : 2 : *delete_origin = localorigin;
525 : : }
526 : : }
527 : :
528 : : /*
529 : : * Searches the relation 'rel' for the most recently deleted tuple that matches
530 : : * the values in 'searchslot' and is not yet removable by VACUUM. The function
531 : : * returns the transaction ID, origin, and commit timestamp of the transaction
532 : : * that deleted this tuple.
533 : : *
534 : : * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
535 : : * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
536 : : * conflict detection.
537 : : *
538 : : * Instead of stopping at the first match, we scan all matching dead tuples to
539 : : * identify most recent deletion. This is crucial because only the latest
540 : : * deletion is relevant for resolving conflicts.
541 : : *
542 : : * For example, consider a scenario on the subscriber where a row is deleted,
543 : : * re-inserted, and then deleted again only on the subscriber:
544 : : *
545 : : * - (pk, 1) - deleted at 9:00,
546 : : * - (pk, 1) - deleted at 9:02,
547 : : *
548 : : * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
549 : : *
550 : : * If we mistakenly return the older deletion (9:00), the system may wrongly
551 : : * apply the remote update using a last-update-wins strategy. Instead, we must
552 : : * recognize the more recent deletion at 9:02 and skip the update. See
553 : : * comments atop worker.c for details. Note, as of now, conflict resolution
554 : : * is not implemented. Consequently, the system may incorrectly report the
555 : : * older tuple as the conflicted one, leading to misleading results.
556 : : *
557 : : * The commit timestamp of the deleting transaction is used to determine which
558 : : * tuple was deleted most recently.
559 : : */
560 : : bool
561 : 1 : RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
562 : : TransactionId oldestxmin,
563 : : TransactionId *delete_xid,
564 : : RepOriginId *delete_origin,
565 : : TimestampTz *delete_time)
566 : : {
567 : : TupleTableSlot *scanslot;
568 : : TableScanDesc scan;
569 : : TypeCacheEntry **eq;
570 : : Bitmapset *indexbitmap;
571 : 1 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
572 : :
573 [ - + ]: 1 : Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
574 : :
575 : 1 : *delete_xid = InvalidTransactionId;
576 : 1 : *delete_origin = InvalidRepOriginId;
577 : 1 : *delete_time = 0;
578 : :
579 : : /*
580 : : * If the relation has a replica identity key or a primary key that is
581 : : * unusable for locating deleted tuples (see
582 : : * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
583 : : * necessary. In such cases, comparing the entire tuple is not required,
584 : : * since the remote tuple might not include all column values. Instead,
585 : : * the indexed columns alone are sufficient to identify the target tuple
586 : : * (see logicalrep_rel_mark_updatable).
587 : : */
588 : 1 : indexbitmap = RelationGetIndexAttrBitmap(rel,
589 : : INDEX_ATTR_BITMAP_IDENTITY_KEY);
590 : :
591 : : /* fallback to PK if no replica identity */
592 [ + - ]: 1 : if (!indexbitmap)
593 : 1 : indexbitmap = RelationGetIndexAttrBitmap(rel,
594 : : INDEX_ATTR_BITMAP_PRIMARY_KEY);
595 : :
596 : 1 : eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts);
597 : :
598 : : /*
599 : : * Start a heap scan using SnapshotAny to identify dead tuples that are
600 : : * not visible under a standard MVCC snapshot. Tuples from transactions
601 : : * not yet committed or those just committed prior to the scan are
602 : : * excluded in update_most_recent_deletion_info().
603 : : */
604 : 1 : scan = table_beginscan(rel, SnapshotAny, 0, NULL);
605 : 1 : scanslot = table_slot_create(rel, NULL);
606 : :
607 : 1 : table_rescan(scan, NULL);
608 : :
609 : : /* Try to find the tuple */
610 [ + + ]: 2 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
611 : : {
612 [ - + ]: 1 : if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
33 akapila@postgresql.o 613 :UNC 0 : continue;
614 : :
33 akapila@postgresql.o 615 :GNC 1 : update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
616 : : delete_time, delete_origin);
617 : : }
618 : :
619 : 1 : table_endscan(scan);
620 : 1 : ExecDropSingleTupleTableSlot(scanslot);
621 : :
622 : 1 : return *delete_time != 0;
623 : : }
624 : :
625 : : /*
626 : : * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
627 : : * the deleted tuple.
628 : : */
629 : : bool
630 : 1 : RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
631 : : TupleTableSlot *searchslot,
632 : : TransactionId oldestxmin,
633 : : TransactionId *delete_xid,
634 : : RepOriginId *delete_origin,
635 : : TimestampTz *delete_time)
636 : : {
637 : : Relation idxrel;
638 : : ScanKeyData skey[INDEX_MAX_KEYS];
639 : : int skey_attoff;
640 : : IndexScanDesc scan;
641 : : TupleTableSlot *scanslot;
642 : 1 : TypeCacheEntry **eq = NULL;
643 : : bool isIdxSafeToSkipDuplicates;
644 : 1 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
645 : :
646 [ - + ]: 1 : Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
647 [ - + ]: 1 : Assert(OidIsValid(idxoid));
648 : :
649 : 1 : *delete_xid = InvalidTransactionId;
650 : 1 : *delete_time = 0;
651 : 1 : *delete_origin = InvalidRepOriginId;
652 : :
653 : 1 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
654 : :
655 : 1 : scanslot = table_slot_create(rel, NULL);
656 : :
657 : 1 : idxrel = index_open(idxoid, RowExclusiveLock);
658 : :
659 : : /* Build scan key. */
660 : 1 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
661 : :
662 : : /*
663 : : * Start an index scan using SnapshotAny to identify dead tuples that are
664 : : * not visible under a standard MVCC snapshot. Tuples from transactions
665 : : * not yet committed or those just committed prior to the scan are
666 : : * excluded in update_most_recent_deletion_info().
667 : : */
668 : 1 : scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
669 : :
670 : 1 : index_rescan(scan, skey, skey_attoff, NULL, 0);
671 : :
672 : : /* Try to find the tuple */
673 [ + + ]: 2 : while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
674 : : {
675 : : /*
676 : : * Avoid expensive equality check if the index is primary key or
677 : : * replica identity index.
678 : : */
679 [ - + ]: 1 : if (!isIdxSafeToSkipDuplicates)
680 : : {
33 akapila@postgresql.o 681 [ # # ]:UNC 0 : if (eq == NULL)
682 : 0 : eq = palloc0(sizeof(*eq) * scanslot->tts_tupleDescriptor->natts);
683 : :
684 [ # # ]: 0 : if (!tuples_equal(scanslot, searchslot, eq, NULL))
685 : 0 : continue;
686 : : }
687 : :
33 akapila@postgresql.o 688 :GNC 1 : update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
689 : : delete_time, delete_origin);
690 : : }
691 : :
692 : 1 : index_endscan(scan);
693 : :
694 : 1 : index_close(idxrel, NoLock);
695 : :
696 : 1 : ExecDropSingleTupleTableSlot(scanslot);
697 : :
698 : 1 : return *delete_time != 0;
699 : : }
700 : :
701 : : /*
702 : : * Find the tuple that violates the passed unique index (conflictindex).
703 : : *
704 : : * If the conflicting tuple is found return true, otherwise false.
705 : : *
706 : : * We lock the tuple to avoid getting it deleted before the caller can fetch
707 : : * the required information. Note that if the tuple is deleted before a lock
708 : : * is acquired, we will retry to find the conflicting tuple again.
709 : : */
710 : : static bool
382 akapila@postgresql.o 711 :CBC 46 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
712 : : Oid conflictindex, TupleTableSlot *slot,
713 : : TupleTableSlot **conflictslot)
714 : : {
715 : 46 : Relation rel = resultRelInfo->ri_RelationDesc;
716 : : ItemPointerData conflictTid;
717 : : TM_FailureData tmfd;
718 : : TM_Result res;
719 : :
720 : 46 : *conflictslot = NULL;
721 : :
722 : : /*
723 : : * Build additional information required to check constraints violations.
724 : : * See check_exclusion_or_unique_constraint().
725 : : */
151 726 : 46 : BuildConflictIndexInfo(resultRelInfo, conflictindex);
727 : :
382 728 : 46 : retry:
729 [ + + ]: 46 : if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
730 : : &conflictTid, &slot->tts_tid,
731 : 46 : list_make1_oid(conflictindex)))
732 : : {
733 [ - + ]: 1 : if (*conflictslot)
382 akapila@postgresql.o 734 :UBC 0 : ExecDropSingleTupleTableSlot(*conflictslot);
735 : :
382 akapila@postgresql.o 736 :CBC 1 : *conflictslot = NULL;
737 : 1 : return false;
738 : : }
739 : :
740 : 44 : *conflictslot = table_slot_create(rel, NULL);
741 : :
742 : 44 : PushActiveSnapshot(GetLatestSnapshot());
743 : :
180 heikki.linnakangas@i 744 : 44 : res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
745 : : *conflictslot,
746 : : GetCurrentCommandId(false),
747 : : LockTupleShare,
748 : : LockWaitBlock,
749 : : 0 /* don't follow updates */ ,
750 : : &tmfd);
751 : :
382 akapila@postgresql.o 752 : 44 : PopActiveSnapshot();
753 : :
754 [ - + ]: 44 : if (should_refetch_tuple(res, &tmfd))
382 akapila@postgresql.o 755 :UBC 0 : goto retry;
756 : :
382 akapila@postgresql.o 757 :CBC 44 : return true;
758 : : }
759 : :
760 : : /*
761 : : * Check all the unique indexes in 'recheckIndexes' for conflict with the
762 : : * tuple in 'remoteslot' and report if found.
763 : : */
764 : : static void
765 : 26 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
766 : : ConflictType type, List *recheckIndexes,
767 : : TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
768 : : {
166 769 : 26 : List *conflicttuples = NIL;
770 : : TupleTableSlot *conflictslot;
771 : :
772 : : /* Check all the unique indexes for conflicts */
382 773 [ + - + + : 96 : foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
+ + ]
774 : : {
775 [ + - + + ]: 91 : if (list_member_oid(recheckIndexes, uniqueidx) &&
776 : 46 : FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
777 : : &conflictslot))
778 : : {
166 779 : 44 : ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
780 : :
781 : 44 : conflicttuple->slot = conflictslot;
782 : 44 : conflicttuple->indexoid = uniqueidx;
783 : :
784 : 44 : GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
785 : : &conflicttuple->origin, &conflicttuple->ts);
786 : :
787 : 44 : conflicttuples = lappend(conflicttuples, conflicttuple);
788 : : }
789 : : }
790 : :
791 : : /* Report the conflict, if found */
792 [ + + ]: 25 : if (conflicttuples)
793 [ + + ]: 24 : ReportApplyConflict(estate, resultRelInfo, ERROR,
794 : 24 : list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
795 : : searchslot, remoteslot, conflicttuples);
382 796 : 1 : }
797 : :
798 : : /*
799 : : * Insert tuple represented in the slot to the relation, update the indexes,
800 : : * and execute any constraints and per-row triggers.
801 : : *
802 : : * Caller is responsible for opening the indexes.
803 : : */
804 : : void
1788 heikki.linnakangas@i 805 : 75822 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
806 : : EState *estate, TupleTableSlot *slot)
807 : : {
3034 bruce@momjian.us 808 : 75822 : bool skip_tuple = false;
809 : 75822 : Relation rel = resultRelInfo->ri_RelationDesc;
810 : :
811 : : /* For now we support only tables. */
3152 peter_e@gmx.net 812 [ - + ]: 75822 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
813 : :
814 : 75822 : CheckCmdReplicaIdentity(rel, CMD_INSERT);
815 : :
816 : : /* BEFORE ROW INSERT Triggers */
817 [ + + ]: 75822 : if (resultRelInfo->ri_TrigDesc &&
818 [ + + ]: 20 : resultRelInfo->ri_TrigDesc->trig_insert_before_row)
819 : : {
2384 andres@anarazel.de 820 [ + + ]: 3 : if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
2299 tgl@sss.pgh.pa.us 821 : 1 : skip_tuple = true; /* "do nothing" */
822 : : }
823 : :
3152 peter_e@gmx.net 824 [ + + ]: 75822 : if (!skip_tuple)
825 : : {
826 : 75821 : List *recheckIndexes = NIL;
827 : : List *conflictindexes;
382 akapila@postgresql.o 828 : 75821 : bool conflict = false;
829 : :
830 : : /* Compute stored generated columns */
2352 peter@eisentraut.org 831 [ + + ]: 75821 : if (rel->rd_att->constr &&
832 [ + + ]: 45472 : rel->rd_att->constr->has_generated_stored)
1788 heikki.linnakangas@i 833 : 4 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
834 : : CMD_INSERT);
835 : :
836 : : /* Check the constraints of the tuple */
3152 peter_e@gmx.net 837 [ + + ]: 75821 : if (rel->rd_att->constr)
2644 alvherre@alvh.no-ip. 838 : 45472 : ExecConstraints(resultRelInfo, slot, estate);
1816 tgl@sss.pgh.pa.us 839 [ + + ]: 75821 : if (rel->rd_rel->relispartition)
2644 alvherre@alvh.no-ip. 840 : 74 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
841 : :
842 : : /* OK, store the tuple and create index entries for it */
513 akorotkov@postgresql 843 : 75821 : simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
844 : :
382 akapila@postgresql.o 845 : 75821 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
846 : :
513 akorotkov@postgresql 847 [ + + ]: 75821 : if (resultRelInfo->ri_NumIndices > 0)
1788 heikki.linnakangas@i 848 : 55481 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
849 : : slot, estate, false,
850 : : conflictindexes ? true : false,
851 : : &conflict,
852 : : conflictindexes, false);
853 : :
854 : : /*
855 : : * Checks the conflict indexes to fetch the conflicting local row and
856 : : * reports the conflict. We perform this check here, instead of
857 : : * performing an additional index scan before the actual insertion and
858 : : * reporting the conflict if any conflicting rows are found. This is
859 : : * to avoid the overhead of executing the extra scan for each INSERT
860 : : * operation, even when no conflict arises, which could introduce
861 : : * significant overhead to replication, particularly in cases where
862 : : * conflicts are rare.
863 : : *
864 : : * XXX OTOH, this could lead to clean-up effort for dead tuples added
865 : : * in heap and index in case of conflicts. But as conflicts shouldn't
866 : : * be a frequent thing so we preferred to save the performance
867 : : * overhead of extra scan before each insertion.
868 : : */
382 akapila@postgresql.o 869 [ + + ]: 75821 : if (conflict)
870 : 24 : CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
871 : : recheckIndexes, NULL, slot);
872 : :
873 : : /* AFTER ROW INSERT Triggers */
2384 andres@anarazel.de 874 : 75798 : ExecARInsertTriggers(estate, resultRelInfo, slot,
875 : : recheckIndexes, NULL);
876 : :
877 : : /*
878 : : * XXX we should in theory pass a TransitionCaptureState object to the
879 : : * above to capture transition tuples, but after statement triggers
880 : : * don't actually get fired by replication yet anyway
881 : : */
882 : :
3152 peter_e@gmx.net 883 : 75798 : list_free(recheckIndexes);
884 : : }
885 : 75799 : }
886 : :
887 : : /*
888 : : * Find the searchslot tuple and update it with data in the slot,
889 : : * update the indexes, and execute any constraints and per-row triggers.
890 : : *
891 : : * Caller is responsible for opening the indexes.
892 : : */
893 : : void
1788 heikki.linnakangas@i 894 : 31923 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
895 : : EState *estate, EPQState *epqstate,
896 : : TupleTableSlot *searchslot, TupleTableSlot *slot)
897 : : {
3034 bruce@momjian.us 898 : 31923 : bool skip_tuple = false;
899 : 31923 : Relation rel = resultRelInfo->ri_RelationDesc;
2359 andres@anarazel.de 900 : 31923 : ItemPointer tid = &(searchslot->tts_tid);
901 : :
902 : : /*
903 : : * We support only non-system tables, with
904 : : * check_publication_add_relation() accountable.
905 : : */
3152 peter_e@gmx.net 906 [ - + ]: 31923 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
347 noah@leadboat.com 907 [ - + ]: 31923 : Assert(!IsCatalogRelation(rel));
908 : :
3152 peter_e@gmx.net 909 : 31923 : CheckCmdReplicaIdentity(rel, CMD_UPDATE);
910 : :
911 : : /* BEFORE ROW UPDATE Triggers */
912 [ + + ]: 31923 : if (resultRelInfo->ri_TrigDesc &&
913 [ + + ]: 10 : resultRelInfo->ri_TrigDesc->trig_update_before_row)
914 : : {
2384 andres@anarazel.de 915 [ + + ]: 3 : if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
916 : : tid, NULL, slot, NULL, NULL, false))
2299 tgl@sss.pgh.pa.us 917 : 2 : skip_tuple = true; /* "do nothing" */
918 : : }
919 : :
3152 peter_e@gmx.net 920 [ + + ]: 31923 : if (!skip_tuple)
921 : : {
922 : 31921 : List *recheckIndexes = NIL;
923 : : TU_UpdateIndexes update_indexes;
924 : : List *conflictindexes;
382 akapila@postgresql.o 925 : 31921 : bool conflict = false;
926 : :
927 : : /* Compute stored generated columns */
2352 peter@eisentraut.org 928 [ + + ]: 31921 : if (rel->rd_att->constr &&
929 [ + + ]: 31876 : rel->rd_att->constr->has_generated_stored)
1788 heikki.linnakangas@i 930 : 2 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
931 : : CMD_UPDATE);
932 : :
933 : : /* Check the constraints of the tuple */
3152 peter_e@gmx.net 934 [ + + ]: 31921 : if (rel->rd_att->constr)
2644 alvherre@alvh.no-ip. 935 : 31876 : ExecConstraints(resultRelInfo, slot, estate);
1816 tgl@sss.pgh.pa.us 936 [ + + ]: 31921 : if (rel->rd_rel->relispartition)
2644 alvherre@alvh.no-ip. 937 : 12 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
938 : :
2298 andres@anarazel.de 939 : 31921 : simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
940 : : &update_indexes);
941 : :
382 akapila@postgresql.o 942 : 31921 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
943 : :
901 tomas.vondra@postgre 944 [ + + + + ]: 31921 : if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
1788 heikki.linnakangas@i 945 : 20222 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
946 : : slot, estate, true,
947 : : conflictindexes ? true : false,
948 : : &conflict, conflictindexes,
949 : : (update_indexes == TU_Summarizing));
950 : :
951 : : /*
952 : : * Refer to the comments above the call to CheckAndReportConflict() in
953 : : * ExecSimpleRelationInsert to understand why this check is done at
954 : : * this point.
955 : : */
382 akapila@postgresql.o 956 [ + + ]: 31921 : if (conflict)
957 : 2 : CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
958 : : recheckIndexes, searchslot, slot);
959 : :
960 : : /* AFTER ROW UPDATE Triggers */
3152 peter_e@gmx.net 961 : 31919 : ExecARUpdateTriggers(estate, resultRelInfo,
962 : : NULL, NULL,
963 : : tid, NULL, slot,
964 : : recheckIndexes, NULL, false);
965 : :
966 : 31919 : list_free(recheckIndexes);
967 : : }
968 : 31921 : }
969 : :
970 : : /*
971 : : * Find the searchslot tuple and delete it, and execute any constraints
972 : : * and per-row triggers.
973 : : *
974 : : * Caller is responsible for opening the indexes.
975 : : */
976 : : void
1788 heikki.linnakangas@i 977 : 40312 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
978 : : EState *estate, EPQState *epqstate,
979 : : TupleTableSlot *searchslot)
980 : : {
3034 bruce@momjian.us 981 : 40312 : bool skip_tuple = false;
982 : 40312 : Relation rel = resultRelInfo->ri_RelationDesc;
2359 andres@anarazel.de 983 : 40312 : ItemPointer tid = &searchslot->tts_tid;
984 : :
3152 peter_e@gmx.net 985 : 40312 : CheckCmdReplicaIdentity(rel, CMD_DELETE);
986 : :
987 : : /* BEFORE ROW DELETE Triggers */
988 [ + + ]: 40312 : if (resultRelInfo->ri_TrigDesc &&
2886 rhaas@postgresql.org 989 [ - + ]: 10 : resultRelInfo->ri_TrigDesc->trig_delete_before_row)
990 : : {
3152 peter_e@gmx.net 991 :UBC 0 : skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
50 dean.a.rasheed@gmail 992 : 0 : tid, NULL, NULL, NULL, NULL, false);
993 : : }
994 : :
3152 peter_e@gmx.net 995 [ + - ]:CBC 40312 : if (!skip_tuple)
996 : : {
997 : : /* OK, delete the tuple */
513 akorotkov@postgresql 998 : 40312 : simple_table_tuple_delete(rel, tid, estate->es_snapshot);
999 : :
1000 : : /* AFTER ROW DELETE Triggers */
3152 peter_e@gmx.net 1001 : 40312 : ExecARDeleteTriggers(estate, resultRelInfo,
1002 : : tid, NULL, NULL, false);
1003 : : }
1004 : 40312 : }
1005 : :
1006 : : /*
1007 : : * Check if command can be executed with current replica identity.
1008 : : */
1009 : : void
1010 : 209774 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
1011 : : {
1012 : : PublicationDesc pubdesc;
1013 : :
1014 : : /*
1015 : : * Skip checking the replica identity for partitioned tables, because the
1016 : : * operations are actually performed on the leaf partitions.
1017 : : */
1117 akapila@postgresql.o 1018 [ + + ]: 209774 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1019 : 197670 : return;
1020 : :
1021 : : /* We only need to do checks for UPDATE and DELETE. */
3152 peter_e@gmx.net 1022 [ + + + + ]: 207659 : if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
1023 : 119842 : return;
1024 : :
1025 : : /*
1026 : : * It is only safe to execute UPDATE/DELETE if the relation does not
1027 : : * publish UPDATEs or DELETEs, or all the following conditions are
1028 : : * satisfied:
1029 : : *
1030 : : * 1. All columns, referenced in the row filters from publications which
1031 : : * the relation is in, are valid - i.e. when all referenced columns are
1032 : : * part of REPLICA IDENTITY.
1033 : : *
1034 : : * 2. All columns, referenced in the column lists are valid - i.e. when
1035 : : * all columns referenced in the REPLICA IDENTITY are covered by the
1036 : : * column list.
1037 : : *
1038 : : * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
1039 : : * - i.e. when all these generated columns are published.
1040 : : *
1041 : : * XXX We could optimize it by first checking whether any of the
1042 : : * publications have a row filter or column list for this relation, or if
1043 : : * the relation contains a generated column. If none of these exist and
1044 : : * the relation has replica identity then we can avoid building the
1045 : : * descriptor but as this happens only one time it doesn't seem worth the
1046 : : * additional complexity.
1047 : : */
1292 akapila@postgresql.o 1048 : 87817 : RelationBuildPublicationDesc(rel, &pubdesc);
1049 [ + + + + ]: 87817 : if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
1050 [ + - ]: 30 : ereport(ERROR,
1051 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1052 : : errmsg("cannot update table \"%s\"",
1053 : : RelationGetRelationName(rel)),
1054 : : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1260 tomas.vondra@postgre 1055 [ + + + + ]: 87787 : else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
1056 [ + - ]: 54 : ereport(ERROR,
1057 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1058 : : errmsg("cannot update table \"%s\"",
1059 : : RelationGetRelationName(rel)),
1060 : : errdetail("Column list used by the publication does not cover the replica identity.")));
276 akapila@postgresql.o 1061 [ + + + + ]: 87733 : else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
1062 [ + - ]: 12 : ereport(ERROR,
1063 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1064 : : errmsg("cannot update table \"%s\"",
1065 : : RelationGetRelationName(rel)),
1066 : : errdetail("Replica identity must not contain unpublished generated columns.")));
1292 1067 [ + + - + ]: 87721 : else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
1292 akapila@postgresql.o 1068 [ # # ]:UBC 0 : ereport(ERROR,
1069 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1070 : : errmsg("cannot delete from table \"%s\"",
1071 : : RelationGetRelationName(rel)),
1072 : : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1260 tomas.vondra@postgre 1073 [ + + - + ]:CBC 87721 : else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
1260 tomas.vondra@postgre 1074 [ # # ]:UBC 0 : ereport(ERROR,
1075 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1076 : : errmsg("cannot delete from table \"%s\"",
1077 : : RelationGetRelationName(rel)),
1078 : : errdetail("Column list used by the publication does not cover the replica identity.")));
276 akapila@postgresql.o 1079 [ + + - + ]:CBC 87721 : else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
276 akapila@postgresql.o 1080 [ # # ]:UBC 0 : ereport(ERROR,
1081 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1082 : : errmsg("cannot delete from table \"%s\"",
1083 : : RelationGetRelationName(rel)),
1084 : : errdetail("Replica identity must not contain unpublished generated columns.")));
1085 : :
1086 : : /* If relation has replica identity we are always good. */
1292 akapila@postgresql.o 1087 [ + + ]:CBC 87721 : if (OidIsValid(RelationGetReplicaIndex(rel)))
3152 peter_e@gmx.net 1088 : 75485 : return;
1089 : :
1090 : : /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
1260 tomas.vondra@postgre 1091 [ + + ]: 12236 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
1092 : 228 : return;
1093 : :
1094 : : /*
1095 : : * This is UPDATE/DELETE and there is no replica identity.
1096 : : *
1097 : : * Check if the table publishes UPDATES or DELETES.
1098 : : */
1292 akapila@postgresql.o 1099 [ + + + + ]: 12008 : if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
3152 peter_e@gmx.net 1100 [ + - ]: 62 : ereport(ERROR,
1101 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1102 : : errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
1103 : : RelationGetRelationName(rel)),
1104 : : errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
1292 akapila@postgresql.o 1105 [ + + + + ]: 11946 : else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
3152 peter_e@gmx.net 1106 [ + - ]: 8 : ereport(ERROR,
1107 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1108 : : errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
1109 : : RelationGetRelationName(rel)),
1110 : : errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
1111 : : }
1112 : :
1113 : :
1114 : : /*
1115 : : * Check if we support writing into specific relkind.
1116 : : *
1117 : : * The nspname and relname are only needed for error reporting.
1118 : : */
1119 : : void
3035 1120 : 911 : CheckSubscriptionRelkind(char relkind, const char *nspname,
1121 : : const char *relname)
1122 : : {
1248 tomas.vondra@postgre 1123 [ + + - + ]: 911 : if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
3035 peter_e@gmx.net 1124 [ # # ]:UBC 0 : ereport(ERROR,
1125 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1126 : : errmsg("cannot use relation \"%s.%s\" as logical replication target",
1127 : : nspname, relname),
1128 : : errdetail_relkind_not_supported(relkind)));
3035 peter_e@gmx.net 1129 :CBC 911 : }
|