Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * twophase.c
4 : : * Two-phase commit support functions.
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/access/transam/twophase.c
11 : : *
12 : : * NOTES
13 : : * Each global transaction is associated with a global transaction
14 : : * identifier (GID). The client assigns a GID to a postgres
15 : : * transaction with the PREPARE TRANSACTION command.
16 : : *
17 : : * We keep all active global transactions in a shared memory array.
18 : : * When the PREPARE TRANSACTION command is issued, the GID is
19 : : * reserved for the transaction in the array. This is done before
20 : : * a WAL entry is made, because the reservation checks for duplicate
21 : : * GIDs and aborts the transaction if there already is a global
22 : : * transaction in prepared state with the same GID.
23 : : *
24 : : * A global transaction (gxact) also has a dummy PGPROC; this is what keeps
25 : : * the XID considered running by TransactionIdIsInProgress. It is also
26 : : * convenient as a PGPROC to hook the gxact's locks to.
27 : : *
28 : : * Information to recover prepared transactions in case of crash is
29 : : * now stored in WAL for the common case. In some cases there will be
30 : : * an extended period between preparing a GXACT and commit/abort, in
31 : : * which case we need to separately record prepared transaction data
32 : : * in permanent storage. This includes locking information, pending
33 : : * notifications etc. All that state information is written to the
34 : : * per-transaction state file in the pg_twophase directory.
35 : : * All prepared transactions will be written prior to shutdown.
36 : : *
37 : : * The life cycle of the state data is as follows:
38 : : *
39 : : * * On PREPARE TRANSACTION, the backend writes state data only to the WAL
40 : : * and stores a pointer to the start of the WAL record in
41 : : * gxact->prepare_start_lsn.
42 : : * * If COMMIT occurs before a checkpoint, the backend reads the data from
43 : : * WAL using prepare_start_lsn.
44 : : * * On checkpoint, state data is copied to files in the pg_twophase
45 : : * directory and fsynced.
46 : : * * If COMMIT happens after a checkpoint, the backend reads the state data
47 : : * from those files.
48 : : *
49 : : * During replay and replication, TwoPhaseState also holds information
50 : : * about active prepared transactions that haven't been moved to disk yet.
51 : : *
52 : : * Replay of twophase records happens by the following rules:
53 : : *
54 : : * * At the beginning of recovery, pg_twophase is scanned once, filling
55 : : * TwoPhaseState with entries marked with gxact->inredo and
56 : : * gxact->ondisk. Two-phase file data older than the XID horizon of
57 : : * the redo position are discarded.
58 : : * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 : : * gxact->inredo is set to true for such entries.
60 : : * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 : : * that have gxact->inredo set and are behind the redo_horizon. We
62 : : * save them to disk and then switch gxact->ondisk to true.
63 : : * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 : : * If gxact->ondisk is true, the corresponding entry from the disk
65 : : * is additionally deleted.
66 : : * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 : : * and PrescanPreparedTransactions() have been modified to go through
68 : : * gxact->inredo entries that have not made it to disk.
69 : : *
70 : : *-------------------------------------------------------------------------
71 : : */
72 : : #include "postgres.h"
73 : :
74 : : #include <fcntl.h>
75 : : #include <sys/stat.h>
76 : : #include <time.h>
77 : : #include <unistd.h>
78 : :
79 : : #include "access/commit_ts.h"
80 : : #include "access/htup_details.h"
81 : : #include "access/subtrans.h"
82 : : #include "access/transam.h"
83 : : #include "access/twophase.h"
84 : : #include "access/twophase_rmgr.h"
85 : : #include "access/xact.h"
86 : : #include "access/xlog.h"
87 : : #include "access/xloginsert.h"
88 : : #include "access/xlogreader.h"
89 : : #include "access/xlogrecovery.h"
90 : : #include "access/xlogutils.h"
91 : : #include "catalog/pg_type.h"
92 : : #include "catalog/storage.h"
93 : : #include "funcapi.h"
94 : : #include "miscadmin.h"
95 : : #include "pg_trace.h"
96 : : #include "pgstat.h"
97 : : #include "replication/origin.h"
98 : : #include "replication/syncrep.h"
99 : : #include "storage/fd.h"
100 : : #include "storage/ipc.h"
101 : : #include "storage/md.h"
102 : : #include "storage/predicate.h"
103 : : #include "storage/proc.h"
104 : : #include "storage/procarray.h"
105 : : #include "utils/builtins.h"
106 : : #include "utils/injection_point.h"
107 : : #include "utils/memutils.h"
108 : : #include "utils/timestamp.h"
109 : : #include "utils/wait_event.h"
110 : :
111 : : /*
112 : : * Directory where Two-phase commit files reside within PGDATA
113 : : */
114 : : #define TWOPHASE_DIR "pg_twophase"
115 : :
116 : : /* GUC variable, can't be changed after startup */
117 : : int max_prepared_xacts = 0;
118 : :
119 : : /*
120 : : * This struct describes one global transaction that is in prepared state
121 : : * or attempting to become prepared.
122 : : *
123 : : * The lifecycle of a global transaction is:
124 : : *
125 : : * 1. After checking that the requested GID is not in use, set up an entry in
126 : : * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
127 : : * and mark it as locked by my backend.
128 : : *
129 : : * 2. After successfully completing prepare, set valid = true and enter the
130 : : * referenced PGPROC into the global ProcArray.
131 : : *
132 : : * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
133 : : * valid and not locked, then mark the entry as locked by storing my current
134 : : * proc number into locking_backend. This prevents concurrent attempts to
135 : : * commit or rollback the same prepared xact.
136 : : *
137 : : * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
138 : : * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
139 : : * the freelist.
140 : : *
141 : : * Note that if the preparing transaction fails between steps 1 and 2, the
142 : : * entry must be removed so that the GID and the GlobalTransaction struct
143 : : * can be reused. See AtAbort_Twophase().
144 : : *
145 : : * typedef struct GlobalTransactionData *GlobalTransaction appears in
146 : : * twophase.h
147 : : */
148 : :
149 : : typedef struct GlobalTransactionData
150 : : {
151 : : GlobalTransaction next; /* list link for free list */
152 : : int pgprocno; /* ID of associated dummy PGPROC */
153 : : TimestampTz prepared_at; /* time of preparation */
154 : :
155 : : /*
156 : : * Note that we need to keep track of two LSNs for each GXACT. We keep
157 : : * track of the start LSN because this is the address we must use to read
158 : : * state data back from WAL when committing a prepared GXACT. We keep
159 : : * track of the end LSN because that is the LSN we need to wait for prior
160 : : * to commit.
161 : : */
162 : : XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
163 : : XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
164 : : FullTransactionId fxid; /* The GXACT full xid */
165 : :
166 : : Oid owner; /* ID of user that executed the xact */
167 : : ProcNumber locking_backend; /* backend currently working on the xact */
168 : : bool valid; /* true if PGPROC entry is in proc array */
169 : : bool ondisk; /* true if prepare state file is on disk */
170 : : bool inredo; /* true if entry was added via xlog_redo */
171 : : char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
172 : : } GlobalTransactionData;
173 : :
174 : : /*
175 : : * Two Phase Commit shared state. Access to this struct is protected
176 : : * by TwoPhaseStateLock.
177 : : */
178 : : typedef struct TwoPhaseStateData
179 : : {
180 : : /* Head of linked list of free GlobalTransactionData structs */
181 : : GlobalTransaction freeGXacts;
182 : :
183 : : /* Number of valid prepXacts entries. */
184 : : int numPrepXacts;
185 : :
186 : : /* There are max_prepared_xacts items in this array */
187 : : GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
188 : : } TwoPhaseStateData;
189 : :
190 : : static TwoPhaseStateData *TwoPhaseState;
191 : :
192 : : /*
193 : : * Global transaction entry currently locked by us, if any. Note that any
194 : : * access to the entry pointed to by this variable must be protected by
195 : : * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
196 : : * (since it's just local memory).
197 : : */
198 : : static GlobalTransaction MyLockedGxact = NULL;
199 : :
200 : : static bool twophaseExitRegistered = false;
201 : :
202 : : static void PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning);
203 : : static void RecordTransactionCommitPrepared(TransactionId xid,
204 : : int nchildren,
205 : : TransactionId *children,
206 : : int nrels,
207 : : RelFileLocator *rels,
208 : : int nstats,
209 : : xl_xact_stats_item *stats,
210 : : int ninvalmsgs,
211 : : SharedInvalidationMessage *invalmsgs,
212 : : bool initfileinval,
213 : : const char *gid);
214 : : static void RecordTransactionAbortPrepared(TransactionId xid,
215 : : int nchildren,
216 : : TransactionId *children,
217 : : int nrels,
218 : : RelFileLocator *rels,
219 : : int nstats,
220 : : xl_xact_stats_item *stats,
221 : : const char *gid);
222 : : static void ProcessRecords(char *bufptr, FullTransactionId fxid,
223 : : const TwoPhaseCallback callbacks[]);
224 : : static void RemoveGXact(GlobalTransaction gxact);
225 : :
226 : : static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
227 : : static char *ProcessTwoPhaseBuffer(FullTransactionId fxid,
228 : : XLogRecPtr prepare_start_lsn,
229 : : bool fromdisk, bool setParent, bool setNextXid);
230 : : static void MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
231 : : const char *gid, TimestampTz prepared_at, Oid owner,
232 : : Oid databaseid);
233 : : static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
234 : : static void RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len);
235 : :
236 : : /*
237 : : * Initialization of shared memory
238 : : */
239 : : Size
7576 tgl@sss.pgh.pa.us 240 :CBC 3297 : TwoPhaseShmemSize(void)
241 : : {
242 : : Size size;
243 : :
244 : : /* Need the fixed struct, the array of pointers, and the GTD structs */
7512 245 : 3297 : size = offsetof(TwoPhaseStateData, prepXacts);
246 : 3297 : size = add_size(size, mul_size(max_prepared_xacts,
247 : : sizeof(GlobalTransaction)));
248 : 3297 : size = MAXALIGN(size);
249 : 3297 : size = add_size(size, mul_size(max_prepared_xacts,
250 : : sizeof(GlobalTransactionData)));
251 : :
252 : 3297 : return size;
253 : : }
254 : :
255 : : void
7576 256 : 1150 : TwoPhaseShmemInit(void)
257 : : {
258 : : bool found;
259 : :
260 : 1150 : TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
261 : : TwoPhaseShmemSize(),
262 : : &found);
263 [ + - ]: 1150 : if (!IsUnderPostmaster)
264 : : {
265 : : GlobalTransaction gxacts;
266 : : int i;
267 : :
268 [ - + ]: 1150 : Assert(!found);
6342 269 : 1150 : TwoPhaseState->freeGXacts = NULL;
7576 270 : 1150 : TwoPhaseState->numPrepXacts = 0;
271 : :
272 : : /*
273 : : * Initialize the linked list of free GlobalTransactionData structs
274 : : */
275 : 1150 : gxacts = (GlobalTransaction)
276 : 1150 : ((char *) TwoPhaseState +
7456 bruce@momjian.us 277 : 1150 : MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
278 : : sizeof(GlobalTransaction) * max_prepared_xacts));
7576 tgl@sss.pgh.pa.us 279 [ + + ]: 2018 : for (i = 0; i < max_prepared_xacts; i++)
280 : : {
281 : : /* insert into linked list */
5224 rhaas@postgresql.org 282 : 868 : gxacts[i].next = TwoPhaseState->freeGXacts;
6342 tgl@sss.pgh.pa.us 283 : 868 : TwoPhaseState->freeGXacts = &gxacts[i];
284 : :
285 : : /* associate it with a PGPROC assigned by InitProcGlobal */
752 heikki.linnakangas@i 286 : 868 : gxacts[i].pgprocno = GetNumberFromPGProc(&PreparedXactProcs[i]);
287 : : }
288 : : }
289 : : else
7576 tgl@sss.pgh.pa.us 290 [ # # ]:UBC 0 : Assert(found);
7576 tgl@sss.pgh.pa.us 291 :CBC 1150 : }
292 : :
293 : : /*
294 : : * Exit hook to unlock the global transaction entry we're working on.
295 : : */
296 : : static void
4322 heikki.linnakangas@i 297 : 138 : AtProcExit_Twophase(int code, Datum arg)
298 : : {
299 : : /* same logic as abort */
300 : 138 : AtAbort_Twophase();
301 : 138 : }
302 : :
303 : : /*
304 : : * Abort hook to unlock the global transaction entry we're working on.
305 : : */
306 : : void
307 : 26854 : AtAbort_Twophase(void)
308 : : {
309 [ + + ]: 26854 : if (MyLockedGxact == NULL)
310 : 26852 : return;
311 : :
312 : : /*
313 : : * What to do with the locked global transaction entry? If we were in the
314 : : * process of preparing the transaction, but haven't written the WAL
315 : : * record and state file yet, the transaction must not be considered as
316 : : * prepared. Likewise, if we are in the process of finishing an
317 : : * already-prepared transaction, and fail after having already written the
318 : : * 2nd phase commit or rollback record to the WAL, the transaction should
319 : : * not be considered as prepared anymore. In those cases, just remove the
320 : : * entry from shared memory.
321 : : *
322 : : * Otherwise, the entry must be left in place so that the transaction can
323 : : * be finished later, so just unlock it.
324 : : *
325 : : * If we abort during prepare, after having written the WAL record, we
326 : : * might not have transferred all locks and other state to the prepared
327 : : * transaction yet. Likewise, if we abort during commit or rollback,
328 : : * after having written the WAL record, we might not have released all the
329 : : * resources held by the transaction yet. In those cases, the in-memory
330 : : * state can be wrong, but it's too late to back out.
331 : : */
3196 alvherre@alvh.no-ip. 332 : 2 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
4322 heikki.linnakangas@i 333 [ + - ]: 2 : if (!MyLockedGxact->valid)
334 : 2 : RemoveGXact(MyLockedGxact);
335 : : else
742 heikki.linnakangas@i 336 :UBC 0 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
3196 alvherre@alvh.no-ip. 337 :CBC 2 : LWLockRelease(TwoPhaseStateLock);
338 : :
4322 heikki.linnakangas@i 339 : 2 : MyLockedGxact = NULL;
340 : : }
341 : :
342 : : /*
343 : : * This is called after we have finished transferring state to the prepared
344 : : * PGPROC entry.
345 : : */
346 : : void
3865 andres@anarazel.de 347 : 345 : PostPrepare_Twophase(void)
348 : : {
4322 heikki.linnakangas@i 349 : 345 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
742 350 : 345 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
4322 351 : 345 : LWLockRelease(TwoPhaseStateLock);
352 : :
353 : 345 : MyLockedGxact = NULL;
354 : 345 : }
355 : :
356 : :
357 : : /*
358 : : * MarkAsPreparing
359 : : * Reserve the GID for the given transaction.
360 : : */
361 : : GlobalTransaction
251 michael@paquier.xyz 362 :GNC 325 : MarkAsPreparing(FullTransactionId fxid, const char *gid,
363 : : TimestampTz prepared_at, Oid owner, Oid databaseid)
364 : : {
365 : : GlobalTransaction gxact;
366 : : int i;
367 : :
7576 tgl@sss.pgh.pa.us 368 [ - + ]:CBC 325 : if (strlen(gid) >= GIDSIZE)
7576 tgl@sss.pgh.pa.us 369 [ # # ]:UBC 0 : ereport(ERROR,
370 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
371 : : errmsg("transaction identifier \"%s\" is too long",
372 : : gid)));
373 : :
374 : : /* fail immediately if feature is disabled */
6170 tgl@sss.pgh.pa.us 375 [ + + ]:CBC 325 : if (max_prepared_xacts == 0)
376 [ + - ]: 9 : ereport(ERROR,
377 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
378 : : errmsg("prepared transactions are disabled"),
379 : : errhint("Set \"max_prepared_transactions\" to a nonzero value.")));
380 : :
381 : : /* on first call, register the exit hook */
4322 heikki.linnakangas@i 382 [ + + ]: 316 : if (!twophaseExitRegistered)
383 : : {
384 : 77 : before_shmem_exit(AtProcExit_Twophase, 0);
385 : 77 : twophaseExitRegistered = true;
386 : : }
387 : :
388 : 316 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
389 : :
390 : : /* Check for conflicting GID */
7576 tgl@sss.pgh.pa.us 391 [ + + ]: 549 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
392 : : {
393 : 235 : gxact = TwoPhaseState->prepXacts[i];
394 [ + + ]: 235 : if (strcmp(gxact->gid, gid) == 0)
395 : : {
396 [ + - ]: 2 : ereport(ERROR,
397 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
398 : : errmsg("transaction identifier \"%s\" is already in use",
399 : : gid)));
400 : : }
401 : : }
402 : :
403 : : /* Get a free gxact from the freelist */
6342 404 [ - + ]: 314 : if (TwoPhaseState->freeGXacts == NULL)
7576 tgl@sss.pgh.pa.us 405 [ # # ]:UBC 0 : ereport(ERROR,
406 : : (errcode(ERRCODE_OUT_OF_MEMORY),
407 : : errmsg("maximum number of prepared transactions reached"),
408 : : errhint("Increase \"max_prepared_transactions\" (currently %d).",
409 : : max_prepared_xacts)));
6342 tgl@sss.pgh.pa.us 410 :CBC 314 : gxact = TwoPhaseState->freeGXacts;
4967 411 : 314 : TwoPhaseState->freeGXacts = gxact->next;
412 : :
251 michael@paquier.xyz 413 :GNC 314 : MarkAsPreparingGuts(gxact, fxid, gid, prepared_at, owner, databaseid);
414 : :
3267 simon@2ndQuadrant.co 415 :CBC 314 : gxact->ondisk = false;
416 : :
417 : : /* And insert it into the active array */
418 [ - + ]: 314 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
419 : 314 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
420 : :
421 : 314 : LWLockRelease(TwoPhaseStateLock);
422 : :
423 : 314 : return gxact;
424 : : }
425 : :
426 : : /*
427 : : * MarkAsPreparingGuts
428 : : *
429 : : * This uses a gxact struct and puts it into the active array.
430 : : * NOTE: this is also used when reloading a gxact after a crash; so avoid
431 : : * assuming that we can use very much backend context.
432 : : *
433 : : * Note: This function should be called with appropriate locks held.
434 : : */
435 : : static void
251 michael@paquier.xyz 436 :GNC 347 : MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
437 : : const char *gid, TimestampTz prepared_at, Oid owner,
438 : : Oid databaseid)
439 : : {
440 : : PGPROC *proc;
441 : : int i;
442 : 347 : TransactionId xid = XidFromFullTransactionId(fxid);
443 : :
3196 alvherre@alvh.no-ip. 444 [ - + ]:CBC 347 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
445 : :
3267 simon@2ndQuadrant.co 446 [ - + ]: 347 : Assert(gxact != NULL);
742 heikki.linnakangas@i 447 : 347 : proc = GetPGProcByNumber(gxact->pgprocno);
448 : :
449 : : /* Initialize the PGPROC entry */
5224 rhaas@postgresql.org 450 [ + - + - : 39211 : MemSet(proc, 0, sizeof(PGPROC));
+ - + - +
+ ]
2097 peter@eisentraut.org 451 : 347 : proc->waitStatus = PROC_WAIT_STATUS_OK;
742 heikki.linnakangas@i 452 [ + + ]: 347 : if (LocalTransactionIdIsValid(MyProc->vxid.lxid))
453 : : {
454 : : /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
455 : 314 : proc->vxid.lxid = MyProc->vxid.lxid;
456 : 314 : proc->vxid.procNumber = MyProcNumber;
457 : : }
458 : : else
459 : : {
1604 noah@leadboat.com 460 [ - + - - ]: 33 : Assert(AmStartupProcess() || !IsPostmasterEnvironment);
461 : : /* GetLockConflicts() uses this to specify a wait on the XID */
742 heikki.linnakangas@i 462 : 33 : proc->vxid.lxid = xid;
463 : 33 : proc->vxid.procNumber = INVALID_PROC_NUMBER;
464 : : }
2039 andres@anarazel.de 465 : 347 : proc->xid = xid;
2040 466 [ - + ]: 347 : Assert(proc->xmin == InvalidTransactionId);
1437 rhaas@postgresql.org 467 : 347 : proc->delayChkptFlags = 0;
1945 alvherre@alvh.no-ip. 468 : 347 : proc->statusFlags = 0;
5224 rhaas@postgresql.org 469 : 347 : proc->pid = 0;
470 : 347 : proc->databaseId = databaseid;
471 : 347 : proc->roleId = owner;
2771 michael@paquier.xyz 472 : 347 : proc->tempNamespaceId = InvalidOid;
39 heikki.linnakangas@i 473 :GNC 347 : proc->backendType = B_INVALID;
1211 andres@anarazel.de 474 :CBC 347 : proc->lwWaiting = LW_WS_NOT_WAITING;
5158 heikki.linnakangas@i 475 : 347 : proc->lwWaitMode = 0;
5224 rhaas@postgresql.org 476 : 347 : proc->waitLock = NULL;
23 heikki.linnakangas@i 477 :GNC 347 : dlist_node_init(&proc->waitLink);
5224 rhaas@postgresql.org 478 :CBC 347 : proc->waitProcLock = NULL;
1847 fujii@postgresql.org 479 : 347 : pg_atomic_init_u64(&proc->waitStart, 0);
7399 tgl@sss.pgh.pa.us 480 [ + + ]: 5899 : for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
1152 andres@anarazel.de 481 : 5552 : dlist_init(&proc->myProcLocks[i]);
482 : : /* subxid data must be filled later by GXactLoadSubxactData */
2039 483 : 347 : proc->subxidStatus.overflowed = false;
484 : 347 : proc->subxidStatus.count = 0;
485 : :
7575 tgl@sss.pgh.pa.us 486 : 347 : gxact->prepared_at = prepared_at;
251 michael@paquier.xyz 487 :GNC 347 : gxact->fxid = fxid;
7576 tgl@sss.pgh.pa.us 488 :CBC 347 : gxact->owner = owner;
742 heikki.linnakangas@i 489 : 347 : gxact->locking_backend = MyProcNumber;
7576 tgl@sss.pgh.pa.us 490 : 347 : gxact->valid = false;
3267 simon@2ndQuadrant.co 491 : 347 : gxact->inredo = false;
7576 tgl@sss.pgh.pa.us 492 : 347 : strcpy(gxact->gid, gid);
493 : :
494 : : /*
495 : : * Remember that we have this GlobalTransaction entry locked for us. If we
496 : : * abort after this, we must release it.
497 : : */
4322 heikki.linnakangas@i 498 : 347 : MyLockedGxact = gxact;
7576 tgl@sss.pgh.pa.us 499 : 347 : }
500 : :
501 : : /*
502 : : * GXactLoadSubxactData
503 : : *
504 : : * If the transaction being persisted had any subtransactions, this must
505 : : * be called before MarkAsPrepared() to load information into the dummy
506 : : * PGPROC.
507 : : */
508 : : static void
509 : 134 : GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
510 : : TransactionId *children)
511 : : {
742 heikki.linnakangas@i 512 : 134 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
513 : :
514 : : /* We need no extra lock since the GXACT isn't valid yet */
7576 tgl@sss.pgh.pa.us 515 [ + + ]: 134 : if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
516 : : {
2039 andres@anarazel.de 517 : 4 : proc->subxidStatus.overflowed = true;
7576 tgl@sss.pgh.pa.us 518 : 4 : nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
519 : : }
520 [ + + ]: 134 : if (nsubxacts > 0)
521 : : {
5224 rhaas@postgresql.org 522 : 117 : memcpy(proc->subxids.xids, children,
523 : : nsubxacts * sizeof(TransactionId));
2039 andres@anarazel.de 524 : 117 : proc->subxidStatus.count = nsubxacts;
525 : : }
7576 tgl@sss.pgh.pa.us 526 : 134 : }
527 : :
528 : : /*
529 : : * MarkAsPrepared
530 : : * Mark the GXACT as fully valid, and enter it into the global ProcArray.
531 : : *
532 : : * lock_held indicates whether caller already holds TwoPhaseStateLock.
533 : : */
534 : : static void
3196 alvherre@alvh.no-ip. 535 : 345 : MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
536 : : {
537 : : /* Lock here may be overkill, but I'm not convinced of that ... */
538 [ + + ]: 345 : if (!lock_held)
539 : 312 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
7576 tgl@sss.pgh.pa.us 540 [ - + ]: 345 : Assert(!gxact->valid);
541 : 345 : gxact->valid = true;
3196 alvherre@alvh.no-ip. 542 [ + + ]: 345 : if (!lock_held)
543 : 312 : LWLockRelease(TwoPhaseStateLock);
544 : :
545 : : /*
546 : : * Put it into the global ProcArray so TransactionIdIsInProgress considers
547 : : * the XID as still running.
548 : : */
742 heikki.linnakangas@i 549 : 345 : ProcArrayAdd(GetPGProcByNumber(gxact->pgprocno));
7576 tgl@sss.pgh.pa.us 550 : 345 : }
551 : :
552 : : /*
553 : : * LockGXact
554 : : * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
555 : : */
556 : : static GlobalTransaction
7565 557 : 326 : LockGXact(const char *gid, Oid user)
558 : : {
559 : : int i;
560 : :
561 : : /* on first call, register the exit hook */
4322 heikki.linnakangas@i 562 [ + + ]: 326 : if (!twophaseExitRegistered)
563 : : {
564 : 61 : before_shmem_exit(AtProcExit_Twophase, 0);
565 : 61 : twophaseExitRegistered = true;
566 : : }
567 : :
7576 tgl@sss.pgh.pa.us 568 : 326 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
569 : :
570 [ + + ]: 517 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
571 : : {
7456 bruce@momjian.us 572 : 510 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
742 heikki.linnakangas@i 573 : 510 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
574 : :
575 : : /* Ignore not-yet-valid GIDs */
7576 tgl@sss.pgh.pa.us 576 [ - + ]: 510 : if (!gxact->valid)
7576 tgl@sss.pgh.pa.us 577 :LBC (6) : continue;
7576 tgl@sss.pgh.pa.us 578 [ + + ]:CBC 510 : if (strcmp(gxact->gid, gid) != 0)
579 : 191 : continue;
580 : :
581 : : /* Found it, but has someone else got it locked? */
742 heikki.linnakangas@i 582 [ - + ]: 319 : if (gxact->locking_backend != INVALID_PROC_NUMBER)
4322 heikki.linnakangas@i 583 [ # # ]:UBC 0 : ereport(ERROR,
584 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
585 : : errmsg("prepared transaction with identifier \"%s\" is busy",
586 : : gid)));
587 : :
7576 tgl@sss.pgh.pa.us 588 [ - + - - ]:CBC 319 : if (user != gxact->owner && !superuser_arg(user))
7576 tgl@sss.pgh.pa.us 589 [ # # ]:UBC 0 : ereport(ERROR,
590 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
591 : : errmsg("permission denied to finish prepared transaction"),
592 : : errhint("Must be superuser or the user that prepared the transaction.")));
593 : :
594 : : /*
595 : : * Note: it probably would be possible to allow committing from
596 : : * another database; but at the moment NOTIFY is known not to work and
597 : : * there may be some other issues as well. Hence disallow until
598 : : * someone gets motivated to make it work.
599 : : */
5224 rhaas@postgresql.org 600 [ - + ]:CBC 319 : if (MyDatabaseId != proc->databaseId)
6970 tgl@sss.pgh.pa.us 601 [ # # ]:UBC 0 : ereport(ERROR,
602 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
603 : : errmsg("prepared transaction belongs to another database"),
604 : : errhint("Connect to the database where the transaction was prepared to finish it.")));
605 : :
606 : : /* OK for me to lock it */
742 heikki.linnakangas@i 607 :CBC 319 : gxact->locking_backend = MyProcNumber;
4322 608 : 319 : MyLockedGxact = gxact;
609 : :
7576 tgl@sss.pgh.pa.us 610 : 319 : LWLockRelease(TwoPhaseStateLock);
611 : :
612 : 319 : return gxact;
613 : : }
614 : :
7576 tgl@sss.pgh.pa.us 615 :GBC 7 : LWLockRelease(TwoPhaseStateLock);
616 : :
617 [ + - ]: 7 : ereport(ERROR,
618 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
619 : : errmsg("prepared transaction with identifier \"%s\" does not exist",
620 : : gid)));
621 : :
622 : : /* NOTREACHED */
623 : : return NULL;
624 : : }
625 : :
626 : : /*
627 : : * RemoveGXact
628 : : * Remove the prepared transaction from the shared memory array.
629 : : *
630 : : * NB: caller should have already removed it from ProcArray
631 : : */
632 : : static void
7576 tgl@sss.pgh.pa.us 633 :CBC 383 : RemoveGXact(GlobalTransaction gxact)
634 : : {
635 : : int i;
636 : :
3196 alvherre@alvh.no-ip. 637 [ - + ]: 383 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
638 : :
7576 tgl@sss.pgh.pa.us 639 [ + - ]: 571 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
640 : : {
641 [ + + ]: 571 : if (gxact == TwoPhaseState->prepXacts[i])
642 : : {
643 : : /* remove from the active array */
644 : 383 : TwoPhaseState->numPrepXacts--;
645 : 383 : TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
646 : :
647 : : /* and put it back in the freelist */
5224 rhaas@postgresql.org 648 : 383 : gxact->next = TwoPhaseState->freeGXacts;
6342 tgl@sss.pgh.pa.us 649 : 383 : TwoPhaseState->freeGXacts = gxact;
650 : :
7576 651 : 383 : return;
652 : : }
653 : : }
654 : :
7576 tgl@sss.pgh.pa.us 655 [ # # ]:UBC 0 : elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
656 : : }
657 : :
658 : : /*
659 : : * Returns an array of all prepared transactions for the user-level
660 : : * function pg_prepared_xact.
661 : : *
662 : : * The returned array and all its elements are copies of internal data
663 : : * structures, to minimize the time we need to hold the TwoPhaseStateLock.
664 : : *
665 : : * WARNING -- we return even those transactions that are not fully prepared
666 : : * yet. The caller should filter them out if he doesn't want them.
667 : : *
668 : : * The returned array is palloc'd.
669 : : */
670 : : static int
7576 tgl@sss.pgh.pa.us 671 :CBC 107 : GetPreparedTransactionList(GlobalTransaction *gxacts)
672 : : {
673 : : GlobalTransaction array;
674 : : int num;
675 : : int i;
676 : :
677 : 107 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
678 : :
679 [ + + ]: 107 : if (TwoPhaseState->numPrepXacts == 0)
680 : : {
681 : 66 : LWLockRelease(TwoPhaseStateLock);
682 : :
683 : 66 : *gxacts = NULL;
684 : 66 : return 0;
685 : : }
686 : :
687 : 41 : num = TwoPhaseState->numPrepXacts;
95 michael@paquier.xyz 688 :GNC 41 : array = palloc_array(GlobalTransactionData, num);
7576 tgl@sss.pgh.pa.us 689 :CBC 41 : *gxacts = array;
690 [ + + ]: 87 : for (i = 0; i < num; i++)
691 : 46 : memcpy(array + i, TwoPhaseState->prepXacts[i],
692 : : sizeof(GlobalTransactionData));
693 : :
694 : 41 : LWLockRelease(TwoPhaseStateLock);
695 : :
696 : 41 : return num;
697 : : }
698 : :
699 : :
700 : : /* Working status for pg_prepared_xact */
701 : : typedef struct
702 : : {
703 : : GlobalTransaction array;
704 : : int ngxacts;
705 : : int currIdx;
706 : : } Working_State;
707 : :
708 : : /*
709 : : * pg_prepared_xact
710 : : * Produce a view with one row per prepared transaction.
711 : : *
712 : : * This function is here so we don't have to export the
713 : : * GlobalTransactionData struct definition.
714 : : */
715 : : Datum
716 : 153 : pg_prepared_xact(PG_FUNCTION_ARGS)
717 : : {
718 : : FuncCallContext *funcctx;
719 : : Working_State *status;
720 : :
721 [ + + ]: 153 : if (SRF_IS_FIRSTCALL())
722 : : {
723 : : TupleDesc tupdesc;
724 : : MemoryContext oldcontext;
725 : :
726 : : /* create a function context for cross-call persistence */
727 : 107 : funcctx = SRF_FIRSTCALL_INIT();
728 : :
729 : : /*
730 : : * Switch to memory context appropriate for multiple function calls
731 : : */
732 : 107 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
733 : :
734 : : /* build tupdesc for result tuples */
735 : : /* this had better match pg_prepared_xacts view in system_views.sql */
2672 andres@anarazel.de 736 : 107 : tupdesc = CreateTemplateTupleDesc(5);
7576 tgl@sss.pgh.pa.us 737 : 107 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
738 : : XIDOID, -1, 0);
739 : 107 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
740 : : TEXTOID, -1, 0);
7575 741 : 107 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
742 : : TIMESTAMPTZOID, -1, 0);
743 : 107 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
744 : : OIDOID, -1, 0);
745 : 107 : TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
746 : : OIDOID, -1, 0);
747 : :
7576 748 : 107 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
749 : :
750 : : /*
751 : : * Collect all the 2PC status information that we will format and send
752 : : * out as a result set.
753 : : */
95 michael@paquier.xyz 754 :GNC 107 : status = palloc_object(Working_State);
472 peter@eisentraut.org 755 :CBC 107 : funcctx->user_fctx = status;
756 : :
7576 tgl@sss.pgh.pa.us 757 : 107 : status->ngxacts = GetPreparedTransactionList(&status->array);
758 : 107 : status->currIdx = 0;
759 : :
760 : 107 : MemoryContextSwitchTo(oldcontext);
761 : : }
762 : :
763 : 153 : funcctx = SRF_PERCALL_SETUP();
764 : 153 : status = (Working_State *) funcctx->user_fctx;
765 : :
766 [ + + + + ]: 153 : while (status->array != NULL && status->currIdx < status->ngxacts)
767 : : {
768 : 46 : GlobalTransaction gxact = &status->array[status->currIdx++];
752 heikki.linnakangas@i 769 : 46 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
1338 peter@eisentraut.org 770 : 46 : Datum values[5] = {0};
771 : 46 : bool nulls[5] = {0};
772 : : HeapTuple tuple;
773 : : Datum result;
774 : :
7576 tgl@sss.pgh.pa.us 775 [ - + ]: 46 : if (!gxact->valid)
7576 tgl@sss.pgh.pa.us 776 :UBC 0 : continue;
777 : :
778 : : /*
779 : : * Form tuple with appropriate data.
780 : : */
781 : :
2039 andres@anarazel.de 782 :CBC 46 : values[0] = TransactionIdGetDatum(proc->xid);
6564 tgl@sss.pgh.pa.us 783 : 46 : values[1] = CStringGetTextDatum(gxact->gid);
7575 784 : 46 : values[2] = TimestampTzGetDatum(gxact->prepared_at);
7565 785 : 46 : values[3] = ObjectIdGetDatum(gxact->owner);
5224 rhaas@postgresql.org 786 : 46 : values[4] = ObjectIdGetDatum(proc->databaseId);
787 : :
7576 tgl@sss.pgh.pa.us 788 : 46 : tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
789 : 46 : result = HeapTupleGetDatum(tuple);
790 : 46 : SRF_RETURN_NEXT(funcctx, result);
791 : : }
792 : :
793 : 107 : SRF_RETURN_DONE(funcctx);
794 : : }
795 : :
796 : : /*
797 : : * TwoPhaseGetGXact
798 : : * Get the GlobalTransaction struct for a prepared transaction
799 : : * specified by XID
800 : : *
801 : : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
802 : : * caller had better hold it.
803 : : */
804 : : static GlobalTransaction
251 michael@paquier.xyz 805 :GNC 1393 : TwoPhaseGetGXact(FullTransactionId fxid, bool lock_held)
806 : : {
4967 tgl@sss.pgh.pa.us 807 :CBC 1393 : GlobalTransaction result = NULL;
808 : : int i;
809 : :
810 : : static FullTransactionId cached_fxid = {InvalidTransactionId};
811 : : static GlobalTransaction cached_gxact = NULL;
812 : :
2575 michael@paquier.xyz 813 [ + + - + ]: 1393 : Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
814 : :
815 : : /*
816 : : * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
817 : : * repeatedly for the same XID. We can save work with a simple cache.
818 : : */
251 michael@paquier.xyz 819 [ + + ]:GNC 1393 : if (FullTransactionIdEquals(fxid, cached_fxid))
4967 tgl@sss.pgh.pa.us 820 :CBC 973 : return cached_gxact;
821 : :
2575 michael@paquier.xyz 822 [ + + ]: 420 : if (!lock_held)
823 : 345 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
824 : :
7576 tgl@sss.pgh.pa.us 825 [ + - ]: 658 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
826 : : {
7456 bruce@momjian.us 827 : 658 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
828 : :
251 michael@paquier.xyz 829 [ + + ]:GNC 658 : if (FullTransactionIdEquals(gxact->fxid, fxid))
830 : : {
4967 tgl@sss.pgh.pa.us 831 :CBC 420 : result = gxact;
7576 832 : 420 : break;
833 : : }
834 : : }
835 : :
2575 michael@paquier.xyz 836 [ + + ]: 420 : if (!lock_held)
837 : 345 : LWLockRelease(TwoPhaseStateLock);
838 : :
7576 tgl@sss.pgh.pa.us 839 [ - + ]: 420 : if (result == NULL) /* should not happen */
251 michael@paquier.xyz 840 [ # # ]:UNC 0 : elog(ERROR, "failed to find GlobalTransaction for xid %u",
841 : : XidFromFullTransactionId(fxid));
842 : :
251 michael@paquier.xyz 843 :GNC 420 : cached_fxid = fxid;
4967 tgl@sss.pgh.pa.us 844 :CBC 420 : cached_gxact = result;
845 : :
7576 846 : 420 : return result;
847 : : }
848 : :
849 : : /*
850 : : * TwoPhaseGetXidByVirtualXID
851 : : * Lookup VXID among xacts prepared since last startup.
852 : : *
853 : : * (This won't find recovered xacts.) If more than one matches, return any
854 : : * and set "have_more" to true. To witness multiple matches, a single
855 : : * proc number must consume 2^32 LXIDs, with no intervening database restart.
856 : : */
857 : : TransactionId
1604 noah@leadboat.com 858 : 81 : TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
859 : : bool *have_more)
860 : : {
861 : : int i;
862 : 81 : TransactionId result = InvalidTransactionId;
863 : :
864 [ - + ]: 81 : Assert(VirtualTransactionIdIsValid(vxid));
865 : 81 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
866 : :
867 [ + + ]: 112 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
868 : : {
869 : 31 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
870 : : PGPROC *proc;
871 : : VirtualTransactionId proc_vxid;
872 : :
873 [ - + ]: 31 : if (!gxact->valid)
1604 noah@leadboat.com 874 :LBC (2) : continue;
742 heikki.linnakangas@i 875 :CBC 31 : proc = GetPGProcByNumber(gxact->pgprocno);
1604 noah@leadboat.com 876 : 31 : GET_VXID_FROM_PGPROC(proc_vxid, *proc);
877 [ + + + + ]: 31 : if (VirtualTransactionIdEquals(vxid, proc_vxid))
878 : : {
879 : : /*
880 : : * Startup process sets proc->vxid.procNumber to
881 : : * INVALID_PROC_NUMBER.
882 : : */
883 [ - + ]: 7 : Assert(!gxact->inredo);
884 : :
885 [ - + ]: 7 : if (result != InvalidTransactionId)
886 : : {
1604 noah@leadboat.com 887 :UBC 0 : *have_more = true;
888 : 0 : break;
889 : : }
251 michael@paquier.xyz 890 :GNC 7 : result = XidFromFullTransactionId(gxact->fxid);
891 : : }
892 : : }
893 : :
1604 noah@leadboat.com 894 :CBC 81 : LWLockRelease(TwoPhaseStateLock);
895 : :
896 : 81 : return result;
897 : : }
898 : :
899 : : /*
900 : : * TwoPhaseGetDummyProcNumber
901 : : * Get the dummy proc number for prepared transaction
902 : : *
903 : : * Dummy proc numbers are similar to proc numbers of real backends. They
904 : : * start at FIRST_PREPARED_XACT_PROC_NUMBER, and are unique across all
905 : : * currently active real backends and prepared transactions. If lock_held is
906 : : * set to true, TwoPhaseStateLock will not be taken, so the caller had better
907 : : * hold it.
908 : : */
909 : : ProcNumber
251 michael@paquier.xyz 910 :GNC 147 : TwoPhaseGetDummyProcNumber(FullTransactionId fxid, bool lock_held)
911 : : {
912 : 147 : GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
913 : :
742 heikki.linnakangas@i 914 :CBC 147 : return gxact->pgprocno;
915 : : }
916 : :
917 : : /*
918 : : * TwoPhaseGetDummyProc
919 : : * Get the PGPROC that represents a prepared transaction
920 : : *
921 : : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
922 : : * caller had better hold it.
923 : : */
924 : : PGPROC *
251 michael@paquier.xyz 925 :GNC 1246 : TwoPhaseGetDummyProc(FullTransactionId fxid, bool lock_held)
926 : : {
927 : 1246 : GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
928 : :
752 heikki.linnakangas@i 929 :CBC 1246 : return GetPGProcByNumber(gxact->pgprocno);
930 : : }
931 : :
932 : : /************************************************************************/
933 : : /* State file support */
934 : : /************************************************************************/
935 : :
936 : : /*
937 : : * Compute the FullTransactionId for the given TransactionId.
938 : : *
939 : : * This is safe if the xid has not yet reached COMMIT PREPARED or ROLLBACK
940 : : * PREPARED. After those commands, concurrent vac_truncate_clog() may make
941 : : * the xid cease to qualify as allowable. XXX Not all callers limit their
942 : : * calls accordingly.
943 : : */
944 : : static inline FullTransactionId
422 michael@paquier.xyz 945 : 358 : AdjustToFullTransactionId(TransactionId xid)
946 : : {
947 [ - + ]: 358 : Assert(TransactionIdIsValid(xid));
414 noah@leadboat.com 948 : 358 : return FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
949 : : }
950 : :
951 : : static inline int
251 michael@paquier.xyz 952 :GNC 568 : TwoPhaseFilePath(char *path, FullTransactionId fxid)
953 : : {
837 akorotkov@postgresql 954 :CBC 1136 : return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
955 : 568 : EpochFromFullTransactionId(fxid),
956 : 568 : XidFromFullTransactionId(fxid));
957 : : }
958 : :
/*
 * 2PC state file format:
 *
 *	1. TwoPhaseFileHeader
 *	2. GID string (gidlen bytes, including the terminating '\0')
 *	3. TransactionId[] (subtransactions)
 *	4. RelFileLocator[] (files to be deleted at commit)
 *	5. RelFileLocator[] (files to be deleted at abort)
 *	6. xl_xact_stats_item[] (transactional stats drops for commit)
 *	7. xl_xact_stats_item[] (transactional stats drops for abort)
 *	8. SharedInvalidationMessage[] (inval messages to be sent at commit)
 *	9. TwoPhaseRecordOnDisk
 *	10. ...
 *	11. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
 *	12. checksum (CRC-32C)
 *
 * Each segment except the final checksum is MAXALIGN'd.
 */
974 : :
975 : : /*
976 : : * Header for a 2PC state file
977 : : */
978 : : #define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
979 : :
980 : : typedef xl_xact_prepare TwoPhaseFileHeader;
981 : :
982 : : /*
983 : : * Header for each record in a state file
984 : : *
985 : : * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
986 : : * The rmgr data will be stored starting on a MAXALIGN boundary.
987 : : */
988 : : typedef struct TwoPhaseRecordOnDisk
989 : : {
990 : : uint32 len; /* length of rmgr data */
991 : : TwoPhaseRmgrId rmid; /* resource manager for this record */
992 : : uint16 info; /* flag bits for use by rmgr */
993 : : } TwoPhaseRecordOnDisk;
994 : :
995 : : /*
996 : : * During prepare, the state file is assembled in memory before writing it
997 : : * to WAL and the actual state file. We use a chain of StateFileChunk blocks
998 : : * for that.
999 : : */
1000 : : typedef struct StateFileChunk
1001 : : {
1002 : : char *data;
1003 : : uint32 len;
1004 : : struct StateFileChunk *next;
1005 : : } StateFileChunk;
1006 : :
1007 : : static struct xllist
1008 : : {
1009 : : StateFileChunk *head; /* first data block in the chain */
1010 : : StateFileChunk *tail; /* last block in chain */
1011 : : uint32 num_chunks;
1012 : : uint32 bytes_free; /* free bytes left in tail block */
1013 : : uint32 total_len; /* total data bytes in chain */
1014 : : } records;
1015 : :
1016 : :
1017 : : /*
1018 : : * Append a block of data to records data structure.
1019 : : *
1020 : : * NB: each block is padded to a MAXALIGN multiple. This must be
1021 : : * accounted for when the file is later read!
1022 : : *
1023 : : * The data is copied, so the caller is free to modify it afterwards.
1024 : : */
1025 : : static void
7576 tgl@sss.pgh.pa.us 1026 : 3854 : save_state_data(const void *data, uint32 len)
1027 : : {
7456 bruce@momjian.us 1028 : 3854 : uint32 padlen = MAXALIGN(len);
1029 : :
7576 tgl@sss.pgh.pa.us 1030 [ + + ]: 3854 : if (padlen > records.bytes_free)
1031 : : {
95 michael@paquier.xyz 1032 :GNC 79 : records.tail->next = palloc0_object(StateFileChunk);
7576 tgl@sss.pgh.pa.us 1033 :CBC 79 : records.tail = records.tail->next;
1034 : 79 : records.tail->len = 0;
1035 : 79 : records.tail->next = NULL;
4133 heikki.linnakangas@i 1036 : 79 : records.num_chunks++;
1037 : :
7576 tgl@sss.pgh.pa.us 1038 : 79 : records.bytes_free = Max(padlen, 512);
1039 : 79 : records.tail->data = palloc(records.bytes_free);
1040 : : }
1041 : :
103 peter@eisentraut.org 1042 :GNC 3854 : memcpy(records.tail->data + records.tail->len, data, len);
7576 tgl@sss.pgh.pa.us 1043 :CBC 3854 : records.tail->len += padlen;
1044 : 3854 : records.bytes_free -= padlen;
1045 : 3854 : records.total_len += padlen;
1046 : 3854 : }
1047 : :
1048 : : /*
1049 : : * Start preparing a state file.
1050 : : *
1051 : : * Initializes data structure and inserts the 2PC file header record.
1052 : : */
1053 : : void
1054 : 314 : StartPrepare(GlobalTransaction gxact)
1055 : : {
752 heikki.linnakangas@i 1056 : 314 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
251 michael@paquier.xyz 1057 :GNC 314 : TransactionId xid = XidFromFullTransactionId(gxact->fxid);
1058 : : TwoPhaseFileHeader hdr;
1059 : : TransactionId *children;
1060 : : RelFileLocator *commitrels;
1061 : : RelFileLocator *abortrels;
1439 andres@anarazel.de 1062 :CBC 314 : xl_xact_stats_item *abortstats = NULL;
1063 : 314 : xl_xact_stats_item *commitstats = NULL;
1064 : : SharedInvalidationMessage *invalmsgs;
1065 : :
1066 : : /* Initialize linked list */
95 michael@paquier.xyz 1067 :GNC 314 : records.head = palloc0_object(StateFileChunk);
7576 tgl@sss.pgh.pa.us 1068 :CBC 314 : records.head->len = 0;
1069 : 314 : records.head->next = NULL;
1070 : :
1071 : 314 : records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1072 : 314 : records.head->data = palloc(records.bytes_free);
1073 : :
1074 : 314 : records.tail = records.head;
4133 heikki.linnakangas@i 1075 : 314 : records.num_chunks = 1;
1076 : :
7576 tgl@sss.pgh.pa.us 1077 : 314 : records.total_len = 0;
1078 : :
1079 : : /* Create header */
1080 : 314 : hdr.magic = TWOPHASE_MAGIC;
1081 : 314 : hdr.total_len = 0; /* EndPrepare will fill this in */
1082 : 314 : hdr.xid = xid;
5224 rhaas@postgresql.org 1083 : 314 : hdr.database = proc->databaseId;
7575 tgl@sss.pgh.pa.us 1084 : 314 : hdr.prepared_at = gxact->prepared_at;
1085 : 314 : hdr.owner = gxact->owner;
7576 1086 : 314 : hdr.nsubxacts = xactGetCommittedChildren(&children);
5693 rhaas@postgresql.org 1087 : 314 : hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1088 : 314 : hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1439 andres@anarazel.de 1089 : 314 : hdr.ncommitstats =
1090 : 314 : pgstat_get_transactional_drops(true, &commitstats);
1091 : 314 : hdr.nabortstats =
1092 : 314 : pgstat_get_transactional_drops(false, &abortstats);
5930 simon@2ndQuadrant.co 1093 : 314 : hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1094 : : &hdr.initfileinval);
3189 tgl@sss.pgh.pa.us 1095 : 314 : hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1096 : : /* EndPrepare will fill the origin data, if necessary */
1490 michael@paquier.xyz 1097 : 314 : hdr.origin_lsn = InvalidXLogRecPtr;
1098 : 314 : hdr.origin_timestamp = 0;
1099 : :
7576 tgl@sss.pgh.pa.us 1100 : 314 : save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
3657 simon@2ndQuadrant.co 1101 : 314 : save_state_data(gxact->gid, hdr.gidlen);
1102 : :
1103 : : /*
1104 : : * Add the additional info about subxacts, deletable files and cache
1105 : : * invalidation messages.
1106 : : */
7576 tgl@sss.pgh.pa.us 1107 [ + + ]: 314 : if (hdr.nsubxacts > 0)
1108 : : {
1109 : 101 : save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1110 : : /* While we have the child-xact data, stuff it in the gxact too */
1111 : 101 : GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1112 : : }
1113 [ + + ]: 314 : if (hdr.ncommitrels > 0)
1114 : : {
1348 rhaas@postgresql.org 1115 : 21 : save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileLocator));
7576 tgl@sss.pgh.pa.us 1116 : 21 : pfree(commitrels);
1117 : : }
1118 [ + + ]: 314 : if (hdr.nabortrels > 0)
1119 : : {
1348 rhaas@postgresql.org 1120 : 29 : save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileLocator));
7576 tgl@sss.pgh.pa.us 1121 : 29 : pfree(abortrels);
1122 : : }
1439 andres@anarazel.de 1123 [ + + ]: 314 : if (hdr.ncommitstats > 0)
1124 : : {
1125 : 21 : save_state_data(commitstats,
1126 : 21 : hdr.ncommitstats * sizeof(xl_xact_stats_item));
1127 : 21 : pfree(commitstats);
1128 : : }
1129 [ + + ]: 314 : if (hdr.nabortstats > 0)
1130 : : {
1131 : 25 : save_state_data(abortstats,
1403 tgl@sss.pgh.pa.us 1132 : 25 : hdr.nabortstats * sizeof(xl_xact_stats_item));
1439 andres@anarazel.de 1133 : 25 : pfree(abortstats);
1134 : : }
5930 simon@2ndQuadrant.co 1135 [ + + ]: 314 : if (hdr.ninvalmsgs > 0)
1136 : : {
1137 : 37 : save_state_data(invalmsgs,
1138 : 37 : hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1139 : 37 : pfree(invalmsgs);
1140 : : }
7576 tgl@sss.pgh.pa.us 1141 : 314 : }
1142 : :
1143 : : /*
1144 : : * Finish preparing state data and writing it to WAL.
1145 : : */
1146 : : void
1147 : 312 : EndPrepare(GlobalTransaction gxact)
1148 : : {
1149 : : TwoPhaseFileHeader *hdr;
1150 : : StateFileChunk *record;
1151 : : bool replorigin;
1152 : :
1153 : : /* Add the end sentinel to the list of 2PC records */
1154 : 312 : RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
1155 : : NULL, 0);
1156 : :
1157 : : /* Go back and fill in total_len in the file header record */
1158 : 312 : hdr = (TwoPhaseFileHeader *) records.head->data;
1159 [ - + ]: 312 : Assert(hdr->magic == TWOPHASE_MAGIC);
3988 heikki.linnakangas@i 1160 : 312 : hdr->total_len = records.total_len + sizeof(pg_crc32c);
1161 : :
46 msawada@postgresql.o 1162 [ + + ]:GNC 338 : replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
1163 [ + - ]: 26 : replorigin_xact_state.origin != DoNotReplicateId);
1164 : :
2909 simon@2ndQuadrant.co 1165 [ + + ]:CBC 312 : if (replorigin)
1166 : : {
46 msawada@postgresql.o 1167 :GNC 26 : hdr->origin_lsn = replorigin_xact_state.origin_lsn;
1168 : 26 : hdr->origin_timestamp = replorigin_xact_state.origin_timestamp;
1169 : : }
1170 : :
1171 : : /*
1172 : : * If the data size exceeds MaxAllocSize, we won't be able to read it in
1173 : : * ReadTwoPhaseFile. Check for that now, rather than fail in the case
1174 : : * where we write data to file and then re-read at commit time.
1175 : : */
6509 heikki.linnakangas@i 1176 [ - + ]:CBC 312 : if (hdr->total_len > MaxAllocSize)
6509 heikki.linnakangas@i 1177 [ # # ]:UBC 0 : ereport(ERROR,
1178 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1179 : : errmsg("two-phase state file maximum length exceeded")));
1180 : :
1181 : : /*
1182 : : * Now writing 2PC state data to WAL. We let the WAL's CRC protection
1183 : : * cover us, so no need to calculate a separate CRC.
1184 : : *
1185 : : * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
1186 : : * starting immediately after the WAL record is inserted could complete
1187 : : * without fsync'ing our state file. (This is essentially the same kind
1188 : : * of race condition as the COMMIT-to-clog-write case that
1189 : : * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
1190 : : * there.) Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in
1191 : : * the critical commit section. We need to know about such transactions
1192 : : * for conflict detection in logical replication. See
1193 : : * GetOldestActiveTransactionId(true, false) and its use.
1194 : : *
1195 : : * We save the PREPARE record's location in the gxact for later use by
1196 : : * CheckPointTwoPhase.
1197 : : */
4133 heikki.linnakangas@i 1198 :CBC 312 : XLogEnsureRecordSpace(0, records.num_chunks);
1199 : :
7576 tgl@sss.pgh.pa.us 1200 : 312 : START_CRIT_SECTION();
1201 : :
1437 rhaas@postgresql.org 1202 [ - + ]: 312 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
1203 : 312 : MyProc->delayChkptFlags |= DELAY_CHKPT_START;
1204 : :
4133 heikki.linnakangas@i 1205 : 312 : XLogBeginInsert();
1206 [ + + ]: 703 : for (record = records.head; record != NULL; record = record->next)
1207 : 391 : XLogRegisterData(record->data, record->len);
1208 : :
2909 simon@2ndQuadrant.co 1209 : 312 : XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
1210 : :
3707 1211 : 312 : gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1212 : :
2909 1213 [ + + ]: 312 : if (replorigin)
1214 : : {
1215 : : /* Move LSNs forward for this replication origin */
46 msawada@postgresql.o 1216 :GNC 26 : replorigin_session_advance(replorigin_xact_state.origin_lsn,
1217 : : gxact->prepare_end_lsn);
1218 : : }
1219 : :
3707 simon@2ndQuadrant.co 1220 :CBC 312 : XLogFlush(gxact->prepare_end_lsn);
1221 : :
1222 : : /* If we crash now, we have prepared: WAL replay will fix things */
1223 : :
1224 : : /* Store record's start location to read that later on Commit */
1225 : 312 : gxact->prepare_start_lsn = ProcLastRecPtr;
1226 : :
1227 : : /*
1228 : : * Mark the prepared transaction as valid. As soon as xact.c marks MyProc
1229 : : * as not running our XID (which it will do immediately after this
1230 : : * function returns), others can commit/rollback the xact.
1231 : : *
1232 : : * NB: a side effect of this is to make a dummy ProcArray entry for the
1233 : : * prepared XID. This must happen before we clear the XID from MyProc /
1234 : : * ProcGlobal->xids[], else there is a window where the XID is not running
1235 : : * according to TransactionIdIsInProgress, and onlookers would be entitled
1236 : : * to assume the xact crashed. Instead we have a window where the same
1237 : : * XID appears twice in ProcArray, which is OK.
1238 : : */
3196 alvherre@alvh.no-ip. 1239 : 312 : MarkAsPrepared(gxact, false);
1240 : :
1241 : : /*
1242 : : * Now we can mark ourselves as out of the commit critical section: a
1243 : : * checkpoint starting after this will certainly see the gxact as a
1244 : : * candidate for fsyncing.
1245 : : */
1437 rhaas@postgresql.org 1246 : 312 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
1247 : :
1248 : : /*
1249 : : * Remember that we have this GlobalTransaction entry locked for us. If
1250 : : * we crash after this point, it's too late to abort, but we must unlock
1251 : : * it so that the prepared transaction can be committed or rolled back.
1252 : : */
4322 heikki.linnakangas@i 1253 : 312 : MyLockedGxact = gxact;
1254 : :
7576 tgl@sss.pgh.pa.us 1255 [ - + ]: 312 : END_CRIT_SECTION();
1256 : :
1257 : : /*
1258 : : * Wait for synchronous replication, if required.
1259 : : *
1260 : : * Note that at this stage we have marked the prepare, but still show as
1261 : : * running in the procarray (twice!) and continue to hold locks.
1262 : : */
3638 rhaas@postgresql.org 1263 : 312 : SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
1264 : :
7576 tgl@sss.pgh.pa.us 1265 : 312 : records.tail = records.head = NULL;
4133 heikki.linnakangas@i 1266 : 312 : records.num_chunks = 0;
7576 tgl@sss.pgh.pa.us 1267 : 312 : }
1268 : :
1269 : : /*
1270 : : * Register a 2PC record to be written to state file.
1271 : : */
1272 : : void
1273 : 1652 : RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1274 : : const void *data, uint32 len)
1275 : : {
1276 : : TwoPhaseRecordOnDisk record;
1277 : :
1278 : 1652 : record.rmid = rmid;
1279 : 1652 : record.info = info;
1280 : 1652 : record.len = len;
1281 : 1652 : save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1282 [ + + ]: 1652 : if (len > 0)
1283 : 1340 : save_state_data(data, len);
1284 : 1652 : }
1285 : :
1286 : :
1287 : : /*
1288 : : * Read and validate the state file for xid.
1289 : : *
1290 : : * If it looks OK (has a valid magic number and CRC), return the palloc'd
1291 : : * contents of the file, issuing an error when finding corrupted data. If
1292 : : * missing_ok is true, which indicates that missing files can be safely
1293 : : * ignored, then return NULL. This state can be reached when doing recovery
1294 : : * after discarding two-phase files from frozen epochs.
1295 : : */
1296 : : static char *
251 michael@paquier.xyz 1297 :GNC 432 : ReadTwoPhaseFile(FullTransactionId fxid, bool missing_ok)
1298 : : {
1299 : : char path[MAXPGPATH];
1300 : : char *buf;
1301 : : TwoPhaseFileHeader *hdr;
1302 : : int fd;
1303 : : struct stat stat;
1304 : : uint32 crc_offset;
1305 : : pg_crc32c calc_crc,
1306 : : file_crc;
1307 : : int r;
1308 : :
1309 : 432 : TwoPhaseFilePath(path, fxid);
1310 : :
3095 peter_e@gmx.net 1311 :CBC 432 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
7576 tgl@sss.pgh.pa.us 1312 [ + + ]: 432 : if (fd < 0)
1313 : : {
2746 michael@paquier.xyz 1314 [ + - + - ]: 358 : if (missing_ok && errno == ENOENT)
1315 : 358 : return NULL;
1316 : :
2746 michael@paquier.xyz 1317 [ # # ]:UBC 0 : ereport(ERROR,
1318 : : (errcode_for_file_access(),
1319 : : errmsg("could not open file \"%s\": %m", path)));
1320 : : }
1321 : :
1322 : : /*
1323 : : * Check file length. We can determine a lower bound pretty easily. We
1324 : : * set an upper bound to avoid palloc() failure on a corrupt file, though
1325 : : * we can't guarantee that we won't get an out of memory error anyway,
1326 : : * even on a valid file.
1327 : : */
7576 tgl@sss.pgh.pa.us 1328 [ - + ]:CBC 74 : if (fstat(fd, &stat))
2746 michael@paquier.xyz 1329 [ # # ]:UBC 0 : ereport(ERROR,
1330 : : (errcode_for_file_access(),
1331 : : errmsg("could not stat file \"%s\": %m", path)));
1332 : :
7576 tgl@sss.pgh.pa.us 1333 [ + - ]:CBC 74 : if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1334 : : MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
3988 heikki.linnakangas@i 1335 : 74 : sizeof(pg_crc32c)) ||
6509 1336 [ - + ]: 74 : stat.st_size > MaxAllocSize)
2746 michael@paquier.xyz 1337 [ # # ]:UBC 0 : ereport(ERROR,
1338 : : (errcode(ERRCODE_DATA_CORRUPTED),
1339 : : errmsg_plural("incorrect size of file \"%s\": %lld byte",
1340 : : "incorrect size of file \"%s\": %lld bytes",
1341 : : (long long int) stat.st_size, path,
1342 : : (long long int) stat.st_size)));
1343 : :
3988 heikki.linnakangas@i 1344 :CBC 74 : crc_offset = stat.st_size - sizeof(pg_crc32c);
7576 tgl@sss.pgh.pa.us 1345 [ - + ]: 74 : if (crc_offset != MAXALIGN(crc_offset))
2746 michael@paquier.xyz 1346 [ # # ]:UBC 0 : ereport(ERROR,
1347 : : (errcode(ERRCODE_DATA_CORRUPTED),
1348 : : errmsg("incorrect alignment of CRC offset for file \"%s\"",
1349 : : path)));
1350 : :
1351 : : /*
1352 : : * OK, slurp in the file.
1353 : : */
7576 tgl@sss.pgh.pa.us 1354 :CBC 74 : buf = (char *) palloc(stat.st_size);
1355 : :
3284 rhaas@postgresql.org 1356 : 74 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
2797 michael@paquier.xyz 1357 : 74 : r = read(fd, buf, stat.st_size);
1358 [ - + ]: 74 : if (r != stat.st_size)
1359 : : {
2746 michael@paquier.xyz 1360 [ # # ]:UBC 0 : if (r < 0)
1361 [ # # ]: 0 : ereport(ERROR,
1362 : : (errcode_for_file_access(),
1363 : : errmsg("could not read file \"%s\": %m", path)));
1364 : : else
1365 [ # # ]: 0 : ereport(ERROR,
1366 : : (errmsg("could not read file \"%s\": read %d of %lld",
1367 : : path, r, (long long int) stat.st_size)));
1368 : : }
1369 : :
3284 rhaas@postgresql.org 1370 :CBC 74 : pgstat_report_wait_end();
1371 : :
2444 peter@eisentraut.org 1372 [ - + ]: 74 : if (CloseTransientFile(fd) != 0)
2563 michael@paquier.xyz 1373 [ # # ]:UBC 0 : ereport(ERROR,
1374 : : (errcode_for_file_access(),
1375 : : errmsg("could not close file \"%s\": %m", path)));
1376 : :
7576 tgl@sss.pgh.pa.us 1377 :CBC 74 : hdr = (TwoPhaseFileHeader *) buf;
2746 michael@paquier.xyz 1378 [ - + ]: 74 : if (hdr->magic != TWOPHASE_MAGIC)
2746 michael@paquier.xyz 1379 [ # # ]:UBC 0 : ereport(ERROR,
1380 : : (errcode(ERRCODE_DATA_CORRUPTED),
1381 : : errmsg("invalid magic number stored in file \"%s\"",
1382 : : path)));
1383 : :
2746 michael@paquier.xyz 1384 [ - + ]:CBC 74 : if (hdr->total_len != stat.st_size)
2746 michael@paquier.xyz 1385 [ # # ]:UBC 0 : ereport(ERROR,
1386 : : (errcode(ERRCODE_DATA_CORRUPTED),
1387 : : errmsg("invalid size stored in file \"%s\"",
1388 : : path)));
1389 : :
4149 heikki.linnakangas@i 1390 :CBC 74 : INIT_CRC32C(calc_crc);
1391 : 74 : COMP_CRC32C(calc_crc, buf, crc_offset);
1392 : 74 : FIN_CRC32C(calc_crc);
1393 : :
3988 1394 : 74 : file_crc = *((pg_crc32c *) (buf + crc_offset));
1395 : :
4149 1396 [ - + ]: 74 : if (!EQ_CRC32C(calc_crc, file_crc))
2746 michael@paquier.xyz 1397 [ # # ]:UBC 0 : ereport(ERROR,
1398 : : (errcode(ERRCODE_DATA_CORRUPTED),
1399 : : errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1400 : : path)));
1401 : :
7576 tgl@sss.pgh.pa.us 1402 :CBC 74 : return buf;
1403 : : }
1404 : :
1405 : :
1406 : : /*
1407 : : * Reads 2PC data from xlog. During checkpoint this data will be moved to
1408 : : * twophase files and ReadTwoPhaseFile should be used instead.
1409 : : *
1410 : : * Note clearly that this function can access WAL during normal operation,
1411 : : * similarly to the way WALSender or Logical Decoding would do.
1412 : : */
1413 : : static void
3707 simon@2ndQuadrant.co 1414 : 400 : XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1415 : : {
1416 : : XLogRecord *record;
1417 : : XLogReaderState *xlogreader;
1418 : : char *errormsg;
1419 : :
1770 tmunro@postgresql.or 1420 : 400 : xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1421 : 400 : XL_ROUTINE(.page_read = &read_local_xlog_page,
1422 : : .segment_open = &wal_segment_open,
1423 : : .segment_close = &wal_segment_close),
1424 : : NULL);
3707 simon@2ndQuadrant.co 1425 [ - + ]: 400 : if (!xlogreader)
3707 simon@2ndQuadrant.co 1426 [ # # ]:UBC 0 : ereport(ERROR,
1427 : : (errcode(ERRCODE_OUT_OF_MEMORY),
1428 : : errmsg("out of memory"),
1429 : : errdetail("Failed while allocating a WAL reading processor.")));
1430 : :
2240 heikki.linnakangas@i 1431 :CBC 400 : XLogBeginRead(xlogreader, lsn);
1770 tmunro@postgresql.or 1432 : 400 : record = XLogReadRecord(xlogreader, &errormsg);
1433 : :
3707 simon@2ndQuadrant.co 1434 [ - + ]: 400 : if (record == NULL)
1435 : : {
1585 noah@leadboat.com 1436 [ # # ]:UBC 0 : if (errormsg)
1437 [ # # ]: 0 : ereport(ERROR,
1438 : : (errcode_for_file_access(),
1439 : : errmsg("could not read two-phase state from WAL at %X/%08X: %s",
1440 : : LSN_FORMAT_ARGS(lsn), errormsg)));
1441 : : else
1442 [ # # ]: 0 : ereport(ERROR,
1443 : : (errcode_for_file_access(),
1444 : : errmsg("could not read two-phase state from WAL at %X/%08X",
1445 : : LSN_FORMAT_ARGS(lsn))));
1446 : : }
1447 : :
3707 simon@2ndQuadrant.co 1448 [ + - ]:CBC 400 : if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1449 [ - + ]: 400 : (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
3707 simon@2ndQuadrant.co 1450 [ # # ]:UBC 0 : ereport(ERROR,
1451 : : (errcode_for_file_access(),
1452 : : errmsg("expected two-phase state data is not present in WAL at %X/%08X",
1453 : : LSN_FORMAT_ARGS(lsn))));
1454 : :
3707 simon@2ndQuadrant.co 1455 [ + + ]:CBC 400 : if (len != NULL)
1456 : 26 : *len = XLogRecGetDataLen(xlogreader);
1457 : :
95 michael@paquier.xyz 1458 :GNC 400 : *buf = palloc_array(char, XLogRecGetDataLen(xlogreader));
3707 simon@2ndQuadrant.co 1459 :CBC 400 : memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1460 : :
1461 : 400 : XLogReaderFree(xlogreader);
1462 : 400 : }
1463 : :
1464 : :
1465 : : /*
1466 : : * Confirms an xid is prepared, during recovery
1467 : : */
1468 : : bool
5930 1469 : 358 : StandbyTransactionIdIsPrepared(TransactionId xid)
1470 : : {
1471 : : char *buf;
1472 : : TwoPhaseFileHeader *hdr;
1473 : : bool result;
1474 : : FullTransactionId fxid;
1475 : :
1476 [ - + ]: 358 : Assert(TransactionIdIsValid(xid));
1477 : :
5800 tgl@sss.pgh.pa.us 1478 [ - + ]: 358 : if (max_prepared_xacts <= 0)
5731 bruce@momjian.us 1479 :UBC 0 : return false; /* nothing to do */
1480 : :
1481 : : /* Read and validate file */
251 michael@paquier.xyz 1482 :GNC 358 : fxid = AdjustToFullTransactionId(xid);
1483 : 358 : buf = ReadTwoPhaseFile(fxid, true);
5930 simon@2ndQuadrant.co 1484 [ + - ]:CBC 358 : if (buf == NULL)
1485 : 358 : return false;
1486 : :
1487 : : /* Check header also */
5930 simon@2ndQuadrant.co 1488 :UBC 0 : hdr = (TwoPhaseFileHeader *) buf;
1489 : 0 : result = TransactionIdEquals(hdr->xid, xid);
1490 : 0 : pfree(buf);
1491 : :
1492 : 0 : return result;
1493 : : }
1494 : :
1495 : : /*
1496 : : * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1497 : : */
1498 : : void
7575 tgl@sss.pgh.pa.us 1499 :CBC 326 : FinishPreparedTransaction(const char *gid, bool isCommit)
1500 : : {
1501 : : GlobalTransaction gxact;
1502 : : PGPROC *proc;
1503 : : FullTransactionId fxid;
1504 : : TransactionId xid;
1505 : : bool ondisk;
1506 : : char *buf;
1507 : : char *bufptr;
1508 : : TwoPhaseFileHeader *hdr;
1509 : : TransactionId latestXid;
1510 : : TransactionId *children;
1511 : : RelFileLocator *commitrels;
1512 : : RelFileLocator *abortrels;
1513 : : RelFileLocator *delrels;
1514 : : int ndelrels;
1515 : : xl_xact_stats_item *commitstats;
1516 : : xl_xact_stats_item *abortstats;
1517 : : SharedInvalidationMessage *invalmsgs;
1518 : :
1519 : : /*
1520 : : * Validate the GID, and lock the GXACT to ensure that two backends do not
1521 : : * try to commit the same GID at once.
1522 : : */
7576 1523 : 326 : gxact = LockGXact(gid, GetUserId());
752 heikki.linnakangas@i 1524 : 319 : proc = GetPGProcByNumber(gxact->pgprocno);
251 michael@paquier.xyz 1525 :GNC 319 : fxid = gxact->fxid;
1526 : 319 : xid = XidFromFullTransactionId(fxid);
1527 : :
1528 : : /*
1529 : : * Read and validate 2PC state data. State data will typically be stored
1530 : : * in WAL files if the LSN is after the last checkpoint record, or moved
1531 : : * to disk if for some reason they have lived for a long time.
1532 : : */
3707 simon@2ndQuadrant.co 1533 [ + + ]:CBC 319 : if (gxact->ondisk)
251 michael@paquier.xyz 1534 :GNC 24 : buf = ReadTwoPhaseFile(fxid, false);
1535 : : else
3707 simon@2ndQuadrant.co 1536 :CBC 295 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1537 : :
1538 : :
1539 : : /*
1540 : : * Disassemble the header area
1541 : : */
7576 tgl@sss.pgh.pa.us 1542 : 319 : hdr = (TwoPhaseFileHeader *) buf;
1543 [ - + ]: 319 : Assert(TransactionIdEquals(hdr->xid, xid));
1544 : 319 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
3657 simon@2ndQuadrant.co 1545 : 319 : bufptr += MAXALIGN(hdr->gidlen);
7576 tgl@sss.pgh.pa.us 1546 : 319 : children = (TransactionId *) bufptr;
1547 : 319 : bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1348 rhaas@postgresql.org 1548 : 319 : commitrels = (RelFileLocator *) bufptr;
1549 : 319 : bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
1550 : 319 : abortrels = (RelFileLocator *) bufptr;
1551 : 319 : bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
1403 tgl@sss.pgh.pa.us 1552 : 319 : commitstats = (xl_xact_stats_item *) bufptr;
1439 andres@anarazel.de 1553 : 319 : bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
1403 tgl@sss.pgh.pa.us 1554 : 319 : abortstats = (xl_xact_stats_item *) bufptr;
1439 andres@anarazel.de 1555 : 319 : bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
5930 simon@2ndQuadrant.co 1556 : 319 : invalmsgs = (SharedInvalidationMessage *) bufptr;
1557 : 319 : bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1558 : :
1559 : : /* compute latestXid among all children */
6763 tgl@sss.pgh.pa.us 1560 : 319 : latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1561 : :
1562 : : /* Prevent cancel/die interrupt while cleaning up */
2873 teodor@sigaev.ru 1563 : 319 : HOLD_INTERRUPTS();
1564 : :
1565 : : /*
1566 : : * The order of operations here is critical: make the XLOG entry for
1567 : : * commit or abort, then mark the transaction committed or aborted in
1568 : : * pg_xact, then remove its PGPROC from the global ProcArray (which means
1569 : : * TransactionIdIsInProgress will stop saying the prepared xact is in
1570 : : * progress), then run the post-commit or post-abort callbacks. The
1571 : : * callbacks will release the locks the transaction held.
1572 : : */
7576 tgl@sss.pgh.pa.us 1573 [ + + ]: 319 : if (isCommit)
1574 : 273 : RecordTransactionCommitPrepared(xid,
1575 : : hdr->nsubxacts, children,
1576 : : hdr->ncommitrels, commitrels,
1577 : : hdr->ncommitstats,
1578 : : commitstats,
1579 : : hdr->ninvalmsgs, invalmsgs,
2909 simon@2ndQuadrant.co 1580 : 273 : hdr->initfileinval, gid);
1581 : : else
7576 tgl@sss.pgh.pa.us 1582 : 46 : RecordTransactionAbortPrepared(xid,
1583 : : hdr->nsubxacts, children,
1584 : : hdr->nabortrels, abortrels,
1585 : : hdr->nabortstats,
1586 : : abortstats,
1587 : : gid);
1588 : :
5224 rhaas@postgresql.org 1589 : 319 : ProcArrayRemove(proc, latestXid);
1590 : :
1591 : : /*
1592 : : * In case we fail while running the callbacks, mark the gxact invalid so
1593 : : * no one else will try to commit/rollback, and so it will be recycled if
1594 : : * we fail after this point. It is still locked by our backend so it
1595 : : * won't go away yet.
1596 : : *
1597 : : * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1598 : : */
7576 tgl@sss.pgh.pa.us 1599 : 319 : gxact->valid = false;
1600 : :
1601 : : /*
1602 : : * We have to remove any files that were supposed to be dropped. For
1603 : : * consistency with the regular xact.c code paths, must do this before
1604 : : * releasing locks, so do it before running the callbacks.
1605 : : *
1606 : : * NB: this code knows that we couldn't be dropping any temp rels ...
1607 : : */
1608 [ + + ]: 319 : if (isCommit)
1609 : : {
6325 heikki.linnakangas@i 1610 : 273 : delrels = commitrels;
1611 : 273 : ndelrels = hdr->ncommitrels;
1612 : : }
1613 : : else
1614 : : {
1615 : 46 : delrels = abortrels;
1616 : 46 : ndelrels = hdr->nabortrels;
1617 : : }
1618 : :
1619 : : /* Make sure files supposed to be dropped are dropped */
2810 fujii@postgresql.org 1620 : 319 : DropRelationFiles(delrels, ndelrels, false);
1621 : :
1439 andres@anarazel.de 1622 [ + + ]: 319 : if (isCommit)
1623 : 273 : pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
1624 : : else
1625 : 46 : pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);
1626 : :
1627 : : /*
1628 : : * Handle cache invalidation messages.
1629 : : *
1630 : : * Relcache init file invalidation requires processing both before and
1631 : : * after we send the SI messages, only when committing. See
1632 : : * AtEOXact_Inval().
1633 : : */
1676 michael@paquier.xyz 1634 [ + + ]: 319 : if (isCommit)
1635 : : {
1636 [ - + ]: 273 : if (hdr->initfileinval)
1676 michael@paquier.xyz 1637 :UBC 0 : RelationCacheInitFilePreInvalidate();
1676 michael@paquier.xyz 1638 :CBC 273 : SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1639 [ - + ]: 273 : if (hdr->initfileinval)
1676 michael@paquier.xyz 1640 :UBC 0 : RelationCacheInitFilePostInvalidate();
1641 : : }
1642 : :
1643 : : /*
1644 : : * Acquire the two-phase lock. We want to work on the two-phase callbacks
1645 : : * while holding it to avoid potential conflicts with other transactions
1646 : : * attempting to use the same GID, so the lock is released once the shared
1647 : : * memory state is cleared.
1648 : : */
2575 michael@paquier.xyz 1649 :CBC 319 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1650 : :
1651 : : /* And now do the callbacks */
7575 tgl@sss.pgh.pa.us 1652 [ + + ]: 319 : if (isCommit)
251 michael@paquier.xyz 1653 :GNC 273 : ProcessRecords(bufptr, fxid, twophase_postcommit_callbacks);
1654 : : else
1655 : 46 : ProcessRecords(bufptr, fxid, twophase_postabort_callbacks);
1656 : :
1657 : 319 : PredicateLockTwoPhaseFinish(fxid, isCommit);
1658 : :
1659 : : /*
1660 : : * Read this value while holding the two-phase lock, as the on-disk 2PC
1661 : : * file is physically removed after the lock is released.
1662 : : */
530 michael@paquier.xyz 1663 :CBC 319 : ondisk = gxact->ondisk;
1664 : :
1665 : : /* Clear shared memory state */
2575 1666 : 319 : RemoveGXact(gxact);
1667 : :
1668 : : /*
1669 : : * Release the lock as all callbacks are called and shared memory cleanup
1670 : : * is done.
1671 : : */
1672 : 319 : LWLockRelease(TwoPhaseStateLock);
1673 : :
1674 : : /* Count the prepared xact as committed or aborted */
2531 akapila@postgresql.o 1675 : 319 : AtEOXact_PgStat(isCommit, false);
1676 : :
1677 : : /*
1678 : : * And now we can clean up any files we may have left.
1679 : : */
530 michael@paquier.xyz 1680 [ + + ]: 319 : if (ondisk)
251 michael@paquier.xyz 1681 :GNC 24 : RemoveTwoPhaseFile(fxid, true);
1682 : :
4322 heikki.linnakangas@i 1683 :CBC 319 : MyLockedGxact = NULL;
1684 : :
2873 teodor@sigaev.ru 1685 [ - + ]: 319 : RESUME_INTERRUPTS();
1686 : :
7576 tgl@sss.pgh.pa.us 1687 : 319 : pfree(buf);
1688 : 319 : }
1689 : :
1690 : : /*
1691 : : * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1692 : : */
1693 : : static void
251 michael@paquier.xyz 1694 :GNC 352 : ProcessRecords(char *bufptr, FullTransactionId fxid,
1695 : : const TwoPhaseCallback callbacks[])
1696 : : {
1697 : : for (;;)
7576 tgl@sss.pgh.pa.us 1698 :CBC 1538 : {
1699 : 1890 : TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1700 : :
1701 [ - + ]: 1890 : Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1702 [ + + ]: 1890 : if (record->rmid == TWOPHASE_RM_END_ID)
1703 : 352 : break;
1704 : :
1705 : 1538 : bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1706 : :
1707 [ + + ]: 1538 : if (callbacks[record->rmid] != NULL)
251 michael@paquier.xyz 1708 :GNC 1459 : callbacks[record->rmid] (fxid, record->info, bufptr, record->len);
1709 : :
7576 tgl@sss.pgh.pa.us 1710 :CBC 1538 : bufptr += MAXALIGN(record->len);
1711 : : }
1712 : 352 : }
1713 : :
1714 : : /*
1715 : : * Remove the 2PC file.
1716 : : *
1717 : : * If giveWarning is false, do not complain about file-not-present;
1718 : : * this is an expected case during WAL replay.
1719 : : *
1720 : : * This routine is used at early stages at recovery where future and
1721 : : * past orphaned files are checked, hence the FullTransactionId to build
1722 : : * a complete file name fit for the removal.
1723 : : */
1724 : : static void
251 michael@paquier.xyz 1725 :GNC 29 : RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
1726 : : {
1727 : : char path[MAXPGPATH];
1728 : :
1729 : 29 : TwoPhaseFilePath(path, fxid);
7576 tgl@sss.pgh.pa.us 1730 [ - + ]:CBC 29 : if (unlink(path))
7576 tgl@sss.pgh.pa.us 1731 [ # # # # ]:UBC 0 : if (errno != ENOENT || giveWarning)
1732 [ # # ]: 0 : ereport(WARNING,
1733 : : (errcode_for_file_access(),
1734 : : errmsg("could not remove file \"%s\": %m", path)));
7576 tgl@sss.pgh.pa.us 1735 :CBC 29 : }
1736 : :
1737 : : /*
1738 : : * Recreates a state file. This is used in WAL replay and during
1739 : : * checkpoint creation.
1740 : : *
1741 : : * Note: content and len don't include CRC.
1742 : : */
1743 : : static void
251 michael@paquier.xyz 1744 :GNC 26 : RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len)
1745 : : {
1746 : : char path[MAXPGPATH];
1747 : : pg_crc32c statefile_crc;
1748 : : int fd;
1749 : :
1750 : : /* Recompute CRC */
4149 heikki.linnakangas@i 1751 :CBC 26 : INIT_CRC32C(statefile_crc);
1752 : 26 : COMP_CRC32C(statefile_crc, content, len);
1753 : 26 : FIN_CRC32C(statefile_crc);
1754 : :
251 michael@paquier.xyz 1755 :GNC 26 : TwoPhaseFilePath(path, fxid);
1756 : :
4856 heikki.linnakangas@i 1757 :CBC 26 : fd = OpenTransientFile(path,
1758 : : O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
7576 tgl@sss.pgh.pa.us 1759 [ - + ]: 26 : if (fd < 0)
7576 tgl@sss.pgh.pa.us 1760 [ # # ]:UBC 0 : ereport(ERROR,
1761 : : (errcode_for_file_access(),
1762 : : errmsg("could not recreate file \"%s\": %m", path)));
1763 : :
1764 : : /* Write content and CRC */
2779 michael@paquier.xyz 1765 :CBC 26 : errno = 0;
3284 rhaas@postgresql.org 1766 : 26 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
7576 tgl@sss.pgh.pa.us 1767 [ - + ]: 26 : if (write(fd, content, len) != len)
1768 : : {
1769 : : /* if write didn't set errno, assume problem is no disk space */
2524 michael@paquier.xyz 1770 [ # # ]:UBC 0 : if (errno == 0)
1771 : 0 : errno = ENOSPC;
7576 tgl@sss.pgh.pa.us 1772 [ # # ]: 0 : ereport(ERROR,
1773 : : (errcode_for_file_access(),
1774 : : errmsg("could not write file \"%s\": %m", path)));
1775 : : }
3988 heikki.linnakangas@i 1776 [ - + ]:CBC 26 : if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1777 : : {
1778 : : /* if write didn't set errno, assume problem is no disk space */
2524 michael@paquier.xyz 1779 [ # # ]:UBC 0 : if (errno == 0)
1780 : 0 : errno = ENOSPC;
7576 tgl@sss.pgh.pa.us 1781 [ # # ]: 0 : ereport(ERROR,
1782 : : (errcode_for_file_access(),
1783 : : errmsg("could not write file \"%s\": %m", path)));
1784 : : }
3284 rhaas@postgresql.org 1785 :CBC 26 : pgstat_report_wait_end();
1786 : :
1787 : : /*
1788 : : * We must fsync the file because the end-of-replay checkpoint will not do
1789 : : * so, there being no GXACT in shared memory yet to tell it to.
1790 : : */
1791 : 26 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
7576 tgl@sss.pgh.pa.us 1792 [ - + ]: 26 : if (pg_fsync(fd) != 0)
7576 tgl@sss.pgh.pa.us 1793 [ # # ]:UBC 0 : ereport(ERROR,
1794 : : (errcode_for_file_access(),
1795 : : errmsg("could not fsync file \"%s\": %m", path)));
3284 rhaas@postgresql.org 1796 :CBC 26 : pgstat_report_wait_end();
1797 : :
4856 heikki.linnakangas@i 1798 [ - + ]: 26 : if (CloseTransientFile(fd) != 0)
7576 tgl@sss.pgh.pa.us 1799 [ # # ]:UBC 0 : ereport(ERROR,
1800 : : (errcode_for_file_access(),
1801 : : errmsg("could not close file \"%s\": %m", path)));
7576 tgl@sss.pgh.pa.us 1802 :CBC 26 : }
1803 : :
1804 : : /*
1805 : : * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1806 : : *
1807 : : * We must fsync the state file of any GXACT that is valid or has been
1808 : : * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1809 : : * horizon. (If the gxact isn't valid yet, has not been generated in
1810 : : * redo, or has a later LSN, this checkpoint is not responsible for
1811 : : * fsyncing it.)
1812 : : *
1813 : : * This is deliberately run as late as possible in the checkpoint sequence,
1814 : : * because GXACTs ordinarily have short lifespans, and so it is quite
1815 : : * possible that GXACTs that were valid at checkpoint start will no longer
1816 : : * exist if we wait a little bit. With typical checkpoint settings this
1817 : : * will be about 3 minutes for an online checkpoint, so as a result we
1818 : : * expect that there will be no GXACTs that need to be copied to disk.
1819 : : *
1820 : : * If a GXACT remains valid across multiple checkpoints, it will already
1821 : : * be on disk so we don't bother to repeat that write.
1822 : : */
1823 : : void
7574 1824 : 1802 : CheckPointTwoPhase(XLogRecPtr redo_horizon)
1825 : : {
1826 : : int i;
3707 simon@2ndQuadrant.co 1827 : 1802 : int serialized_xacts = 0;
1828 : :
7574 tgl@sss.pgh.pa.us 1829 [ + + ]: 1802 : if (max_prepared_xacts <= 0)
1830 : 1259 : return; /* nothing to do */
1831 : :
1832 : : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1833 : :
1834 : : /*
1835 : : * We are expecting there to be zero GXACTs that need to be copied to
1836 : : * disk, so we perform all I/O while holding TwoPhaseStateLock for
1837 : : * simplicity. This prevents any new xacts from preparing while this
1838 : : * occurs, which shouldn't be a problem since the presence of long-lived
1839 : : * prepared xacts indicates the transaction manager isn't active.
1840 : : *
1841 : : * It's also possible to move I/O out of the lock, but on every error we
1842 : : * should check whether somebody committed our transaction in different
1843 : : * backend. Let's leave this optimization for future, if somebody will
1844 : : * spot that this place cause bottleneck.
1845 : : *
1846 : : * Note that it isn't possible for there to be a GXACT with a
1847 : : * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1848 : : * because of the efforts with delayChkptFlags.
1849 : : */
1850 : 543 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1851 [ + + ]: 579 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1852 : : {
1853 : : /*
1854 : : * Note that we are using gxact not PGPROC so this works in recovery
1855 : : * also
1856 : : */
7456 bruce@momjian.us 1857 : 36 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1858 : :
3267 simon@2ndQuadrant.co 1859 [ + + + - ]: 36 : if ((gxact->valid || gxact->inredo) &&
3707 1860 [ + + ]: 36 : !gxact->ondisk &&
1861 [ + + ]: 32 : gxact->prepare_end_lsn <= redo_horizon)
1862 : : {
1863 : : char *buf;
1864 : : int len;
1865 : :
1866 : 26 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
251 michael@paquier.xyz 1867 :GNC 26 : RecreateTwoPhaseFile(gxact->fxid, buf, len);
3707 simon@2ndQuadrant.co 1868 :CBC 26 : gxact->ondisk = true;
3267 1869 : 26 : gxact->prepare_start_lsn = InvalidXLogRecPtr;
1870 : 26 : gxact->prepare_end_lsn = InvalidXLogRecPtr;
3707 1871 : 26 : pfree(buf);
1872 : 26 : serialized_xacts++;
1873 : : }
1874 : : }
1875 : 543 : LWLockRelease(TwoPhaseStateLock);
1876 : :
1877 : : /*
1878 : : * Flush unconditionally the parent directory to make any information
1879 : : * durable on disk. Two-phase files could have been removed and those
1880 : : * removals need to be made persistent as well as any files newly created
1881 : : * previously since the last checkpoint.
1882 : : */
3275 teodor@sigaev.ru 1883 : 543 : fsync_fname(TWOPHASE_DIR, true);
1884 : :
1885 : : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1886 : :
3707 simon@2ndQuadrant.co 1887 [ + - + + ]: 543 : if (log_checkpoints && serialized_xacts > 0)
1888 [ + - ]: 22 : ereport(LOG,
1889 : : (errmsg_plural("%u two-phase state file was written "
1890 : : "for a long-running prepared transaction",
1891 : : "%u two-phase state files were written "
1892 : : "for long-running prepared transactions",
1893 : : serialized_xacts,
1894 : : serialized_xacts)));
1895 : : }
1896 : :
1897 : : /*
1898 : : * restoreTwoPhaseData
1899 : : *
1900 : : * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1901 : : * This is called once at the beginning of recovery, saving any extra
1902 : : * lookups in the future. Two-phase files that are newer than the
1903 : : * minimum XID horizon are discarded on the way.
1904 : : */
1905 : : void
3267 1906 : 1000 : restoreTwoPhaseData(void)
1907 : : {
1908 : : DIR *cldir;
1909 : : struct dirent *clde;
1910 : :
3196 alvherre@alvh.no-ip. 1911 : 1000 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
3023 tgl@sss.pgh.pa.us 1912 : 1000 : cldir = AllocateDir(TWOPHASE_DIR);
3267 simon@2ndQuadrant.co 1913 [ + + ]: 3016 : while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1914 : : {
837 akorotkov@postgresql 1915 [ + + ]: 2016 : if (strlen(clde->d_name) == 16 &&
1916 [ + - ]: 16 : strspn(clde->d_name, "0123456789ABCDEF") == 16)
1917 : : {
1918 : : FullTransactionId fxid;
1919 : : char *buf;
1920 : :
1921 : 16 : fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
1922 : :
251 michael@paquier.xyz 1923 :GNC 16 : buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
1924 : : true, false, false);
3267 simon@2ndQuadrant.co 1925 [ - + ]:CBC 16 : if (buf == NULL)
3267 simon@2ndQuadrant.co 1926 :UBC 0 : continue;
1927 : :
251 michael@paquier.xyz 1928 :GNC 16 : PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr,
1929 : : InvalidXLogRecPtr, InvalidReplOriginId);
1930 : : }
1931 : : }
3196 alvherre@alvh.no-ip. 1932 :CBC 1000 : LWLockRelease(TwoPhaseStateLock);
3267 simon@2ndQuadrant.co 1933 : 1000 : FreeDir(cldir);
1934 : 1000 : }
1935 : :
1936 : : /*
1937 : : * PrescanPreparedTransactions
1938 : : *
1939 : : * Scan the shared memory entries of TwoPhaseState and determine the range
1940 : : * of valid XIDs present. This is run during database startup, after we
1941 : : * have completed reading WAL. TransamVariables->nextXid has been set to
1942 : : * one more than the highest XID for which evidence exists in WAL.
1943 : : *
1944 : : * We throw away any prepared xacts with main XID beyond nextXid --- if any
1945 : : * are present, it suggests that the DBA has done a PITR recovery to an
1946 : : * earlier point in time without cleaning out pg_twophase. We dare not
1947 : : * try to recover such prepared xacts since they likely depend on database
1948 : : * state that doesn't exist now.
1949 : : *
1950 : : * However, we will advance nextXid beyond any subxact XIDs belonging to
1951 : : * valid prepared xacts. We need to do this since subxact commit doesn't
1952 : : * write a WAL entry, and so there might be no evidence in WAL of those
1953 : : * subxact XIDs.
1954 : : *
1955 : : * On corrupted two-phase files, fail immediately. Keeping around broken
1956 : : * entries and let replay continue causes harm on the system, and a new
1957 : : * backup should be rolled in.
1958 : : *
1959 : : * Our other responsibility is to determine and return the oldest valid XID
1960 : : * among the prepared xacts (if none, return TransamVariables->nextXid).
1961 : : * This is needed to synchronize pg_subtrans startup properly.
1962 : : *
1963 : : * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1964 : : * top-level xids is stored in *xids_p. The number of entries in the array
1965 : : * is returned in *nxids_p.
1966 : : */
1967 : : TransactionId
5930 1968 : 1004 : PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1969 : : {
828 heikki.linnakangas@i 1970 : 1004 : FullTransactionId nextXid = TransamVariables->nextXid;
2042 andres@anarazel.de 1971 : 1004 : TransactionId origNextXid = XidFromFullTransactionId(nextXid);
7576 tgl@sss.pgh.pa.us 1972 : 1004 : TransactionId result = origNextXid;
5930 simon@2ndQuadrant.co 1973 : 1004 : TransactionId *xids = NULL;
1974 : 1004 : int nxids = 0;
1975 : 1004 : int allocsize = 0;
1976 : : int i;
1977 : :
3196 alvherre@alvh.no-ip. 1978 : 1004 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
3267 simon@2ndQuadrant.co 1979 [ + + ]: 1058 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1980 : : {
1981 : : TransactionId xid;
1982 : : char *buf;
1983 : 54 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1984 : :
1985 [ - + ]: 54 : Assert(gxact->inredo);
1986 : :
251 michael@paquier.xyz 1987 :GNC 54 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
1988 : : gxact->prepare_start_lsn,
3224 bruce@momjian.us 1989 :CBC 54 : gxact->ondisk, false, true);
1990 : :
3267 simon@2ndQuadrant.co 1991 [ - + ]: 54 : if (buf == NULL)
3267 simon@2ndQuadrant.co 1992 :UBC 0 : continue;
1993 : :
1994 : : /*
1995 : : * OK, we think this file is valid. Incorporate xid into the
1996 : : * running-minimum result.
1997 : : */
251 michael@paquier.xyz 1998 :GNC 54 : xid = XidFromFullTransactionId(gxact->fxid);
3253 simon@2ndQuadrant.co 1999 [ + + ]:CBC 54 : if (TransactionIdPrecedes(xid, result))
2000 : 47 : result = xid;
2001 : :
3267 2002 [ + + ]: 54 : if (xids_p)
2003 : : {
2004 [ + + ]: 21 : if (nxids == allocsize)
2005 : : {
2006 [ + - ]: 17 : if (nxids == 0)
2007 : : {
2008 : 17 : allocsize = 10;
2009 : 17 : xids = palloc(allocsize * sizeof(TransactionId));
2010 : : }
2011 : : else
2012 : : {
3267 simon@2ndQuadrant.co 2013 :UBC 0 : allocsize = allocsize * 2;
2014 : 0 : xids = repalloc(xids, allocsize * sizeof(TransactionId));
2015 : : }
2016 : : }
3267 simon@2ndQuadrant.co 2017 :CBC 21 : xids[nxids++] = xid;
2018 : : }
2019 : :
2020 : 54 : pfree(buf);
2021 : : }
2022 : 1004 : LWLockRelease(TwoPhaseStateLock);
2023 : :
5930 2024 [ + + ]: 1004 : if (xids_p)
2025 : : {
2026 : 65 : *xids_p = xids;
2027 : 65 : *nxids_p = nxids;
2028 : : }
2029 : :
7576 tgl@sss.pgh.pa.us 2030 : 1004 : return result;
2031 : : }
2032 : :
2033 : : /*
2034 : : * StandbyRecoverPreparedTransactions
2035 : : *
2036 : : * Scan the shared memory entries of TwoPhaseState and setup all the required
2037 : : * information to allow standby queries to treat prepared transactions as still
2038 : : * active.
2039 : : *
2040 : : * This is never called at the end of recovery - we use
2041 : : * RecoverPreparedTransactions() at that point.
2042 : : *
2043 : : * This updates pg_subtrans, so that any subtransactions will be correctly
2044 : : * seen as in-progress in snapshots taken during recovery.
2045 : : */
2046 : : void
3244 simon@2ndQuadrant.co 2047 : 65 : StandbyRecoverPreparedTransactions(void)
2048 : : {
2049 : : int i;
2050 : :
3196 alvherre@alvh.no-ip. 2051 : 65 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
3267 simon@2ndQuadrant.co 2052 [ + + ]: 86 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2053 : : {
2054 : : char *buf;
2055 : 21 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2056 : :
2057 [ - + ]: 21 : Assert(gxact->inredo);
2058 : :
251 michael@paquier.xyz 2059 :GNC 21 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
2060 : : gxact->prepare_start_lsn,
626 heikki.linnakangas@i 2061 :CBC 21 : gxact->ondisk, true, false);
3267 simon@2ndQuadrant.co 2062 [ + - ]: 21 : if (buf != NULL)
3475 2063 : 21 : pfree(buf);
2064 : : }
3267 2065 : 65 : LWLockRelease(TwoPhaseStateLock);
5815 heikki.linnakangas@i 2066 : 65 : }
2067 : :
2068 : : /*
2069 : : * RecoverPreparedTransactions
2070 : : *
2071 : : * Scan the shared memory entries of TwoPhaseState and reload the state for
2072 : : * each prepared transaction (reacquire locks, etc).
2073 : : *
2074 : : * This is run at the end of recovery, but before we allow backends to write
2075 : : * WAL.
2076 : : *
2077 : : * At the end of recovery the way we take snapshots will change. We now need
2078 : : * to mark all running transactions with their full SubTransSetParent() info
2079 : : * to allow normal snapshots to work correctly if snapshots overflow.
2080 : : * We do this here because by definition prepared transactions are the only
2081 : : * type of write transaction still running, so this is necessary and
2082 : : * complete.
2083 : : */
2084 : : void
7576 tgl@sss.pgh.pa.us 2085 : 939 : RecoverPreparedTransactions(void)
2086 : : {
2087 : : int i;
2088 : :
3196 alvherre@alvh.no-ip. 2089 : 939 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
3267 simon@2ndQuadrant.co 2090 [ + + ]: 972 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2091 : : {
2092 : : char *buf;
2093 : 33 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
251 michael@paquier.xyz 2094 :GNC 33 : FullTransactionId fxid = gxact->fxid;
2095 : : char *bufptr;
2096 : : TwoPhaseFileHeader *hdr;
2097 : : TransactionId *subxids;
2098 : : const char *gid;
2099 : :
2100 : : /*
2101 : : * Reconstruct subtrans state for the transaction --- needed because
2102 : : * pg_subtrans is not preserved over a restart. Note that we are
2103 : : * linking all the subtransactions directly to the top-level XID;
2104 : : * there may originally have been a more complex hierarchy, but
2105 : : * there's no need to restore that exactly. It's possible that
2106 : : * SubTransSetParent has been set before, if the prepared transaction
2107 : : * generated xid assignment records.
2108 : : */
2109 : 33 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
2110 : : gxact->prepare_start_lsn,
3224 bruce@momjian.us 2111 :CBC 33 : gxact->ondisk, true, false);
3267 simon@2ndQuadrant.co 2112 [ - + ]: 33 : if (buf == NULL)
3267 simon@2ndQuadrant.co 2113 :UBC 0 : continue;
2114 : :
3267 simon@2ndQuadrant.co 2115 [ + - ]:CBC 33 : ereport(LOG,
2116 : : (errmsg("recovering prepared transaction %u of epoch %u from shared memory",
2117 : : XidFromFullTransactionId(gxact->fxid),
2118 : : EpochFromFullTransactionId(gxact->fxid))));
2119 : :
2120 : 33 : hdr = (TwoPhaseFileHeader *) buf;
251 michael@paquier.xyz 2121 [ - + ]:GNC 33 : Assert(TransactionIdEquals(hdr->xid,
2122 : : XidFromFullTransactionId(gxact->fxid)));
3267 simon@2ndQuadrant.co 2123 :CBC 33 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2124 : 33 : gid = (const char *) bufptr;
2125 : 33 : bufptr += MAXALIGN(hdr->gidlen);
2126 : 33 : subxids = (TransactionId *) bufptr;
2127 : 33 : bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1348 rhaas@postgresql.org 2128 : 33 : bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
2129 : 33 : bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
1439 andres@anarazel.de 2130 : 33 : bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
2131 : 33 : bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
3267 simon@2ndQuadrant.co 2132 : 33 : bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2133 : :
2134 : : /*
2135 : : * Recreate its GXACT and dummy PGPROC. But, check whether it was
2136 : : * added in redo and already has a shmem entry for it.
2137 : : */
251 michael@paquier.xyz 2138 :GNC 33 : MarkAsPreparingGuts(gxact, gxact->fxid, gid,
2139 : : hdr->prepared_at,
2140 : : hdr->owner, hdr->database);
2141 : :
2142 : : /* recovered, so reset the flag for entries generated by redo */
3267 simon@2ndQuadrant.co 2143 :CBC 33 : gxact->inredo = false;
2144 : :
2145 : 33 : GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
3196 alvherre@alvh.no-ip. 2146 : 33 : MarkAsPrepared(gxact, true);
2147 : :
2148 : 33 : LWLockRelease(TwoPhaseStateLock);
2149 : :
2150 : : /*
2151 : : * Recover other state (notably locks) using resource managers.
2152 : : */
251 michael@paquier.xyz 2153 :GNC 33 : ProcessRecords(bufptr, fxid, twophase_recover_callbacks);
2154 : :
2155 : : /*
2156 : : * Release locks held by the standby process after we process each
2157 : : * prepared transaction. As a result, we don't need too many
2158 : : * additional locks at any one time.
2159 : : */
3267 simon@2ndQuadrant.co 2160 [ + + ]:CBC 33 : if (InHotStandby)
251 michael@paquier.xyz 2161 :GNC 7 : StandbyReleaseLockTree(hdr->xid, hdr->nsubxacts, subxids);
2162 : :
2163 : : /*
2164 : : * We're done with recovering this transaction. Clear MyLockedGxact,
2165 : : * like we do in PrepareTransaction() during normal operation.
2166 : : */
3267 simon@2ndQuadrant.co 2167 :CBC 33 : PostPrepare_Twophase();
2168 : :
2169 : 33 : pfree(buf);
2170 : :
3196 alvherre@alvh.no-ip. 2171 : 33 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2172 : : }
2173 : :
2174 : 939 : LWLockRelease(TwoPhaseStateLock);
3267 simon@2ndQuadrant.co 2175 : 939 : }
2176 : :
2177 : : /*
2178 : : * ProcessTwoPhaseBuffer
2179 : : *
2180 : : * Given a FullTransactionId, read it either from disk or read it directly
2181 : : * via shmem xlog record pointer using the provided "prepare_start_lsn".
2182 : : *
2183 : : * If setParent is true, set up subtransaction parent linkages.
2184 : : *
2185 : : * If setNextXid is true, set TransamVariables->nextXid to the newest
2186 : : * value scanned.
2187 : : */
2188 : : static char *
251 michael@paquier.xyz 2189 :GNC 124 : ProcessTwoPhaseBuffer(FullTransactionId fxid,
2190 : : XLogRecPtr prepare_start_lsn,
2191 : : bool fromdisk,
2192 : : bool setParent, bool setNextXid)
2193 : : {
828 heikki.linnakangas@i 2194 :CBC 124 : FullTransactionId nextXid = TransamVariables->nextXid;
2195 : : TransactionId *subxids;
2196 : : char *buf;
2197 : : TwoPhaseFileHeader *hdr;
2198 : : int i;
2199 : :
3196 alvherre@alvh.no-ip. 2200 [ - + ]: 124 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2201 : :
3267 simon@2ndQuadrant.co 2202 [ + + ]: 124 : if (!fromdisk)
129 alvherre@kurilemu.de 2203 [ - + ]:GNC 74 : Assert(XLogRecPtrIsValid(prepare_start_lsn));
2204 : :
2205 : : /* Already processed? */
251 michael@paquier.xyz 2206 [ + - - + ]: 248 : if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) ||
2207 : 124 : TransactionIdDidAbort(XidFromFullTransactionId(fxid)))
2208 : : {
3267 simon@2ndQuadrant.co 2209 [ # # ]:UBC 0 : if (fromdisk)
2210 : : {
2211 [ # # ]: 0 : ereport(WARNING,
2212 : : (errmsg("removing stale two-phase state file for transaction %u of epoch %u",
2213 : : XidFromFullTransactionId(fxid),
2214 : : EpochFromFullTransactionId(fxid))));
251 michael@paquier.xyz 2215 :UNC 0 : RemoveTwoPhaseFile(fxid, true);
2216 : : }
2217 : : else
2218 : : {
440 michael@paquier.xyz 2219 [ # # ]:UBC 0 : ereport(WARNING,
2220 : : (errmsg("removing stale two-phase state from memory for transaction %u of epoch %u",
2221 : : XidFromFullTransactionId(fxid),
2222 : : EpochFromFullTransactionId(fxid))));
251 michael@paquier.xyz 2223 :UNC 0 : PrepareRedoRemoveFull(fxid, true);
2224 : : }
440 michael@paquier.xyz 2225 :UBC 0 : return NULL;
2226 : : }
2227 : :
2228 : : /* Reject XID if too new */
251 michael@paquier.xyz 2229 [ - + ]:GNC 124 : if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
2230 : : {
3267 simon@2ndQuadrant.co 2231 [ # # ]:UBC 0 : if (fromdisk)
2232 : : {
2233 [ # # ]: 0 : ereport(WARNING,
2234 : : (errmsg("removing future two-phase state file for transaction %u of epoch %u",
2235 : : XidFromFullTransactionId(fxid),
2236 : : EpochFromFullTransactionId(fxid))));
251 michael@paquier.xyz 2237 :UNC 0 : RemoveTwoPhaseFile(fxid, true);
2238 : : }
2239 : : else
2240 : : {
3267 simon@2ndQuadrant.co 2241 [ # # ]:UBC 0 : ereport(WARNING,
2242 : : (errmsg("removing future two-phase state from memory for transaction %u of epoch %u",
2243 : : XidFromFullTransactionId(fxid),
2244 : : EpochFromFullTransactionId(fxid))));
251 michael@paquier.xyz 2245 :UNC 0 : PrepareRedoRemoveFull(fxid, true);
2246 : : }
3267 simon@2ndQuadrant.co 2247 :UBC 0 : return NULL;
2248 : : }
2249 : :
3267 simon@2ndQuadrant.co 2250 [ + + ]:CBC 124 : if (fromdisk)
2251 : : {
2252 : : /* Read and validate file */
251 michael@paquier.xyz 2253 :GNC 50 : buf = ReadTwoPhaseFile(fxid, false);
2254 : : }
2255 : : else
2256 : : {
2257 : : /* Read xlog data */
3267 simon@2ndQuadrant.co 2258 :CBC 74 : XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2259 : : }
2260 : :
2261 : : /* Deconstruct header */
2262 : 124 : hdr = (TwoPhaseFileHeader *) buf;
251 michael@paquier.xyz 2263 [ - + ]:GNC 124 : if (!TransactionIdEquals(hdr->xid, XidFromFullTransactionId(fxid)))
2264 : : {
3267 simon@2ndQuadrant.co 2265 [ # # ]:UBC 0 : if (fromdisk)
2746 michael@paquier.xyz 2266 [ # # ]: 0 : ereport(ERROR,
2267 : : (errcode(ERRCODE_DATA_CORRUPTED),
2268 : : errmsg("corrupted two-phase state file for transaction %u of epoch %u",
2269 : : XidFromFullTransactionId(fxid),
2270 : : EpochFromFullTransactionId(fxid))));
2271 : : else
2272 [ # # ]: 0 : ereport(ERROR,
2273 : : (errcode(ERRCODE_DATA_CORRUPTED),
2274 : : errmsg("corrupted two-phase state in memory for transaction %u of epoch %u",
2275 : : XidFromFullTransactionId(fxid),
2276 : : EpochFromFullTransactionId(fxid))));
2277 : : }
2278 : :
2279 : : /*
2280 : : * Examine subtransaction XIDs ... they should all follow main XID, and
2281 : : * they may force us to advance nextXid.
2282 : : */
3267 simon@2ndQuadrant.co 2283 :CBC 124 : subxids = (TransactionId *) (buf +
2284 : 124 : MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2285 : 124 : MAXALIGN(hdr->gidlen));
2286 [ + + ]: 1911 : for (i = 0; i < hdr->nsubxacts; i++)
2287 : : {
2288 : 1787 : TransactionId subxid = subxids[i];
2289 : :
251 michael@paquier.xyz 2290 [ - + ]:GNC 1787 : Assert(TransactionIdFollows(subxid, XidFromFullTransactionId(fxid)));
2291 : :
2292 : : /* update nextXid if needed */
2544 tmunro@postgresql.or 2293 [ + + ]:CBC 1787 : if (setNextXid)
2294 : 823 : AdvanceNextFullTransactionIdPastXid(subxid);
2295 : :
3267 simon@2ndQuadrant.co 2296 [ + + ]: 1787 : if (setParent)
251 michael@paquier.xyz 2297 :GNC 823 : SubTransSetParent(subxid, XidFromFullTransactionId(fxid));
2298 : : }
2299 : :
3267 simon@2ndQuadrant.co 2300 :CBC 124 : return buf;
2301 : : }
2302 : :
2303 : :
2304 : : /*
2305 : : * RecordTransactionCommitPrepared
2306 : : *
2307 : : * This is basically the same as RecordTransactionCommit (q.v. if you change
2308 : : * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a
2309 : : * race condition.
2310 : : *
2311 : : * We know the transaction made at least one XLOG entry (its PREPARE),
2312 : : * so it is never possible to optimize out the commit record.
2313 : : */
2314 : : static void
7576 tgl@sss.pgh.pa.us 2315 : 273 : RecordTransactionCommitPrepared(TransactionId xid,
2316 : : int nchildren,
2317 : : TransactionId *children,
2318 : : int nrels,
2319 : : RelFileLocator *rels,
2320 : : int nstats,
2321 : : xl_xact_stats_item *stats,
2322 : : int ninvalmsgs,
2323 : : SharedInvalidationMessage *invalmsgs,
2324 : : bool initfileinval,
2325 : : const char *gid)
2326 : : {
2327 : : XLogRecPtr recptr;
2328 : : TimestampTz committs;
2329 : : bool replorigin;
2330 : :
2331 : : /*
2332 : : * Are we using the replication origins feature? Or, in other words, are
2333 : : * we replaying remote actions?
2334 : : */
46 msawada@postgresql.o 2335 [ + + ]:GNC 296 : replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
2336 [ + - ]: 23 : replorigin_xact_state.origin != DoNotReplicateId);
2337 : :
2338 : : /* Load the injection point before entering the critical section */
188 akapila@postgresql.o 2339 : 273 : INJECTION_POINT_LOAD("commit-after-delay-checkpoint");
2340 : :
7576 tgl@sss.pgh.pa.us 2341 :CBC 273 : START_CRIT_SECTION();
2342 : :
2343 : : /* See notes in RecordTransactionCommit */
235 akapila@postgresql.o 2344 [ - + ]:GNC 273 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
2345 : 273 : MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
2346 : :
188 2347 : 273 : INJECTION_POINT_CACHED("commit-after-delay-checkpoint", NULL);
2348 : :
2349 : : /*
2350 : : * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible before
2351 : : * commit time is written.
2352 : : */
235 2353 : 273 : pg_write_barrier();
2354 : :
2355 : : /*
2356 : : * Note it is important to set committs value after marking ourselves as
2357 : : * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because
2358 : : * we want to ensure all transactions that have acquired commit timestamp
2359 : : * are finished before we allow the logical replication client to advance
2360 : : * its xid which is used to hold back dead rows for conflict detection.
2361 : : * See comments atop worker.c.
2362 : : */
2363 : 273 : committs = GetCurrentTimestamp();
2364 : :
2365 : : /*
2366 : : * Emit the XLOG commit record. Note that we mark 2PC commits as
2367 : : * potentially having AccessExclusiveLocks since we don't know whether or
2368 : : * not they do.
2369 : : */
3820 alvherre@alvh.no-ip. 2370 :CBC 273 : recptr = XactLogCommitRecord(committs,
2371 : : nchildren, children, nrels, rels,
2372 : : nstats, stats,
2373 : : ninvalmsgs, invalmsgs,
2374 : : initfileinval,
3189 tgl@sss.pgh.pa.us 2375 : 273 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2376 : : xid, gid);
2377 : :
2378 : :
3820 alvherre@alvh.no-ip. 2379 [ + + ]: 273 : if (replorigin)
2380 : : /* Move LSNs forward for this replication origin */
46 msawada@postgresql.o 2381 :GNC 23 : replorigin_session_advance(replorigin_xact_state.origin_lsn,
2382 : : XactLastRecEnd);
2383 : :
2384 : : /*
2385 : : * Record commit timestamp. The value comes from plain commit timestamp
2386 : : * if replorigin is not enabled, or replorigin already set a value for us
2387 : : * in replorigin_xact_state.origin_timestamp otherwise.
2388 : : *
2389 : : * We don't need to WAL-log anything here, as the commit record written
2390 : : * above already contains the data.
2391 : : */
2392 [ + + - + ]: 273 : if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
2393 : 250 : replorigin_xact_state.origin_timestamp = committs;
2394 : :
3820 alvherre@alvh.no-ip. 2395 :CBC 273 : TransactionTreeSetCommitTsData(xid, nchildren, children,
2396 : : replorigin_xact_state.origin_timestamp,
46 msawada@postgresql.o 2397 :GNC 273 : replorigin_xact_state.origin);
2398 : :
2399 : : /*
2400 : : * We don't currently try to sleep before flush here ... nor is there any
2401 : : * support for async commit of a prepared xact (the very idea is probably
2402 : : * a contradiction)
2403 : : */
2404 : :
2405 : : /* Flush XLOG to disk */
7576 tgl@sss.pgh.pa.us 2406 :CBC 273 : XLogFlush(recptr);
2407 : :
2408 : : /* Mark the transaction committed in pg_xact */
6355 alvherre@alvh.no-ip. 2409 : 273 : TransactionIdCommitTree(xid, nchildren, children);
2410 : :
2411 : : /* Checkpoint can proceed now */
235 akapila@postgresql.o 2412 :GNC 273 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
2413 : :
7576 tgl@sss.pgh.pa.us 2414 [ - + ]:CBC 273 : END_CRIT_SECTION();
2415 : :
2416 : : /*
2417 : : * Wait for synchronous replication, if required.
2418 : : *
2419 : : * Note that at this stage we have marked clog, but still show as running
2420 : : * in the procarray and continue to hold locks.
2421 : : */
3638 rhaas@postgresql.org 2422 : 273 : SyncRepWaitForLSN(recptr, true);
7576 tgl@sss.pgh.pa.us 2423 : 273 : }
2424 : :
2425 : : /*
2426 : : * RecordTransactionAbortPrepared
2427 : : *
2428 : : * This is basically the same as RecordTransactionAbort.
2429 : : *
2430 : : * We know the transaction made at least one XLOG entry (its PREPARE),
2431 : : * so it is never possible to optimize out the abort record.
2432 : : */
2433 : : static void
2434 : 46 : RecordTransactionAbortPrepared(TransactionId xid,
2435 : : int nchildren,
2436 : : TransactionId *children,
2437 : : int nrels,
2438 : : RelFileLocator *rels,
2439 : : int nstats,
2440 : : xl_xact_stats_item *stats,
2441 : : const char *gid)
2442 : : {
2443 : : XLogRecPtr recptr;
2444 : : bool replorigin;
2445 : :
2446 : : /*
2447 : : * Are we using the replication origins feature? Or, in other words, are
2448 : : * we replaying remote actions?
2449 : : */
46 msawada@postgresql.o 2450 [ + + ]:GNC 52 : replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
2451 [ + - ]: 6 : replorigin_xact_state.origin != DoNotReplicateId);
2452 : :
2453 : : /*
2454 : : * Catch the scenario where we aborted partway through
2455 : : * RecordTransactionCommitPrepared ...
2456 : : */
7576 tgl@sss.pgh.pa.us 2457 [ - + ]:CBC 46 : if (TransactionIdDidCommit(xid))
7576 tgl@sss.pgh.pa.us 2458 [ # # ]:UBC 0 : elog(PANIC, "cannot abort transaction %u, it was already committed",
2459 : : xid);
2460 : :
7576 tgl@sss.pgh.pa.us 2461 :CBC 46 : START_CRIT_SECTION();
2462 : :
2463 : : /*
2464 : : * Emit the XLOG abort record. Note that we mark 2PC aborts as
2465 : : * potentially having AccessExclusiveLocks since we don't know whether or
2466 : : * not they do.
2467 : : */
4018 andres@anarazel.de 2468 : 46 : recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2469 : : nchildren, children,
2470 : : nrels, rels,
2471 : : nstats, stats,
3189 tgl@sss.pgh.pa.us 2472 : 46 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2473 : : xid, gid);
2474 : :
1833 akapila@postgresql.o 2475 [ + + ]: 46 : if (replorigin)
2476 : : /* Move LSNs forward for this replication origin */
46 msawada@postgresql.o 2477 :GNC 6 : replorigin_session_advance(replorigin_xact_state.origin_lsn,
2478 : : XactLastRecEnd);
2479 : :
2480 : : /* Always flush, since we're about to remove the 2PC state file */
7576 tgl@sss.pgh.pa.us 2481 :CBC 46 : XLogFlush(recptr);
2482 : :
2483 : : /*
2484 : : * Mark the transaction aborted in clog. This is not absolutely necessary
2485 : : * but we may as well do it while we are here.
2486 : : */
6355 alvherre@alvh.no-ip. 2487 : 46 : TransactionIdAbortTree(xid, nchildren, children);
2488 : :
7576 tgl@sss.pgh.pa.us 2489 [ - + ]: 46 : END_CRIT_SECTION();
2490 : :
2491 : : /*
2492 : : * Wait for synchronous replication, if required.
2493 : : *
2494 : : * Note that at this stage we have marked clog, but still show as running
2495 : : * in the procarray and continue to hold locks.
2496 : : */
3638 rhaas@postgresql.org 2497 : 46 : SyncRepWaitForLSN(recptr, false);
7576 tgl@sss.pgh.pa.us 2498 : 46 : }
2499 : :
2500 : : /*
2501 : : * PrepareRedoAdd
2502 : : *
2503 : : * Store pointers to the start/end of the WAL record along with the xid in
2504 : : * a gxact entry in shared memory TwoPhaseState structure. If caller
2505 : : * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2506 : : * data, the entry is marked as located on disk.
2507 : : */
2508 : : void
251 michael@paquier.xyz 2509 :GNC 97 : PrepareRedoAdd(FullTransactionId fxid, char *buf,
2510 : : XLogRecPtr start_lsn, XLogRecPtr end_lsn,
2511 : : ReplOriginId origin_id)
2512 : : {
3267 simon@2ndQuadrant.co 2513 :CBC 97 : TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2514 : : char *bufptr;
2515 : : const char *gid;
2516 : : GlobalTransaction gxact;
2517 : :
3196 alvherre@alvh.no-ip. 2518 [ - + ]: 97 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
3267 simon@2ndQuadrant.co 2519 [ - + ]: 97 : Assert(RecoveryInProgress());
2520 : :
251 michael@paquier.xyz 2521 [ + + ]:GNC 97 : if (!FullTransactionIdIsValid(fxid))
2522 : : {
2523 [ - + ]: 81 : Assert(InRecovery);
2524 : 81 : fxid = FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
2525 : : hdr->xid);
2526 : : }
2527 : :
3267 simon@2ndQuadrant.co 2528 :CBC 97 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2529 : 97 : gid = (const char *) bufptr;
2530 : :
2531 : : /*
2532 : : * Reserve the GID for the given transaction in the redo code path.
2533 : : *
2534 : : * This creates a gxact struct and puts it into the active array.
2535 : : *
2536 : : * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2537 : : * shared memory. Hence, we only fill up the bare minimum contents here.
2538 : : * The gxact also gets marked with gxact->inredo set to true to indicate
2539 : : * that it got added in the redo phase.
2540 : : */
2541 : :
2542 : : /*
2543 : : * In the event of a crash while a checkpoint was running, it may be
2544 : : * possible that some two-phase data found its way to disk while its
2545 : : * corresponding record needs to be replayed in the follow-up recovery. As
2546 : : * the 2PC data was on disk, it has already been restored at the beginning
2547 : : * of recovery with restoreTwoPhaseData(), so skip this record to avoid
2548 : : * duplicates in TwoPhaseState. If a consistent state has been reached,
2549 : : * the record is added to TwoPhaseState and it should have no
2550 : : * corresponding file in pg_twophase.
2551 : : */
129 alvherre@kurilemu.de 2552 [ + + ]:GNC 97 : if (XLogRecPtrIsValid(start_lsn))
2553 : : {
2554 : : char path[MAXPGPATH];
2555 : :
251 michael@paquier.xyz 2556 [ - + ]: 81 : Assert(InRecovery);
2557 : 81 : TwoPhaseFilePath(path, fxid);
2558 : :
971 michael@paquier.xyz 2559 [ - + ]:CBC 81 : if (access(path, F_OK) == 0)
2560 : : {
971 michael@paquier.xyz 2561 [ # # # # ]:UBC 0 : ereport(reachedConsistency ? ERROR : WARNING,
2562 : : (errmsg("could not recover two-phase state file for transaction %u",
2563 : : hdr->xid),
2564 : : errdetail("Two-phase state file has been found in WAL record %X/%08X, but this transaction has already been restored from disk.",
2565 : : LSN_FORMAT_ARGS(start_lsn))));
2566 : 0 : return;
2567 : : }
2568 : :
971 michael@paquier.xyz 2569 [ - + ]:CBC 81 : if (errno != ENOENT)
971 michael@paquier.xyz 2570 [ # # ]:UBC 0 : ereport(ERROR,
2571 : : (errcode_for_file_access(),
2572 : : errmsg("could not access file \"%s\": %m", path)));
2573 : : }
2574 : :
2575 : : /* Get a free gxact from the freelist */
3267 simon@2ndQuadrant.co 2576 [ - + ]:CBC 97 : if (TwoPhaseState->freeGXacts == NULL)
3267 simon@2ndQuadrant.co 2577 [ # # ]:UBC 0 : ereport(ERROR,
2578 : : (errcode(ERRCODE_OUT_OF_MEMORY),
2579 : : errmsg("maximum number of prepared transactions reached"),
2580 : : errhint("Increase \"max_prepared_transactions\" (currently %d).",
2581 : : max_prepared_xacts)));
3267 simon@2ndQuadrant.co 2582 :CBC 97 : gxact = TwoPhaseState->freeGXacts;
2583 : 97 : TwoPhaseState->freeGXacts = gxact->next;
2584 : :
2585 : 97 : gxact->prepared_at = hdr->prepared_at;
2586 : 97 : gxact->prepare_start_lsn = start_lsn;
2587 : 97 : gxact->prepare_end_lsn = end_lsn;
251 michael@paquier.xyz 2588 :GNC 97 : gxact->fxid = fxid;
3267 simon@2ndQuadrant.co 2589 :CBC 97 : gxact->owner = hdr->owner;
742 heikki.linnakangas@i 2590 : 97 : gxact->locking_backend = INVALID_PROC_NUMBER;
3267 simon@2ndQuadrant.co 2591 : 97 : gxact->valid = false;
129 alvherre@kurilemu.de 2592 :GNC 97 : gxact->ondisk = !XLogRecPtrIsValid(start_lsn);
3224 bruce@momjian.us 2593 :CBC 97 : gxact->inredo = true; /* yes, added in redo */
3267 simon@2ndQuadrant.co 2594 : 97 : strcpy(gxact->gid, gid);
2595 : :
2596 : : /* And insert it into the active array */
2597 [ - + ]: 97 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2598 : 97 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2599 : :
46 msawada@postgresql.o 2600 [ + + ]:GNC 97 : if (origin_id != InvalidReplOriginId)
2601 : : {
2602 : : /* recover apply progress */
2909 simon@2ndQuadrant.co 2603 :CBC 13 : replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2604 : : false /* backward */ , false /* WAL */ );
2605 : : }
2606 : :
251 michael@paquier.xyz 2607 [ - + ]:GNC 97 : elog(DEBUG2, "added 2PC data in shared memory for transaction %u of epoch %u",
2608 : : XidFromFullTransactionId(gxact->fxid),
2609 : : EpochFromFullTransactionId(gxact->fxid));
2610 : : }
2611 : :
2612 : : /*
2613 : : * PrepareRedoRemoveFull
2614 : : *
2615 : : * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2616 : : * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2617 : : *
2618 : : * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2619 : : * is updated.
2620 : : */
2621 : : static void
2622 : 71 : PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning)
2623 : : {
3267 simon@2ndQuadrant.co 2624 :CBC 71 : GlobalTransaction gxact = NULL;
2625 : : int i;
3253 2626 : 71 : bool found = false;
2627 : :
3196 alvherre@alvh.no-ip. 2628 [ - + ]: 71 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
3267 simon@2ndQuadrant.co 2629 [ - + ]: 71 : Assert(RecoveryInProgress());
2630 : :
2631 [ + + ]: 71 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2632 : : {
2633 : 62 : gxact = TwoPhaseState->prepXacts[i];
2634 : :
251 michael@paquier.xyz 2635 [ + - ]:GNC 62 : if (FullTransactionIdEquals(gxact->fxid, fxid))
2636 : : {
3267 simon@2ndQuadrant.co 2637 [ - + ]:CBC 62 : Assert(gxact->inredo);
3253 2638 : 62 : found = true;
3267 2639 : 62 : break;
2640 : : }
2641 : : }
2642 : :
2643 : : /*
2644 : : * Just leave if there is nothing, this is expected during WAL replay.
2645 : : */
3253 2646 [ + + ]: 71 : if (!found)
3267 2647 : 9 : return;
2648 : :
2649 : : /*
2650 : : * And now we can clean up any files we may have left.
2651 : : */
251 michael@paquier.xyz 2652 [ - + ]:GNC 62 : elog(DEBUG2, "removing 2PC data for transaction %u of epoch %u ",
2653 : : XidFromFullTransactionId(fxid),
2654 : : EpochFromFullTransactionId(fxid));
2655 : :
3267 simon@2ndQuadrant.co 2656 [ + + ]:CBC 62 : if (gxact->ondisk)
251 michael@paquier.xyz 2657 :GNC 5 : RemoveTwoPhaseFile(fxid, giveWarning);
2658 : :
3267 simon@2ndQuadrant.co 2659 :CBC 62 : RemoveGXact(gxact);
2660 : : }
2661 : :
2662 : : /*
2663 : : * Wrapper of PrepareRedoRemoveFull(), for TransactionIds.
2664 : : */
2665 : : void
251 michael@paquier.xyz 2666 :GNC 71 : PrepareRedoRemove(TransactionId xid, bool giveWarning)
2667 : : {
2668 : : FullTransactionId fxid =
2669 : 71 : FullTransactionIdFromAllowableAt(TransamVariables->nextXid, xid);
2670 : :
2671 : 71 : PrepareRedoRemoveFull(fxid, giveWarning);
2672 : 71 : }
2673 : :
2674 : : /*
2675 : : * LookupGXact
2676 : : * Check if the prepared transaction with the given GID, lsn and timestamp
2677 : : * exists.
2678 : : *
2679 : : * Note that we always compare with the LSN where prepare ends because that is
2680 : : * what is stored as origin_lsn in the 2PC file.
2681 : : *
2682 : : * This function is primarily used to check if the prepared transaction
2683 : : * received from the upstream (remote node) already exists. Checking only GID
2684 : : * is not sufficient because a different prepared xact with the same GID can
2685 : : * exist on the same node. So, we are ensuring to match origin_lsn and
2686 : : * origin_timestamp of prepared xact to avoid the possibility of a match of
2687 : : * prepared xact from two different nodes.
2688 : : */
2689 : : bool
1705 akapila@postgresql.o 2690 :CBC 5 : LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
2691 : : TimestampTz origin_prepare_timestamp)
2692 : : {
2693 : : int i;
2694 : 5 : bool found = false;
2695 : :
2696 : 5 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2697 [ + - ]: 5 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2698 : : {
2699 : 5 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2700 : :
2701 : : /* Ignore not-yet-valid GIDs. */
2702 [ + - + - ]: 5 : if (gxact->valid && strcmp(gxact->gid, gid) == 0)
2703 : : {
2704 : : char *buf;
2705 : : TwoPhaseFileHeader *hdr;
2706 : :
2707 : : /*
2708 : : * We are not expecting collisions of GXACTs (same gid) between
2709 : : * publisher and subscribers, so we perform all I/O while holding
2710 : : * TwoPhaseStateLock for simplicity.
2711 : : *
2712 : : * To move the I/O out of the lock, we need to ensure that no
2713 : : * other backend commits the prepared xact in the meantime. We can
2714 : : * do this optimization if we encounter many collisions in GID
2715 : : * between publisher and subscriber.
2716 : : */
2717 [ - + ]: 5 : if (gxact->ondisk)
251 michael@paquier.xyz 2718 :UNC 0 : buf = ReadTwoPhaseFile(gxact->fxid, false);
2719 : : else
2720 : : {
1705 akapila@postgresql.o 2721 [ - + ]:CBC 5 : Assert(gxact->prepare_start_lsn);
2722 : 5 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
2723 : : }
2724 : :
2725 : 5 : hdr = (TwoPhaseFileHeader *) buf;
2726 : :
2727 [ + - ]: 5 : if (hdr->origin_lsn == prepare_end_lsn &&
2728 [ + - ]: 5 : hdr->origin_timestamp == origin_prepare_timestamp)
2729 : : {
2730 : 5 : found = true;
2731 : 5 : pfree(buf);
2732 : 5 : break;
2733 : : }
2734 : :
1705 akapila@postgresql.o 2735 :UBC 0 : pfree(buf);
2736 : : }
2737 : : }
1705 akapila@postgresql.o 2738 :CBC 5 : LWLockRelease(TwoPhaseStateLock);
2739 : 5 : return found;
2740 : : }
2741 : :
2742 : : /*
2743 : : * TwoPhaseTransactionGid
2744 : : * Form the prepared transaction GID for two_phase transactions.
2745 : : *
2746 : : * Return the GID in the supplied buffer.
2747 : : */
2748 : : void
599 2749 : 53 : TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
2750 : : {
2751 [ - + ]: 53 : Assert(OidIsValid(subid));
2752 : :
2753 [ - + ]: 53 : if (!TransactionIdIsValid(xid))
599 akapila@postgresql.o 2754 [ # # ]:UBC 0 : ereport(ERROR,
2755 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2756 : : errmsg_internal("invalid two-phase transaction ID")));
2757 : :
599 akapila@postgresql.o 2758 :CBC 53 : snprintf(gid_res, szgid, "pg_gid_%u_%u", subid, xid);
2759 : 53 : }
2760 : :
2761 : : /*
2762 : : * IsTwoPhaseTransactionGidForSubid
2763 : : * Check whether the given GID (as formed by TwoPhaseTransactionGid) is
2764 : : * for the specified 'subid'.
2765 : : */
2766 : : static bool
599 akapila@postgresql.o 2767 :UBC 0 : IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
2768 : : {
2769 : : int ret;
2770 : : Oid subid_from_gid;
2771 : : TransactionId xid_from_gid;
2772 : : char gid_tmp[GIDSIZE];
2773 : :
2774 : : /* Extract the subid and xid from the given GID */
2775 : 0 : ret = sscanf(gid, "pg_gid_%u_%u", &subid_from_gid, &xid_from_gid);
2776 : :
2777 : : /*
2778 : : * Check that the given GID has expected format, and at least the subid
2779 : : * matches.
2780 : : */
2781 [ # # # # ]: 0 : if (ret != 2 || subid != subid_from_gid)
2782 : 0 : return false;
2783 : :
2784 : : /*
2785 : : * Reconstruct a temporary GID based on the subid and xid extracted from
2786 : : * the given GID and check whether the temporary GID and the given GID
2787 : : * match.
2788 : : */
2789 : 0 : TwoPhaseTransactionGid(subid, xid_from_gid, gid_tmp, sizeof(gid_tmp));
2790 : :
2791 : 0 : return strcmp(gid, gid_tmp) == 0;
2792 : : }
2793 : :
2794 : : /*
2795 : : * LookupGXactBySubid
2796 : : * Check if a prepared transaction created by an apply worker for the given subscription (subid) exists.
2797 : : */
2798 : : bool
599 akapila@postgresql.o 2799 :CBC 1 : LookupGXactBySubid(Oid subid)
2800 : : {
2801 : 1 : bool found = false;
2802 : :
2803 : 1 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2804 [ - + ]: 1 : for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
2805 : : {
599 akapila@postgresql.o 2806 :UBC 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2807 : :
2808 : : /* Ignore not-yet-valid GIDs. */
2809 [ # # # # ]: 0 : if (gxact->valid &&
2810 : 0 : IsTwoPhaseTransactionGidForSubid(subid, gxact->gid))
2811 : : {
2812 : 0 : found = true;
2813 : 0 : break;
2814 : : }
2815 : : }
599 akapila@postgresql.o 2816 :CBC 1 : LWLockRelease(TwoPhaseStateLock);
2817 : :
2818 : 1 : return found;
2819 : : }
2820 : :
2821 : : /*
2822 : : * TwoPhaseGetOldestXidInCommit
2823 : : * Return the oldest transaction ID from prepared transactions that are
2824 : : * currently in the commit critical section.
2825 : : *
2826 : : * This function only considers transactions in the currently connected
2827 : : * database. If no matching transactions are found, it returns
2828 : : * InvalidTransactionId.
2829 : : */
2830 : : TransactionId
188 akapila@postgresql.o 2831 :GNC 2699 : TwoPhaseGetOldestXidInCommit(void)
2832 : : {
2833 : 2699 : TransactionId oldestRunningXid = InvalidTransactionId;
2834 : :
2835 : 2699 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2836 : :
2837 [ + + ]: 5249 : for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
2838 : : {
2839 : 2550 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2840 : : PGPROC *commitproc;
2841 : : TransactionId xid;
2842 : :
2843 [ - + ]: 2550 : if (!gxact->valid)
188 akapila@postgresql.o 2844 :UNC 0 : continue;
2845 : :
188 akapila@postgresql.o 2846 [ - + ]:GNC 2550 : if (gxact->locking_backend == INVALID_PROC_NUMBER)
188 akapila@postgresql.o 2847 :UNC 0 : continue;
2848 : :
2849 : : /*
2850 : : * Get the backend that is handling the transaction. It's safe to
2851 : : * access this backend while holding TwoPhaseStateLock, as the backend
2852 : : * can only be destroyed after either removing or unlocking the
2853 : : * current global transaction, both of which require an exclusive
2854 : : * TwoPhaseStateLock.
2855 : : */
188 akapila@postgresql.o 2856 :GNC 2550 : commitproc = GetPGProcByNumber(gxact->locking_backend);
2857 : :
2858 [ - + ]: 2550 : if (MyDatabaseId != commitproc->databaseId)
188 akapila@postgresql.o 2859 :UNC 0 : continue;
2860 : :
188 akapila@postgresql.o 2861 [ - + ]:GNC 2550 : if ((commitproc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0)
188 akapila@postgresql.o 2862 :UNC 0 : continue;
2863 : :
188 akapila@postgresql.o 2864 :GNC 2550 : xid = XidFromFullTransactionId(gxact->fxid);
2865 : :
2866 [ - + - - ]: 2550 : if (!TransactionIdIsValid(oldestRunningXid) ||
188 akapila@postgresql.o 2867 :UNC 0 : TransactionIdPrecedes(xid, oldestRunningXid))
188 akapila@postgresql.o 2868 :GNC 2550 : oldestRunningXid = xid;
2869 : : }
2870 : :
2871 : 2699 : LWLockRelease(TwoPhaseStateLock);
2872 : :
2873 : 2699 : return oldestRunningXid;
2874 : : }
|