Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * twophase.c
4 : : * Two-phase commit support functions.
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/access/transam/twophase.c
11 : : *
12 : : * NOTES
13 : : * Each global transaction is associated with a global transaction
14 : : * identifier (GID). The client assigns a GID to a postgres
15 : : * transaction with the PREPARE TRANSACTION command.
16 : : *
17 : : * We keep all active global transactions in a shared memory array.
18 : : * When the PREPARE TRANSACTION command is issued, the GID is
19 : : * reserved for the transaction in the array. This is done before
20 : : * a WAL entry is made, because the reservation checks for duplicate
21 : : * GIDs and aborts the transaction if there already is a global
22 : : * transaction in prepared state with the same GID.
23 : : *
24 : : * A global transaction (gxact) also has a dummy PGPROC; this is what keeps
25 : : * the XID considered running by TransactionIdIsInProgress. It is also
26 : : * convenient as a PGPROC to hook the gxact's locks to.
27 : : *
28 : : * Information to recover prepared transactions in case of crash is
29 : : * now stored in WAL for the common case. In some cases there will be
30 : : * an extended period between preparing a GXACT and commit/abort, in
31 : : * which case we need to separately record prepared transaction data
32 : : * in permanent storage. This includes locking information, pending
33 : : * notifications etc. All that state information is written to the
34 : : * per-transaction state file in the pg_twophase directory.
35 : : * All prepared transactions will be written prior to shutdown.
36 : : *
37 : : * The life cycle of the state data is as follows:
38 : : *
39 : : * * On PREPARE TRANSACTION, the backend writes state data only to the WAL
40 : : * and stores a pointer to the start of the WAL record in
41 : : * gxact->prepare_start_lsn.
42 : : * * If COMMIT occurs before a checkpoint, the backend reads the data from
43 : : * WAL using prepare_start_lsn.
44 : : * * On checkpoint, state data is copied to files in the pg_twophase
45 : : * directory and fsynced.
46 : : * * If COMMIT happens after a checkpoint, the backend reads the state data
47 : : * from the files.
48 : : *
49 : : * During replay and replication, TwoPhaseState also holds information
50 : : * about active prepared transactions that haven't been moved to disk yet.
51 : : *
52 : : * Replay of twophase records happens by the following rules:
53 : : *
54 : : * * At the beginning of recovery, pg_twophase is scanned once, filling
55 : : * TwoPhaseState with entries marked with gxact->inredo and
56 : : * gxact->ondisk. Two-phase file data older than the XID horizon of
57 : : * the redo position are discarded.
58 : : * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 : : * gxact->inredo is set to true for such entries.
60 : : * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 : : * that have gxact->inredo set and are behind the redo_horizon. We
62 : : * save them to disk and then switch gxact->ondisk to true.
63 : : * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 : : * If gxact->ondisk is true, the corresponding entry from the disk
65 : : * is additionally deleted.
66 : : * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 : : * and PrescanPreparedTransactions() have been modified to go through
68 : : * gxact->inredo entries that have not made it to disk.
69 : : *
70 : : *-------------------------------------------------------------------------
71 : : */
72 : : #include "postgres.h"
73 : :
74 : : #include <fcntl.h>
75 : : #include <sys/stat.h>
76 : : #include <time.h>
77 : : #include <unistd.h>
78 : :
79 : : #include "access/commit_ts.h"
80 : : #include "access/htup_details.h"
81 : : #include "access/subtrans.h"
82 : : #include "access/transam.h"
83 : : #include "access/twophase.h"
84 : : #include "access/twophase_rmgr.h"
85 : : #include "access/xact.h"
86 : : #include "access/xlog.h"
87 : : #include "access/xloginsert.h"
88 : : #include "access/xlogreader.h"
89 : : #include "access/xlogrecovery.h"
90 : : #include "access/xlogutils.h"
91 : : #include "catalog/pg_type.h"
92 : : #include "catalog/storage.h"
93 : : #include "funcapi.h"
94 : : #include "miscadmin.h"
95 : : #include "pg_trace.h"
96 : : #include "pgstat.h"
97 : : #include "replication/origin.h"
98 : : #include "replication/syncrep.h"
99 : : #include "storage/fd.h"
100 : : #include "storage/ipc.h"
101 : : #include "storage/md.h"
102 : : #include "storage/predicate.h"
103 : : #include "storage/proc.h"
104 : : #include "storage/procarray.h"
105 : : #include "storage/subsystems.h"
106 : : #include "utils/builtins.h"
107 : : #include "utils/injection_point.h"
108 : : #include "utils/memutils.h"
109 : : #include "utils/timestamp.h"
110 : : #include "utils/wait_event.h"
111 : :
112 : : /*
113 : : * Name of the directory, within PGDATA, where two-phase commit state files reside
114 : : */
115 : : #define TWOPHASE_DIR "pg_twophase"
116 : :
117 : : /* GUC variable, can't be changed after startup. It sizes the shared prepXacts array and the pool of GlobalTransactionData structs (see TwoPhaseShmemRequest); zero makes MarkAsPreparing reject PREPARE TRANSACTION. */
118 : : int max_prepared_xacts = 0;
119 : :
120 : : /*
121 : : * This struct describes one global transaction that is in prepared state
122 : : * or attempting to become prepared.
123 : : *
124 : : * The lifecycle of a global transaction is:
125 : : *
126 : : * 1. After checking that the requested GID is not in use, set up an entry in
127 : : * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
128 : : * and mark it as locked by my backend.
129 : : *
130 : : * 2. After successfully completing prepare, set valid = true and enter the
131 : : * referenced PGPROC into the global ProcArray.
132 : : *
133 : : * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
134 : : * valid and not locked, then mark the entry as locked by storing my current
135 : : * proc number into locking_backend. This prevents concurrent attempts to
136 : : * commit or rollback the same prepared xact.
137 : : *
138 : : * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
139 : : * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
140 : : * the freelist.
141 : : *
142 : : * Note that if the preparing transaction fails between steps 1 and 2, the
143 : : * entry must be removed so that the GID and the GlobalTransaction struct
144 : : * can be reused. See AtAbort_Twophase().
145 : : *
146 : : * typedef struct GlobalTransactionData *GlobalTransaction appears in
147 : : * twophase.h
148 : : */
149 : :
150 : : typedef struct GlobalTransactionData
151 : : {
152 : : GlobalTransaction next; /* next struct on TwoPhaseState->freeGXacts */
153 : : int pgprocno; /* proc number of associated dummy PGPROC, set at shmem init */
154 : : TimestampTz prepared_at; /* time of preparation */
155 : :
156 : : /*
157 : : * Note that we need to keep track of two LSNs for each GXACT. We keep
158 : : * track of the start LSN because this is the address we must use to read
159 : : * state data back from WAL when committing a prepared GXACT. We keep
160 : : * track of the end LSN because that is the LSN we need to wait for prior
161 : : * to commit.
162 : : */
163 : : XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
164 : : XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
165 : : FullTransactionId fxid; /* The GXACT full xid */
166 : :
167 : : Oid owner; /* ID of user that executed the xact */
168 : : ProcNumber locking_backend; /* backend currently working on the xact, or INVALID_PROC_NUMBER if unlocked */
169 : : bool valid; /* true if PGPROC entry is in proc array */
170 : : bool ondisk; /* true if prepare state file is on disk */
171 : : bool inredo; /* true if entry was added via xlog_redo */
172 : : char gid[GIDSIZE]; /* The GID assigned to the prepared xact (NUL-terminated) */
173 : : } GlobalTransactionData;
174 : :
175 : : /*
176 : : * Two Phase Commit shared state. Access to this struct is protected
177 : : * by TwoPhaseStateLock.
178 : : */
179 : : typedef struct TwoPhaseStateData
180 : : {
181 : : /* Head of linked list of free GlobalTransactionData structs (linked through their "next" fields) */
182 : : GlobalTransaction freeGXacts;
183 : :
184 : : /* Number of prepXacts entries in use. Such entries need not have gxact->valid set yet: MarkAsPreparing adds them with valid = false. */
185 : : int numPrepXacts;
186 : :
187 : : /* There are max_prepared_xacts items in this array; only the first numPrepXacts are meaningful */
188 : : GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
189 : : } TwoPhaseStateData;
190 : :
/* Pointer to the shared two-phase state; its address is handed to ShmemRequestStruct() as .ptr in TwoPhaseShmemRequest(). */
191 : : static TwoPhaseStateData *TwoPhaseState;
192 : :
193 : : static void TwoPhaseShmemRequest(void *arg);
194 : : static void TwoPhaseShmemInit(void *arg);
195 : :
/* Shared-memory callbacks for this module: one requests the space, the other initializes it. */
196 : : const ShmemCallbacks TwoPhaseShmemCallbacks = {
197 : : .request_fn = TwoPhaseShmemRequest,
198 : : .init_fn = TwoPhaseShmemInit,
199 : : };
200 : :
201 : : /*
202 : : * Global transaction entry currently locked by us, if any. Note that any
203 : : * access to the entry pointed to by this variable must be protected by
204 : : * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
205 : : * (since it's just local memory).
206 : : */
207 : : static GlobalTransaction MyLockedGxact = NULL;
208 : :
/* True once this backend has registered AtProcExit_Twophase with before_shmem_exit(); set by MarkAsPreparing() and LockGXact() on first use. */
209 : : static bool twophaseExitRegistered = false;
210 : :
/* Forward declarations of file-local routines. */
211 : : static void PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning);
212 : : static void RecordTransactionCommitPrepared(TransactionId xid,
213 : : int nchildren,
214 : : TransactionId *children,
215 : : int nrels,
216 : : RelFileLocator *rels,
217 : : int nstats,
218 : : xl_xact_stats_item *stats,
219 : : int ninvalmsgs,
220 : : SharedInvalidationMessage *invalmsgs,
221 : : bool initfileinval,
222 : : const char *gid);
223 : : static void RecordTransactionAbortPrepared(TransactionId xid,
224 : : int nchildren,
225 : : TransactionId *children,
226 : : int nrels,
227 : : RelFileLocator *rels,
228 : : int nstats,
229 : : xl_xact_stats_item *stats,
230 : : const char *gid);
231 : : static void ProcessRecords(char *bufptr, FullTransactionId fxid,
232 : : const TwoPhaseCallback callbacks[]);
/* Takes an entry out of the active array and returns it to the free list; caller must hold TwoPhaseStateLock exclusively. */
233 : : static void RemoveGXact(GlobalTransaction gxact);
234 : :
235 : : static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
236 : : static char *ProcessTwoPhaseBuffer(FullTransactionId fxid,
237 : : XLogRecPtr prepare_start_lsn,
238 : : bool fromdisk, bool setParent, bool setNextXid);
/* Fills in a gxact and its dummy PGPROC; caller must hold TwoPhaseStateLock exclusively. */
239 : : static void MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
240 : : const char *gid, TimestampTz prepared_at, Oid owner,
241 : : Oid databaseid);
242 : : static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
243 : : static void RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len);
244 : :
245 : : /*
246 : : * Register shared memory for two-phase state.
247 : : */
248 : : static void
29 heikki.linnakangas@i 249 :GNC 1244 : TwoPhaseShmemRequest(void *arg)
250 : : {
251 : : Size size;
252 : :
253 : : /* Need the fixed struct, the array of pointers, and the GTD structs */
7563 tgl@sss.pgh.pa.us 254 :CBC 1244 : size = offsetof(TwoPhaseStateData, prepXacts);
255 : 1244 : size = add_size(size, mul_size(max_prepared_xacts,
256 : : sizeof(GlobalTransaction)));
257 : 1244 : size = MAXALIGN(size);
258 : 1244 : size = add_size(size, mul_size(max_prepared_xacts,
259 : : sizeof(GlobalTransactionData)));
29 heikki.linnakangas@i 260 :GNC 1244 : ShmemRequestStruct(.name = "Prepared Transaction Table",
261 : : .size = size,
262 : : .ptr = (void **) &TwoPhaseState,
263 : : );
7627 tgl@sss.pgh.pa.us 264 :GIC 1244 : }
265 : :
266 : : /*
267 : : * Initialize shared memory for two-phase state.
268 : : */
269 : : static void
29 heikki.linnakangas@i 270 :GNC 1241 : TwoPhaseShmemInit(void *arg)
271 : : {
272 : : GlobalTransaction gxacts;
273 : : int i;
274 : :
275 : 1241 : TwoPhaseState->freeGXacts = NULL;
276 : 1241 : TwoPhaseState->numPrepXacts = 0;
277 : :
278 : : /*
279 : : * Initialize the linked list of free GlobalTransactionData structs
280 : : */
281 : 1241 : gxacts = (GlobalTransaction)
282 : 1241 : ((char *) TwoPhaseState +
283 : 1241 : MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
284 : : sizeof(GlobalTransaction) * max_prepared_xacts));
285 [ + + ]: 2113 : for (i = 0; i < max_prepared_xacts; i++)
286 : : {
287 : : /* insert into linked list */
288 : 872 : gxacts[i].next = TwoPhaseState->freeGXacts;
289 : 872 : TwoPhaseState->freeGXacts = &gxacts[i];
290 : :
291 : : /* associate it with a PGPROC assigned by ProcGlobalShmemInit */
292 : 872 : gxacts[i].pgprocno = GetNumberFromPGProc(&PreparedXactProcs[i]);
293 : : }
7627 tgl@sss.pgh.pa.us 294 :CBC 1241 : }
295 : :
/*
 * AtProcExit_Twophase is registered with before_shmem_exit() by
 * MarkAsPreparing() and LockGXact(); neither "code" nor "arg" is used.
 */
296 : : /*
297 : : * Exit hook to unlock the global transaction entry we're working on.
298 : : */
299 : : static void
4373 heikki.linnakangas@i 300 : 142 : AtProcExit_Twophase(int code, Datum arg)
301 : : {
302 : : /* nothing exit-specific to do: same logic as abort */
303 : 142 : AtAbort_Twophase();
304 : 142 : }
305 : :
306 : : /*
307 : : * Abort hook to unlock the global transaction entry we're working on.
308 : : */
309 : : void
310 : 35433 : AtAbort_Twophase(void)
311 : : {
312 [ + + ]: 35433 : if (MyLockedGxact == NULL)
313 : 35431 : return;
314 : :
315 : : /*
316 : : * What to do with the locked global transaction entry? If we were in the
317 : : * process of preparing the transaction, but haven't written the WAL
318 : : * record and state file yet, the transaction must not be considered as
319 : : * prepared. Likewise, if we are in the process of finishing an
320 : : * already-prepared transaction, and fail after having already written the
321 : : * 2nd phase commit or rollback record to the WAL, the transaction should
322 : : * not be considered as prepared anymore. In those cases, just remove the
323 : : * entry from shared memory.
324 : : *
325 : : * Otherwise, the entry must be left in place so that the transaction can
326 : : * be finished later, so just unlock it.
327 : : *
328 : : * If we abort during prepare, after having written the WAL record, we
329 : : * might not have transferred all locks and other state to the prepared
330 : : * transaction yet. Likewise, if we abort during commit or rollback,
331 : : * after having written the WAL record, we might not have released all the
332 : : * resources held by the transaction yet. In those cases, the in-memory
333 : : * state can be wrong, but it's too late to back out.
334 : : */
3247 alvherre@alvh.no-ip. 335 : 2 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
4373 heikki.linnakangas@i 336 [ + - ]: 2 : if (!MyLockedGxact->valid)
337 : 2 : RemoveGXact(MyLockedGxact);
338 : : else
793 heikki.linnakangas@i 339 :UBC 0 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
3247 alvherre@alvh.no-ip. 340 :CBC 2 : LWLockRelease(TwoPhaseStateLock);
341 : :
4373 heikki.linnakangas@i 342 : 2 : MyLockedGxact = NULL;
343 : : }
344 : :
345 : : /*
346 : : * This is called after we have finished transferring state to the prepared
347 : : * PGPROC entry.
348 : : */
349 : : void
3916 andres@anarazel.de 350 : 328 : PostPrepare_Twophase(void)
351 : : {
4373 heikki.linnakangas@i 352 : 328 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
793 353 : 328 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
4373 354 : 328 : LWLockRelease(TwoPhaseStateLock);
355 : :
356 : 328 : MyLockedGxact = NULL;
357 : 328 : }
358 : :
359 : :
360 : : /*
361 : : * MarkAsPreparing
362 : : * Reserve the GID for the given transaction.
363 : : */
364 : : GlobalTransaction
302 michael@paquier.xyz 365 :GNC 315 : MarkAsPreparing(FullTransactionId fxid, const char *gid,
366 : : TimestampTz prepared_at, Oid owner, Oid databaseid)
367 : : {
368 : : GlobalTransaction gxact;
369 : : int i;
370 : :
7627 tgl@sss.pgh.pa.us 371 [ - + ]:CBC 315 : if (strlen(gid) >= GIDSIZE)
7627 tgl@sss.pgh.pa.us 372 [ # # ]:UBC 0 : ereport(ERROR,
373 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
374 : : errmsg("transaction identifier \"%s\" is too long",
375 : : gid)));
376 : :
377 : : /* fail immediately if feature is disabled */
6221 tgl@sss.pgh.pa.us 378 [ + + ]:CBC 315 : if (max_prepared_xacts == 0)
379 [ + - ]: 16 : ereport(ERROR,
380 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
381 : : errmsg("prepared transactions are disabled"),
382 : : errhint("Set \"max_prepared_transactions\" to a nonzero value.")));
383 : :
384 : : /* on first call, register the exit hook */
4373 heikki.linnakangas@i 385 [ + + ]: 299 : if (!twophaseExitRegistered)
386 : : {
387 : 78 : before_shmem_exit(AtProcExit_Twophase, 0);
388 : 78 : twophaseExitRegistered = true;
389 : : }
390 : :
391 : 299 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
392 : :
393 : : /* Check for conflicting GID */
7627 tgl@sss.pgh.pa.us 394 [ + + ]: 528 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
395 : : {
396 : 231 : gxact = TwoPhaseState->prepXacts[i];
397 [ + + ]: 231 : if (strcmp(gxact->gid, gid) == 0)
398 : : {
399 [ + - ]: 2 : ereport(ERROR,
400 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
401 : : errmsg("transaction identifier \"%s\" is already in use",
402 : : gid)));
403 : : }
404 : : }
405 : :
406 : : /* Get a free gxact from the freelist */
6393 407 [ - + ]: 297 : if (TwoPhaseState->freeGXacts == NULL)
7627 tgl@sss.pgh.pa.us 408 [ # # ]:UBC 0 : ereport(ERROR,
409 : : (errcode(ERRCODE_OUT_OF_MEMORY),
410 : : errmsg("maximum number of prepared transactions reached"),
411 : : errhint("Increase \"max_prepared_transactions\" (currently %d).",
412 : : max_prepared_xacts)));
6393 tgl@sss.pgh.pa.us 413 :CBC 297 : gxact = TwoPhaseState->freeGXacts;
5018 414 : 297 : TwoPhaseState->freeGXacts = gxact->next;
415 : :
302 michael@paquier.xyz 416 :GNC 297 : MarkAsPreparingGuts(gxact, fxid, gid, prepared_at, owner, databaseid);
417 : :
3318 simon@2ndQuadrant.co 418 :CBC 297 : gxact->ondisk = false;
419 : :
420 : : /* And insert it into the active array */
421 [ - + ]: 297 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
422 : 297 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
423 : :
424 : 297 : LWLockRelease(TwoPhaseStateLock);
425 : :
426 : 297 : return gxact;
427 : : }
428 : :
429 : : /*
430 : : * MarkAsPreparingGuts
431 : : *
432 : : * This uses a gxact struct and puts it into the active array.
433 : : * NOTE: this is also used when reloading a gxact after a crash; so avoid
434 : : * assuming that we can use very much backend context.
435 : : *
436 : : * Note: This function should be called with appropriate locks held.
437 : : */
438 : : static void
302 michael@paquier.xyz 439 :GNC 330 : MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
440 : : const char *gid, TimestampTz prepared_at, Oid owner,
441 : : Oid databaseid)
442 : : {
443 : : PGPROC *proc;
444 : : int i;
445 : 330 : TransactionId xid = XidFromFullTransactionId(fxid);
446 : :
3247 alvherre@alvh.no-ip. 447 [ - + ]:CBC 330 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
448 : :
3318 simon@2ndQuadrant.co 449 [ - + ]: 330 : Assert(gxact != NULL);
793 heikki.linnakangas@i 450 : 330 : proc = GetPGProcByNumber(gxact->pgprocno);
451 : :
452 : : /* Initialize the PGPROC entry */
5275 rhaas@postgresql.org 453 [ + - + - : 37290 : MemSet(proc, 0, sizeof(PGPROC));
+ - + - +
+ ]
2148 peter@eisentraut.org 454 : 330 : proc->waitStatus = PROC_WAIT_STATUS_OK;
793 heikki.linnakangas@i 455 [ + + ]: 330 : if (LocalTransactionIdIsValid(MyProc->vxid.lxid))
456 : : {
457 : : /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
458 : 297 : proc->vxid.lxid = MyProc->vxid.lxid;
459 : 297 : proc->vxid.procNumber = MyProcNumber;
460 : : }
461 : : else
462 : : {
1655 noah@leadboat.com 463 [ - + - - ]: 33 : Assert(AmStartupProcess() || !IsPostmasterEnvironment);
464 : : /* GetLockConflicts() uses this to specify a wait on the XID */
793 heikki.linnakangas@i 465 : 33 : proc->vxid.lxid = xid;
466 : 33 : proc->vxid.procNumber = INVALID_PROC_NUMBER;
467 : : }
2090 andres@anarazel.de 468 : 330 : proc->xid = xid;
2091 469 [ - + ]: 330 : Assert(proc->xmin == InvalidTransactionId);
1488 rhaas@postgresql.org 470 : 330 : proc->delayChkptFlags = 0;
1996 alvherre@alvh.no-ip. 471 : 330 : proc->statusFlags = 0;
5275 rhaas@postgresql.org 472 : 330 : proc->pid = 0;
473 : 330 : proc->databaseId = databaseid;
474 : 330 : proc->roleId = owner;
2822 michael@paquier.xyz 475 : 330 : proc->tempNamespaceId = InvalidOid;
90 heikki.linnakangas@i 476 :GNC 330 : proc->backendType = B_INVALID;
1262 andres@anarazel.de 477 :CBC 330 : proc->lwWaiting = LW_WS_NOT_WAITING;
5209 heikki.linnakangas@i 478 : 330 : proc->lwWaitMode = 0;
5275 rhaas@postgresql.org 479 : 330 : proc->waitLock = NULL;
74 heikki.linnakangas@i 480 :GNC 330 : dlist_node_init(&proc->waitLink);
5275 rhaas@postgresql.org 481 :CBC 330 : proc->waitProcLock = NULL;
1898 fujii@postgresql.org 482 : 330 : pg_atomic_init_u64(&proc->waitStart, 0);
7450 tgl@sss.pgh.pa.us 483 [ + + ]: 5610 : for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
1203 andres@anarazel.de 484 : 5280 : dlist_init(&proc->myProcLocks[i]);
485 : : /* subxid data must be filled later by GXactLoadSubxactData */
2090 486 : 330 : proc->subxidStatus.overflowed = false;
487 : 330 : proc->subxidStatus.count = 0;
488 : :
7626 tgl@sss.pgh.pa.us 489 : 330 : gxact->prepared_at = prepared_at;
302 michael@paquier.xyz 490 :GNC 330 : gxact->fxid = fxid;
7627 tgl@sss.pgh.pa.us 491 :CBC 330 : gxact->owner = owner;
793 heikki.linnakangas@i 492 : 330 : gxact->locking_backend = MyProcNumber;
7627 tgl@sss.pgh.pa.us 493 : 330 : gxact->valid = false;
3318 simon@2ndQuadrant.co 494 : 330 : gxact->inredo = false;
7627 tgl@sss.pgh.pa.us 495 : 330 : strcpy(gxact->gid, gid);
496 : :
497 : : /*
498 : : * Remember that we have this GlobalTransaction entry locked for us. If we
499 : : * abort after this, we must release it.
500 : : */
4373 heikki.linnakangas@i 501 : 330 : MyLockedGxact = gxact;
7627 tgl@sss.pgh.pa.us 502 : 330 : }
503 : :
504 : : /*
505 : : * GXactLoadSubxactData
506 : : *
507 : : * If the transaction being persisted had any subtransactions, this must
508 : : * be called before MarkAsPrepared() to load information into the dummy
509 : : * PGPROC.
510 : : */
511 : : static void
512 : 132 : GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
513 : : TransactionId *children)
514 : : {
793 heikki.linnakangas@i 515 : 132 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
516 : :
517 : : /* We need no extra lock since the GXACT isn't valid yet */
7627 tgl@sss.pgh.pa.us 518 [ + + ]: 132 : if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
519 : : {
2090 andres@anarazel.de 520 : 4 : proc->subxidStatus.overflowed = true;
7627 tgl@sss.pgh.pa.us 521 : 4 : nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
522 : : }
523 [ + + ]: 132 : if (nsubxacts > 0)
524 : : {
5275 rhaas@postgresql.org 525 : 115 : memcpy(proc->subxids.xids, children,
526 : : nsubxacts * sizeof(TransactionId));
2090 andres@anarazel.de 527 : 115 : proc->subxidStatus.count = nsubxacts;
528 : : }
7627 tgl@sss.pgh.pa.us 529 : 132 : }
530 : :
531 : : /*
532 : : * MarkAsPrepared
533 : : * Mark the GXACT as fully valid, and enter it into the global ProcArray.
534 : : *
535 : : * lock_held indicates whether caller already holds TwoPhaseStateLock.
536 : : */
537 : : static void
3247 alvherre@alvh.no-ip. 538 : 328 : MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
539 : : {
540 : : /* Lock here may be overkill, but I'm not convinced of that ... */
541 [ + + ]: 328 : if (!lock_held)
542 : 295 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
7627 tgl@sss.pgh.pa.us 543 [ - + ]: 328 : Assert(!gxact->valid);
544 : 328 : gxact->valid = true;
3247 alvherre@alvh.no-ip. 545 [ + + ]: 328 : if (!lock_held)
546 : 295 : LWLockRelease(TwoPhaseStateLock);
547 : :
548 : : /*
549 : : * Put it into the global ProcArray so TransactionIdIsInProgress considers
550 : : * the XID as still running.
551 : : */
793 heikki.linnakangas@i 552 : 328 : ProcArrayAdd(GetPGProcByNumber(gxact->pgprocno));
7627 tgl@sss.pgh.pa.us 553 : 328 : }
554 : :
555 : : /*
556 : : * LockGXact
557 : : * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
558 : : */
559 : : static GlobalTransaction
7616 560 : 317 : LockGXact(const char *gid, Oid user)
561 : : {
562 : : int i;
563 : :
564 : : /* on first call, register the exit hook */
4373 heikki.linnakangas@i 565 [ + + ]: 317 : if (!twophaseExitRegistered)
566 : : {
567 : 64 : before_shmem_exit(AtProcExit_Twophase, 0);
568 : 64 : twophaseExitRegistered = true;
569 : : }
570 : :
7627 tgl@sss.pgh.pa.us 571 : 317 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
572 : :
573 [ + + ]: 495 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
574 : : {
7507 bruce@momjian.us 575 : 481 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
793 heikki.linnakangas@i 576 : 481 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
577 : :
578 : : /* Ignore not-yet-valid GIDs */
7627 tgl@sss.pgh.pa.us 579 [ - + ]: 481 : if (!gxact->valid)
7627 tgl@sss.pgh.pa.us 580 :UBC 0 : continue;
7627 tgl@sss.pgh.pa.us 581 [ + + ]:CBC 481 : if (strcmp(gxact->gid, gid) != 0)
582 : 178 : continue;
583 : :
584 : : /* Found it, but has someone else got it locked? */
793 heikki.linnakangas@i 585 [ - + ]: 303 : if (gxact->locking_backend != INVALID_PROC_NUMBER)
4373 heikki.linnakangas@i 586 [ # # ]:UBC 0 : ereport(ERROR,
587 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
588 : : errmsg("prepared transaction with identifier \"%s\" is busy",
589 : : gid)));
590 : :
7627 tgl@sss.pgh.pa.us 591 [ - + - - ]:CBC 303 : if (user != gxact->owner && !superuser_arg(user))
7627 tgl@sss.pgh.pa.us 592 [ # # ]:UBC 0 : ereport(ERROR,
593 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
594 : : errmsg("permission denied to finish prepared transaction"),
595 : : errhint("Must be superuser or the user that prepared the transaction.")));
596 : :
597 : : /*
598 : : * Note: it probably would be possible to allow committing from
599 : : * another database; but at the moment NOTIFY is known not to work and
600 : : * there may be some other issues as well. Hence disallow until
601 : : * someone gets motivated to make it work.
602 : : */
5275 rhaas@postgresql.org 603 [ - + ]:CBC 303 : if (MyDatabaseId != proc->databaseId)
7021 tgl@sss.pgh.pa.us 604 [ # # ]:UBC 0 : ereport(ERROR,
605 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
606 : : errmsg("prepared transaction belongs to another database"),
607 : : errhint("Connect to the database where the transaction was prepared to finish it.")));
608 : :
609 : : /* OK for me to lock it */
793 heikki.linnakangas@i 610 :CBC 303 : gxact->locking_backend = MyProcNumber;
4373 611 : 303 : MyLockedGxact = gxact;
612 : :
7627 tgl@sss.pgh.pa.us 613 : 303 : LWLockRelease(TwoPhaseStateLock);
614 : :
615 : 303 : return gxact;
616 : : }
617 : :
7627 tgl@sss.pgh.pa.us 618 :GBC 14 : LWLockRelease(TwoPhaseStateLock);
619 : :
620 [ + - ]: 14 : ereport(ERROR,
621 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
622 : : errmsg("prepared transaction with identifier \"%s\" does not exist",
623 : : gid)));
624 : :
625 : : /* NOTREACHED */
626 : : return NULL;
627 : : }
628 : :
629 : : /*
630 : : * RemoveGXact
631 : : * Remove the prepared transaction from the shared memory array.
632 : : *
633 : : * NB: caller should have already removed it from ProcArray
634 : : */
635 : : static void
7627 tgl@sss.pgh.pa.us 636 :CBC 367 : RemoveGXact(GlobalTransaction gxact)
637 : : {
638 : : int i;
639 : :
3247 alvherre@alvh.no-ip. 640 [ - + ]: 367 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
641 : :
7627 tgl@sss.pgh.pa.us 642 [ + - ]: 543 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
643 : : {
644 [ + + ]: 543 : if (gxact == TwoPhaseState->prepXacts[i])
645 : : {
646 : : /* remove from the active array */
647 : 367 : TwoPhaseState->numPrepXacts--;
648 : 367 : TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
649 : :
650 : : /* and put it back in the freelist */
5275 rhaas@postgresql.org 651 : 367 : gxact->next = TwoPhaseState->freeGXacts;
6393 tgl@sss.pgh.pa.us 652 : 367 : TwoPhaseState->freeGXacts = gxact;
653 : :
7627 654 : 367 : return;
655 : : }
656 : : }
657 : :
7627 tgl@sss.pgh.pa.us 658 [ # # ]:UBC 0 : elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
659 : : }
660 : :
661 : : /*
662 : : * Returns an array of all prepared transactions for the user-level
663 : : * function pg_prepared_xact.
664 : : *
665 : : * The returned array and all its elements are copies of internal data
666 : : * structures, to minimize the time we need to hold the TwoPhaseStateLock.
667 : : *
668 : : * WARNING -- we return even those transactions that are not fully prepared
669 : : * yet. The caller should filter them out if he doesn't want them.
670 : : *
671 : : * The returned array is palloc'd.
672 : : */
673 : : static int
7627 tgl@sss.pgh.pa.us 674 :CBC 109 : GetPreparedTransactionList(GlobalTransaction *gxacts)
675 : : {
676 : : GlobalTransaction array;
677 : : int num;
678 : : int i;
679 : :
680 : 109 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
681 : :
682 [ + + ]: 109 : if (TwoPhaseState->numPrepXacts == 0)
683 : : {
684 : 68 : LWLockRelease(TwoPhaseStateLock);
685 : :
686 : 68 : *gxacts = NULL;
687 : 68 : return 0;
688 : : }
689 : :
690 : 41 : num = TwoPhaseState->numPrepXacts;
146 michael@paquier.xyz 691 :GNC 41 : array = palloc_array(GlobalTransactionData, num);
7627 tgl@sss.pgh.pa.us 692 :CBC 41 : *gxacts = array;
693 [ + + ]: 87 : for (i = 0; i < num; i++)
694 : 46 : memcpy(array + i, TwoPhaseState->prepXacts[i],
695 : : sizeof(GlobalTransactionData));
696 : :
697 : 41 : LWLockRelease(TwoPhaseStateLock);
698 : :
699 : 41 : return num;
700 : : }
701 : :
702 : :
703 : : /* Working status for pg_prepared_xact, kept in funcctx->user_fctx across calls */
704 : : typedef struct
705 : : {
/* array: copies returned by GetPreparedTransactionList (NULL if none); ngxacts: number of elements; currIdx: index of the next element to examine */
706 : : GlobalTransaction array;
707 : : int ngxacts;
708 : : int currIdx;
709 : : } Working_State;
710 : :
711 : : /*
712 : : * pg_prepared_xact
713 : : * Produce a view with one row per prepared transaction.
714 : : *
715 : : * This function is here so we don't have to export the
716 : : * GlobalTransactionData struct definition.
717 : : */
718 : : Datum
719 : 155 : pg_prepared_xact(PG_FUNCTION_ARGS)
720 : : {
721 : : FuncCallContext *funcctx;
722 : : Working_State *status;
723 : :
724 [ + + ]: 155 : if (SRF_IS_FIRSTCALL())
725 : : {
726 : : TupleDesc tupdesc;
727 : : MemoryContext oldcontext;
728 : :
729 : : /* create a function context for cross-call persistence */
730 : 109 : funcctx = SRF_FIRSTCALL_INIT();
731 : :
732 : : /*
733 : : * Switch to memory context appropriate for multiple function calls
734 : : */
735 : 109 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
736 : :
737 : : /* build tupdesc for result tuples */
738 : : /* this had better match pg_prepared_xacts view in system_views.sql */
2723 andres@anarazel.de 739 : 109 : tupdesc = CreateTemplateTupleDesc(5);
7627 tgl@sss.pgh.pa.us 740 : 109 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
741 : : XIDOID, -1, 0);
742 : 109 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
743 : : TEXTOID, -1, 0);
7626 744 : 109 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
745 : : TIMESTAMPTZOID, -1, 0);
746 : 109 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
747 : : OIDOID, -1, 0);
748 : 109 : TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
749 : : OIDOID, -1, 0);
750 : :
50 drowley@postgresql.o 751 :GNC 109 : TupleDescFinalize(tupdesc);
7627 tgl@sss.pgh.pa.us 752 :CBC 109 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
753 : :
754 : : /*
755 : : * Collect all the 2PC status information that we will format and send
756 : : * out as a result set.
757 : : */
146 michael@paquier.xyz 758 :GNC 109 : status = palloc_object(Working_State);
523 peter@eisentraut.org 759 :CBC 109 : funcctx->user_fctx = status;
760 : :
7627 tgl@sss.pgh.pa.us 761 : 109 : status->ngxacts = GetPreparedTransactionList(&status->array);
762 : 109 : status->currIdx = 0;
763 : :
764 : 109 : MemoryContextSwitchTo(oldcontext);
765 : : }
766 : :
767 : 155 : funcctx = SRF_PERCALL_SETUP();
768 : 155 : status = (Working_State *) funcctx->user_fctx;
769 : :
770 [ + + + + ]: 155 : while (status->array != NULL && status->currIdx < status->ngxacts)
771 : : {
772 : 46 : GlobalTransaction gxact = &status->array[status->currIdx++];
803 heikki.linnakangas@i 773 : 46 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
1389 peter@eisentraut.org 774 : 46 : Datum values[5] = {0};
775 : 46 : bool nulls[5] = {0};
776 : : HeapTuple tuple;
777 : : Datum result;
778 : :
7627 tgl@sss.pgh.pa.us 779 [ - + ]: 46 : if (!gxact->valid)
7627 tgl@sss.pgh.pa.us 780 :UBC 0 : continue;
781 : :
782 : : /*
783 : : * Form tuple with appropriate data.
784 : : */
785 : :
2090 andres@anarazel.de 786 :CBC 46 : values[0] = TransactionIdGetDatum(proc->xid);
6615 tgl@sss.pgh.pa.us 787 : 46 : values[1] = CStringGetTextDatum(gxact->gid);
7626 788 : 46 : values[2] = TimestampTzGetDatum(gxact->prepared_at);
7616 789 : 46 : values[3] = ObjectIdGetDatum(gxact->owner);
5275 rhaas@postgresql.org 790 : 46 : values[4] = ObjectIdGetDatum(proc->databaseId);
791 : :
7627 tgl@sss.pgh.pa.us 792 : 46 : tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
793 : 46 : result = HeapTupleGetDatum(tuple);
794 : 46 : SRF_RETURN_NEXT(funcctx, result);
795 : : }
796 : :
797 : 109 : SRF_RETURN_DONE(funcctx);
798 : : }
799 : :
800 : : /*
801 : : * TwoPhaseGetGXact
802 : : * Get the GlobalTransaction struct for a prepared transaction
803 : : * specified by XID
804 : : *
805 : : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
806 : : * caller had better hold it.
807 : : */
808 : : static GlobalTransaction
302 michael@paquier.xyz 809 :GNC 1344 : TwoPhaseGetGXact(FullTransactionId fxid, bool lock_held)
810 : : {
5018 tgl@sss.pgh.pa.us 811 :CBC 1344 : GlobalTransaction result = NULL;
812 : : int i;
813 : :
814 : : static FullTransactionId cached_fxid = {InvalidTransactionId};
815 : : static GlobalTransaction cached_gxact = NULL;
816 : :
2626 michael@paquier.xyz 817 [ + + - + ]: 1344 : Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
818 : :
819 : : /*
820 : : * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
821 : : * repeatedly for the same XID. We can save work with a simple cache.
822 : : */
302 michael@paquier.xyz 823 [ + + ]:GNC 1344 : if (FullTransactionIdEquals(fxid, cached_fxid))
5018 tgl@sss.pgh.pa.us 824 :CBC 939 : return cached_gxact;
825 : :
2626 michael@paquier.xyz 826 [ + + ]: 405 : if (!lock_held)
827 : 328 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
828 : :
7627 tgl@sss.pgh.pa.us 829 [ + - ]: 637 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
830 : : {
7507 bruce@momjian.us 831 : 637 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
832 : :
302 michael@paquier.xyz 833 [ + + ]:GNC 637 : if (FullTransactionIdEquals(gxact->fxid, fxid))
834 : : {
5018 tgl@sss.pgh.pa.us 835 :CBC 405 : result = gxact;
7627 836 : 405 : break;
837 : : }
838 : : }
839 : :
2626 michael@paquier.xyz 840 [ + + ]: 405 : if (!lock_held)
841 : 328 : LWLockRelease(TwoPhaseStateLock);
842 : :
7627 tgl@sss.pgh.pa.us 843 [ - + ]: 405 : if (result == NULL) /* should not happen */
302 michael@paquier.xyz 844 [ # # ]:UNC 0 : elog(ERROR, "failed to find GlobalTransaction for xid %u",
845 : : XidFromFullTransactionId(fxid));
846 : :
302 michael@paquier.xyz 847 :GNC 405 : cached_fxid = fxid;
5018 tgl@sss.pgh.pa.us 848 :CBC 405 : cached_gxact = result;
849 : :
7627 850 : 405 : return result;
851 : : }
852 : :
853 : : /*
854 : : * TwoPhaseGetXidByVirtualXID
855 : : * Lookup VXID among xacts prepared since last startup.
856 : : *
857 : : * (This won't find recovered xacts.) If more than one matches, return any
858 : : * and set "have_more" to true. To witness multiple matches, a single
859 : : * proc number must consume 2^32 LXIDs, with no intervening database restart.
860 : : */
861 : : TransactionId
1655 noah@leadboat.com 862 : 87 : TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
863 : : bool *have_more)
864 : : {
865 : : int i;
866 : 87 : TransactionId result = InvalidTransactionId;
867 : :
868 [ - + ]: 87 : Assert(VirtualTransactionIdIsValid(vxid));
869 : 87 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
870 : :
871 [ + + ]: 138 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
872 : : {
873 : 51 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
874 : : PGPROC *proc;
875 : : VirtualTransactionId proc_vxid;
876 : :
877 [ + + ]: 51 : if (!gxact->valid)
878 : 1 : continue;
793 heikki.linnakangas@i 879 : 50 : proc = GetPGProcByNumber(gxact->pgprocno);
1655 noah@leadboat.com 880 : 50 : GET_VXID_FROM_PGPROC(proc_vxid, *proc);
881 [ + + + + ]: 50 : if (VirtualTransactionIdEquals(vxid, proc_vxid))
882 : : {
883 : : /*
884 : : * Startup process sets proc->vxid.procNumber to
885 : : * INVALID_PROC_NUMBER.
886 : : */
887 [ - + ]: 17 : Assert(!gxact->inredo);
888 : :
889 [ - + ]: 17 : if (result != InvalidTransactionId)
890 : : {
1655 noah@leadboat.com 891 :UBC 0 : *have_more = true;
892 : 0 : break;
893 : : }
302 michael@paquier.xyz 894 :GNC 17 : result = XidFromFullTransactionId(gxact->fxid);
895 : : }
896 : : }
897 : :
1655 noah@leadboat.com 898 :CBC 87 : LWLockRelease(TwoPhaseStateLock);
899 : :
900 : 87 : return result;
901 : : }
902 : :
903 : : /*
904 : : * TwoPhaseGetDummyProcNumber
905 : : * Get the dummy proc number for prepared transaction
906 : : *
907 : : * Dummy proc numbers are similar to proc numbers of real backends. They
908 : : * start at FIRST_PREPARED_XACT_PROC_NUMBER, and are unique across all
909 : : * currently active real backends and prepared transactions. If lock_held is
910 : : * set to true, TwoPhaseStateLock will not be taken, so the caller had better
911 : : * hold it.
912 : : */
913 : : ProcNumber
302 michael@paquier.xyz 914 :GNC 147 : TwoPhaseGetDummyProcNumber(FullTransactionId fxid, bool lock_held)
915 : : {
916 : 147 : GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
917 : :
793 heikki.linnakangas@i 918 :CBC 147 : return gxact->pgprocno;
919 : : }
920 : :
921 : : /*
922 : : * TwoPhaseGetDummyProc
923 : : * Get the PGPROC that represents a prepared transaction
924 : : *
925 : : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
926 : : * caller had better hold it.
927 : : */
928 : : PGPROC *
302 michael@paquier.xyz 929 :GNC 1197 : TwoPhaseGetDummyProc(FullTransactionId fxid, bool lock_held)
930 : : {
931 : 1197 : GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
932 : :
803 heikki.linnakangas@i 933 :CBC 1197 : return GetPGProcByNumber(gxact->pgprocno);
934 : : }
935 : :
936 : : /************************************************************************/
937 : : /* State file support */
938 : : /************************************************************************/
939 : :
940 : : /*
941 : : * Compute the FullTransactionId for the given TransactionId.
942 : : *
943 : : * This is safe if the xid has not yet reached COMMIT PREPARED or ROLLBACK
944 : : * PREPARED. After those commands, concurrent vac_truncate_clog() may make
945 : : * the xid cease to qualify as allowable. XXX Not all callers limit their
946 : : * calls accordingly.
947 : : */
948 : : static inline FullTransactionId
473 michael@paquier.xyz 949 : 362 : AdjustToFullTransactionId(TransactionId xid)
950 : : {
951 [ - + ]: 362 : Assert(TransactionIdIsValid(xid));
465 noah@leadboat.com 952 : 362 : return FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
953 : : }
954 : :
955 : : static inline int
302 michael@paquier.xyz 956 :GNC 573 : TwoPhaseFilePath(char *path, FullTransactionId fxid)
957 : : {
888 akorotkov@postgresql 958 :CBC 1146 : return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
959 : 573 : EpochFromFullTransactionId(fxid),
960 : 573 : XidFromFullTransactionId(fxid));
961 : : }
962 : :
/*
 * 2PC state file format:
 *
 *	1. TwoPhaseFileHeader
 *	2. GID (NUL-terminated string, gidlen bytes including the terminator)
 *	3. TransactionId[] (subtransactions)
 *	4. RelFileLocator[] (files to be deleted at commit)
 *	5. RelFileLocator[] (files to be deleted at abort)
 *	6. xl_xact_stats_item[] (stats entries to be dropped at commit)
 *	7. xl_xact_stats_item[] (stats entries to be dropped at abort)
 *	8. SharedInvalidationMessage[] (inval messages to be sent at commit)
 *	9. TwoPhaseRecordOnDisk
 *	10. ...
 *	11. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
 *	12. checksum (CRC-32C)
 *
 * Each segment except the final checksum is MAXALIGN'd.
 */
978 : :
979 : : /*
980 : : * Header for a 2PC state file
981 : : */
982 : : #define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
983 : :
984 : : typedef xl_xact_prepare TwoPhaseFileHeader;
985 : :
986 : : /*
987 : : * Header for each record in a state file
988 : : *
989 : : * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
990 : : * The rmgr data will be stored starting on a MAXALIGN boundary.
991 : : */
992 : : typedef struct TwoPhaseRecordOnDisk
993 : : {
994 : : uint32 len; /* length of rmgr data */
995 : : TwoPhaseRmgrId rmid; /* resource manager for this record */
996 : : uint16 info; /* flag bits for use by rmgr */
997 : : } TwoPhaseRecordOnDisk;
998 : :
999 : : /*
1000 : : * During prepare, the state file is assembled in memory before writing it
1001 : : * to WAL and the actual state file. We use a chain of StateFileChunk blocks
1002 : : * for that.
1003 : : */
1004 : : typedef struct StateFileChunk
1005 : : {
1006 : : char *data;
1007 : : uint32 len;
1008 : : struct StateFileChunk *next;
1009 : : } StateFileChunk;
1010 : :
1011 : : static struct xllist
1012 : : {
1013 : : StateFileChunk *head; /* first data block in the chain */
1014 : : StateFileChunk *tail; /* last block in chain */
1015 : : uint32 num_chunks;
1016 : : uint32 bytes_free; /* free bytes left in tail block */
1017 : : uint32 total_len; /* total data bytes in chain */
1018 : : } records;
1019 : :
1020 : :
1021 : : /*
1022 : : * Append a block of data to records data structure.
1023 : : *
1024 : : * NB: each block is padded to a MAXALIGN multiple. This must be
1025 : : * accounted for when the file is later read!
1026 : : *
1027 : : * The data is copied, so the caller is free to modify it afterwards.
1028 : : */
1029 : : static void
7627 tgl@sss.pgh.pa.us 1030 : 3699 : save_state_data(const void *data, uint32 len)
1031 : : {
7507 bruce@momjian.us 1032 : 3699 : uint32 padlen = MAXALIGN(len);
1033 : :
7627 tgl@sss.pgh.pa.us 1034 [ + + ]: 3699 : if (padlen > records.bytes_free)
1035 : : {
146 michael@paquier.xyz 1036 :GNC 79 : records.tail->next = palloc0_object(StateFileChunk);
7627 tgl@sss.pgh.pa.us 1037 :CBC 79 : records.tail = records.tail->next;
1038 : 79 : records.tail->len = 0;
1039 : 79 : records.tail->next = NULL;
4184 heikki.linnakangas@i 1040 : 79 : records.num_chunks++;
1041 : :
7627 tgl@sss.pgh.pa.us 1042 : 79 : records.bytes_free = Max(padlen, 512);
1043 : 79 : records.tail->data = palloc(records.bytes_free);
1044 : : }
1045 : :
154 peter@eisentraut.org 1046 :GNC 3699 : memcpy(records.tail->data + records.tail->len, data, len);
7627 tgl@sss.pgh.pa.us 1047 :CBC 3699 : records.tail->len += padlen;
1048 : 3699 : records.bytes_free -= padlen;
1049 : 3699 : records.total_len += padlen;
1050 : 3699 : }
1051 : :
1052 : : /*
1053 : : * Start preparing a state file.
1054 : : *
1055 : : * Initializes data structure and inserts the 2PC file header record.
1056 : : */
1057 : : void
1058 : 297 : StartPrepare(GlobalTransaction gxact)
1059 : : {
803 heikki.linnakangas@i 1060 : 297 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
302 michael@paquier.xyz 1061 :GNC 297 : TransactionId xid = XidFromFullTransactionId(gxact->fxid);
1062 : : TwoPhaseFileHeader hdr;
1063 : : TransactionId *children;
1064 : : RelFileLocator *commitrels;
1065 : : RelFileLocator *abortrels;
1490 andres@anarazel.de 1066 :CBC 297 : xl_xact_stats_item *abortstats = NULL;
1067 : 297 : xl_xact_stats_item *commitstats = NULL;
1068 : : SharedInvalidationMessage *invalmsgs;
1069 : :
1070 : : /* Initialize linked list */
146 michael@paquier.xyz 1071 :GNC 297 : records.head = palloc0_object(StateFileChunk);
7627 tgl@sss.pgh.pa.us 1072 :CBC 297 : records.head->len = 0;
1073 : 297 : records.head->next = NULL;
1074 : :
1075 : 297 : records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1076 : 297 : records.head->data = palloc(records.bytes_free);
1077 : :
1078 : 297 : records.tail = records.head;
4184 heikki.linnakangas@i 1079 : 297 : records.num_chunks = 1;
1080 : :
7627 tgl@sss.pgh.pa.us 1081 : 297 : records.total_len = 0;
1082 : :
1083 : : /* Create header */
1084 : 297 : hdr.magic = TWOPHASE_MAGIC;
1085 : 297 : hdr.total_len = 0; /* EndPrepare will fill this in */
1086 : 297 : hdr.xid = xid;
5275 rhaas@postgresql.org 1087 : 297 : hdr.database = proc->databaseId;
7626 tgl@sss.pgh.pa.us 1088 : 297 : hdr.prepared_at = gxact->prepared_at;
1089 : 297 : hdr.owner = gxact->owner;
7627 1090 : 297 : hdr.nsubxacts = xactGetCommittedChildren(&children);
5744 rhaas@postgresql.org 1091 : 297 : hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1092 : 297 : hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1490 andres@anarazel.de 1093 : 297 : hdr.ncommitstats =
1094 : 297 : pgstat_get_transactional_drops(true, &commitstats);
1095 : 297 : hdr.nabortstats =
1096 : 297 : pgstat_get_transactional_drops(false, &abortstats);
5981 simon@2ndQuadrant.co 1097 : 297 : hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1098 : : &hdr.initfileinval);
3240 tgl@sss.pgh.pa.us 1099 : 297 : hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1100 : : /* EndPrepare will fill the origin data, if necessary */
1541 michael@paquier.xyz 1101 : 297 : hdr.origin_lsn = InvalidXLogRecPtr;
1102 : 297 : hdr.origin_timestamp = 0;
1103 : :
7627 tgl@sss.pgh.pa.us 1104 : 297 : save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
3708 simon@2ndQuadrant.co 1105 : 297 : save_state_data(gxact->gid, hdr.gidlen);
1106 : :
1107 : : /*
1108 : : * Add the additional info about subxacts, deletable files and cache
1109 : : * invalidation messages.
1110 : : */
7627 tgl@sss.pgh.pa.us 1111 [ + + ]: 297 : if (hdr.nsubxacts > 0)
1112 : : {
1113 : 99 : save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1114 : : /* While we have the child-xact data, stuff it in the gxact too */
1115 : 99 : GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1116 : : }
1117 [ + + ]: 297 : if (hdr.ncommitrels > 0)
1118 : : {
1399 rhaas@postgresql.org 1119 : 21 : save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileLocator));
7627 tgl@sss.pgh.pa.us 1120 : 21 : pfree(commitrels);
1121 : : }
1122 [ + + ]: 297 : if (hdr.nabortrels > 0)
1123 : : {
1399 rhaas@postgresql.org 1124 : 29 : save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileLocator));
7627 tgl@sss.pgh.pa.us 1125 : 29 : pfree(abortrels);
1126 : : }
1490 andres@anarazel.de 1127 [ + + ]: 297 : if (hdr.ncommitstats > 0)
1128 : : {
1129 : 21 : save_state_data(commitstats,
1130 : 21 : hdr.ncommitstats * sizeof(xl_xact_stats_item));
1131 : 21 : pfree(commitstats);
1132 : : }
1133 [ + + ]: 297 : if (hdr.nabortstats > 0)
1134 : : {
1135 : 25 : save_state_data(abortstats,
1454 tgl@sss.pgh.pa.us 1136 : 25 : hdr.nabortstats * sizeof(xl_xact_stats_item));
1490 andres@anarazel.de 1137 : 25 : pfree(abortstats);
1138 : : }
5981 simon@2ndQuadrant.co 1139 [ + + ]: 297 : if (hdr.ninvalmsgs > 0)
1140 : : {
1141 : 37 : save_state_data(invalmsgs,
1142 : 37 : hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1143 : 37 : pfree(invalmsgs);
1144 : : }
7627 tgl@sss.pgh.pa.us 1145 : 297 : }
1146 : :
1147 : : /*
1148 : : * Finish preparing state data and writing it to WAL.
1149 : : */
1150 : : void
1151 : 295 : EndPrepare(GlobalTransaction gxact)
1152 : : {
1153 : : TwoPhaseFileHeader *hdr;
1154 : : StateFileChunk *record;
1155 : : bool replorigin;
1156 : :
1157 : : /* Add the end sentinel to the list of 2PC records */
1158 : 295 : RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
1159 : : NULL, 0);
1160 : :
1161 : : /* Go back and fill in total_len in the file header record */
1162 : 295 : hdr = (TwoPhaseFileHeader *) records.head->data;
1163 [ - + ]: 295 : Assert(hdr->magic == TWOPHASE_MAGIC);
4039 heikki.linnakangas@i 1164 : 295 : hdr->total_len = records.total_len + sizeof(pg_crc32c);
1165 : :
97 msawada@postgresql.o 1166 [ + + ]:GNC 321 : replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
1167 [ + - ]: 26 : replorigin_xact_state.origin != DoNotReplicateId);
1168 : :
2960 simon@2ndQuadrant.co 1169 [ + + ]:CBC 295 : if (replorigin)
1170 : : {
97 msawada@postgresql.o 1171 :GNC 26 : hdr->origin_lsn = replorigin_xact_state.origin_lsn;
1172 : 26 : hdr->origin_timestamp = replorigin_xact_state.origin_timestamp;
1173 : : }
1174 : :
1175 : : /*
1176 : : * If the data size exceeds MaxAllocSize, we won't be able to read it in
1177 : : * ReadTwoPhaseFile. Check for that now, rather than fail in the case
1178 : : * where we write data to file and then re-read at commit time.
1179 : : */
6560 heikki.linnakangas@i 1180 [ - + ]:CBC 295 : if (hdr->total_len > MaxAllocSize)
6560 heikki.linnakangas@i 1181 [ # # ]:UBC 0 : ereport(ERROR,
1182 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1183 : : errmsg("two-phase state file maximum length exceeded")));
1184 : :
1185 : : /*
1186 : : * Now writing 2PC state data to WAL. We let the WAL's CRC protection
1187 : : * cover us, so no need to calculate a separate CRC.
1188 : : *
1189 : : * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
1190 : : * starting immediately after the WAL record is inserted could complete
1191 : : * without fsync'ing our state file. (This is essentially the same kind
1192 : : * of race condition as the COMMIT-to-clog-write case that
1193 : : * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
1194 : : * there.) Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in
1195 : : * the critical commit section. We need to know about such transactions
1196 : : * for conflict detection in logical replication. See
1197 : : * GetOldestActiveTransactionId(true, false) and its use.
1198 : : *
1199 : : * We save the PREPARE record's location in the gxact for later use by
1200 : : * CheckPointTwoPhase.
1201 : : */
4184 heikki.linnakangas@i 1202 :CBC 295 : XLogEnsureRecordSpace(0, records.num_chunks);
1203 : :
7627 tgl@sss.pgh.pa.us 1204 : 295 : START_CRIT_SECTION();
1205 : :
1488 rhaas@postgresql.org 1206 [ - + ]: 295 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
1207 : 295 : MyProc->delayChkptFlags |= DELAY_CHKPT_START;
1208 : :
4184 heikki.linnakangas@i 1209 : 295 : XLogBeginInsert();
1210 [ + + ]: 669 : for (record = records.head; record != NULL; record = record->next)
1211 : 374 : XLogRegisterData(record->data, record->len);
1212 : :
2960 simon@2ndQuadrant.co 1213 : 295 : XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
1214 : :
3758 1215 : 295 : gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1216 : :
2960 1217 [ + + ]: 295 : if (replorigin)
1218 : : {
1219 : : /* Move LSNs forward for this replication origin */
97 msawada@postgresql.o 1220 :GNC 26 : replorigin_session_advance(replorigin_xact_state.origin_lsn,
1221 : : gxact->prepare_end_lsn);
1222 : : }
1223 : :
3758 simon@2ndQuadrant.co 1224 :CBC 295 : XLogFlush(gxact->prepare_end_lsn);
1225 : :
1226 : : /* If we crash now, we have prepared: WAL replay will fix things */
1227 : :
1228 : : /* Store record's start location to read that later on Commit */
1229 : 295 : gxact->prepare_start_lsn = ProcLastRecPtr;
1230 : :
1231 : : /*
1232 : : * Mark the prepared transaction as valid. As soon as xact.c marks MyProc
1233 : : * as not running our XID (which it will do immediately after this
1234 : : * function returns), others can commit/rollback the xact.
1235 : : *
1236 : : * NB: a side effect of this is to make a dummy ProcArray entry for the
1237 : : * prepared XID. This must happen before we clear the XID from MyProc /
1238 : : * ProcGlobal->xids[], else there is a window where the XID is not running
1239 : : * according to TransactionIdIsInProgress, and onlookers would be entitled
1240 : : * to assume the xact crashed. Instead we have a window where the same
1241 : : * XID appears twice in ProcArray, which is OK.
1242 : : */
3247 alvherre@alvh.no-ip. 1243 : 295 : MarkAsPrepared(gxact, false);
1244 : :
1245 : : /*
1246 : : * Now we can mark ourselves as out of the commit critical section: a
1247 : : * checkpoint starting after this will certainly see the gxact as a
1248 : : * candidate for fsyncing.
1249 : : */
1488 rhaas@postgresql.org 1250 : 295 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
1251 : :
1252 : : /*
1253 : : * Remember that we have this GlobalTransaction entry locked for us. If
1254 : : * we crash after this point, it's too late to abort, but we must unlock
1255 : : * it so that the prepared transaction can be committed or rolled back.
1256 : : */
4373 heikki.linnakangas@i 1257 : 295 : MyLockedGxact = gxact;
1258 : :
7627 tgl@sss.pgh.pa.us 1259 [ - + ]: 295 : END_CRIT_SECTION();
1260 : :
1261 : : /*
1262 : : * Wait for synchronous replication, if required.
1263 : : *
1264 : : * Note that at this stage we have marked the prepare, but still show as
1265 : : * running in the procarray (twice!) and continue to hold locks.
1266 : : */
3689 rhaas@postgresql.org 1267 : 295 : SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
1268 : :
7627 tgl@sss.pgh.pa.us 1269 : 295 : records.tail = records.head = NULL;
4184 heikki.linnakangas@i 1270 : 295 : records.num_chunks = 0;
7627 tgl@sss.pgh.pa.us 1271 : 295 : }
1272 : :
1273 : : /*
1274 : : * Register a 2PC record to be written to state file.
1275 : : */
1276 : : void
1277 : 1584 : RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1278 : : const void *data, uint32 len)
1279 : : {
1280 : : TwoPhaseRecordOnDisk record;
1281 : :
1282 : 1584 : record.rmid = rmid;
1283 : 1584 : record.info = info;
1284 : 1584 : record.len = len;
1285 : 1584 : save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1286 [ + + ]: 1584 : if (len > 0)
1287 : 1289 : save_state_data(data, len);
1288 : 1584 : }
1289 : :
1290 : :
1291 : : /*
1292 : : * Read and validate the state file for xid.
1293 : : *
1294 : : * If it looks OK (has a valid magic number and CRC), return the palloc'd
1295 : : * contents of the file, issuing an error when finding corrupted data. If
1296 : : * missing_ok is true, which indicates that missing files can be safely
1297 : : * ignored, then return NULL. This state can be reached when doing recovery
1298 : : * after discarding two-phase files from frozen epochs.
1299 : : */
1300 : : static char *
302 michael@paquier.xyz 1301 :GNC 437 : ReadTwoPhaseFile(FullTransactionId fxid, bool missing_ok)
1302 : : {
1303 : : char path[MAXPGPATH];
1304 : : char *buf;
1305 : : TwoPhaseFileHeader *hdr;
1306 : : int fd;
1307 : : struct stat stat;
1308 : : uint32 crc_offset;
1309 : : pg_crc32c calc_crc,
1310 : : file_crc;
1311 : : int r;
1312 : :
1313 : 437 : TwoPhaseFilePath(path, fxid);
1314 : :
3146 peter_e@gmx.net 1315 :CBC 437 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
7627 tgl@sss.pgh.pa.us 1316 [ + + ]: 437 : if (fd < 0)
1317 : : {
2797 michael@paquier.xyz 1318 [ + - + - ]: 362 : if (missing_ok && errno == ENOENT)
1319 : 362 : return NULL;
1320 : :
2797 michael@paquier.xyz 1321 [ # # ]:UBC 0 : ereport(ERROR,
1322 : : (errcode_for_file_access(),
1323 : : errmsg("could not open file \"%s\": %m", path)));
1324 : : }
1325 : :
1326 : : /*
1327 : : * Check file length. We can determine a lower bound pretty easily. We
1328 : : * set an upper bound to avoid palloc() failure on a corrupt file, though
1329 : : * we can't guarantee that we won't get an out of memory error anyway,
1330 : : * even on a valid file.
1331 : : */
7627 tgl@sss.pgh.pa.us 1332 [ - + ]:CBC 75 : if (fstat(fd, &stat))
2797 michael@paquier.xyz 1333 [ # # ]:UBC 0 : ereport(ERROR,
1334 : : (errcode_for_file_access(),
1335 : : errmsg("could not stat file \"%s\": %m", path)));
1336 : :
7627 tgl@sss.pgh.pa.us 1337 [ + - ]:CBC 75 : if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1338 : : MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
4039 heikki.linnakangas@i 1339 : 75 : sizeof(pg_crc32c)) ||
6560 1340 [ - + ]: 75 : stat.st_size > MaxAllocSize)
2797 michael@paquier.xyz 1341 [ # # ]:UBC 0 : ereport(ERROR,
1342 : : (errcode(ERRCODE_DATA_CORRUPTED),
1343 : : errmsg_plural("incorrect size of file \"%s\": %lld byte",
1344 : : "incorrect size of file \"%s\": %lld bytes",
1345 : : (long long int) stat.st_size, path,
1346 : : (long long int) stat.st_size)));
1347 : :
4039 heikki.linnakangas@i 1348 :CBC 75 : crc_offset = stat.st_size - sizeof(pg_crc32c);
7627 tgl@sss.pgh.pa.us 1349 [ - + ]: 75 : if (crc_offset != MAXALIGN(crc_offset))
2797 michael@paquier.xyz 1350 [ # # ]:UBC 0 : ereport(ERROR,
1351 : : (errcode(ERRCODE_DATA_CORRUPTED),
1352 : : errmsg("incorrect alignment of CRC offset for file \"%s\"",
1353 : : path)));
1354 : :
1355 : : /*
1356 : : * OK, slurp in the file.
1357 : : */
7627 tgl@sss.pgh.pa.us 1358 :CBC 75 : buf = (char *) palloc(stat.st_size);
1359 : :
3335 rhaas@postgresql.org 1360 : 75 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
2848 michael@paquier.xyz 1361 : 75 : r = read(fd, buf, stat.st_size);
1362 [ - + ]: 75 : if (r != stat.st_size)
1363 : : {
2797 michael@paquier.xyz 1364 [ # # ]:UBC 0 : if (r < 0)
1365 [ # # ]: 0 : ereport(ERROR,
1366 : : (errcode_for_file_access(),
1367 : : errmsg("could not read file \"%s\": %m", path)));
1368 : : else
1369 [ # # ]: 0 : ereport(ERROR,
1370 : : (errmsg("could not read file \"%s\": read %d of %lld",
1371 : : path, r, (long long int) stat.st_size)));
1372 : : }
1373 : :
3335 rhaas@postgresql.org 1374 :CBC 75 : pgstat_report_wait_end();
1375 : :
2495 peter@eisentraut.org 1376 [ - + ]: 75 : if (CloseTransientFile(fd) != 0)
2614 michael@paquier.xyz 1377 [ # # ]:UBC 0 : ereport(ERROR,
1378 : : (errcode_for_file_access(),
1379 : : errmsg("could not close file \"%s\": %m", path)));
1380 : :
7627 tgl@sss.pgh.pa.us 1381 :CBC 75 : hdr = (TwoPhaseFileHeader *) buf;
2797 michael@paquier.xyz 1382 [ - + ]: 75 : if (hdr->magic != TWOPHASE_MAGIC)
2797 michael@paquier.xyz 1383 [ # # ]:UBC 0 : ereport(ERROR,
1384 : : (errcode(ERRCODE_DATA_CORRUPTED),
1385 : : errmsg("invalid magic number stored in file \"%s\"",
1386 : : path)));
1387 : :
2797 michael@paquier.xyz 1388 [ - + ]:CBC 75 : if (hdr->total_len != stat.st_size)
2797 michael@paquier.xyz 1389 [ # # ]:UBC 0 : ereport(ERROR,
1390 : : (errcode(ERRCODE_DATA_CORRUPTED),
1391 : : errmsg("invalid size stored in file \"%s\"",
1392 : : path)));
1393 : :
4200 heikki.linnakangas@i 1394 :CBC 75 : INIT_CRC32C(calc_crc);
1395 : 75 : COMP_CRC32C(calc_crc, buf, crc_offset);
1396 : 75 : FIN_CRC32C(calc_crc);
1397 : :
4039 1398 : 75 : file_crc = *((pg_crc32c *) (buf + crc_offset));
1399 : :
4200 1400 [ - + ]: 75 : if (!EQ_CRC32C(calc_crc, file_crc))
2797 michael@paquier.xyz 1401 [ # # ]:UBC 0 : ereport(ERROR,
1402 : : (errcode(ERRCODE_DATA_CORRUPTED),
1403 : : errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1404 : : path)));
1405 : :
7627 tgl@sss.pgh.pa.us 1406 :CBC 75 : return buf;
1407 : : }
1408 : :
1409 : :
1410 : : /*
1411 : : * Reads 2PC data from xlog. During checkpoint this data will be moved to
1412 : : * twophase files and ReadTwoPhaseFile should be used instead.
1413 : : *
1414 : : * Note clearly that this function can access WAL during normal operation,
1415 : : * similarly to the way WALSender or Logical Decoding would do.
1416 : : */
1417 : : static void
3758 simon@2ndQuadrant.co 1418 : 382 : XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1419 : : {
1420 : : XLogRecord *record;
1421 : : XLogReaderState *xlogreader;
1422 : : char *errormsg;
1423 : :
1821 tmunro@postgresql.or 1424 : 382 : xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1425 : 382 : XL_ROUTINE(.page_read = &read_local_xlog_page,
1426 : : .segment_open = &wal_segment_open,
1427 : : .segment_close = &wal_segment_close),
1428 : : NULL);
3758 simon@2ndQuadrant.co 1429 [ - + ]: 382 : if (!xlogreader)
3758 simon@2ndQuadrant.co 1430 [ # # ]:UBC 0 : ereport(ERROR,
1431 : : (errcode(ERRCODE_OUT_OF_MEMORY),
1432 : : errmsg("out of memory"),
1433 : : errdetail("Failed while allocating a WAL reading processor.")));
1434 : :
2291 heikki.linnakangas@i 1435 :CBC 382 : XLogBeginRead(xlogreader, lsn);
1821 tmunro@postgresql.or 1436 : 382 : record = XLogReadRecord(xlogreader, &errormsg);
1437 : :
3758 simon@2ndQuadrant.co 1438 [ - + ]: 382 : if (record == NULL)
1439 : : {
1636 noah@leadboat.com 1440 [ # # ]:UBC 0 : if (errormsg)
1441 [ # # ]: 0 : ereport(ERROR,
1442 : : (errcode_for_file_access(),
1443 : : errmsg("could not read two-phase state from WAL at %X/%08X: %s",
1444 : : LSN_FORMAT_ARGS(lsn), errormsg)));
1445 : : else
1446 [ # # ]: 0 : ereport(ERROR,
1447 : : (errcode_for_file_access(),
1448 : : errmsg("could not read two-phase state from WAL at %X/%08X",
1449 : : LSN_FORMAT_ARGS(lsn))));
1450 : : }
1451 : :
3758 simon@2ndQuadrant.co 1452 [ + - ]:CBC 382 : if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1453 [ - + ]: 382 : (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
3758 simon@2ndQuadrant.co 1454 [ # # ]:UBC 0 : ereport(ERROR,
1455 : : (errcode_for_file_access(),
1456 : : errmsg("expected two-phase state data is not present in WAL at %X/%08X",
1457 : : LSN_FORMAT_ARGS(lsn))));
1458 : :
3758 simon@2ndQuadrant.co 1459 [ + + ]:CBC 382 : if (len != NULL)
1460 : 25 : *len = XLogRecGetDataLen(xlogreader);
1461 : :
146 michael@paquier.xyz 1462 :GNC 382 : *buf = palloc_array(char, XLogRecGetDataLen(xlogreader));
3758 simon@2ndQuadrant.co 1463 :CBC 382 : memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1464 : :
1465 : 382 : XLogReaderFree(xlogreader);
1466 : 382 : }
1467 : :
1468 : :
1469 : : /*
1470 : : * Confirms an xid is prepared, during recovery
1471 : : */
1472 : : bool
5981 1473 : 362 : StandbyTransactionIdIsPrepared(TransactionId xid)
1474 : : {
1475 : : char *buf;
1476 : : TwoPhaseFileHeader *hdr;
1477 : : bool result;
1478 : : FullTransactionId fxid;
1479 : :
1480 [ - + ]: 362 : Assert(TransactionIdIsValid(xid));
1481 : :
5851 tgl@sss.pgh.pa.us 1482 [ - + ]: 362 : if (max_prepared_xacts <= 0)
5782 bruce@momjian.us 1483 :UBC 0 : return false; /* nothing to do */
1484 : :
1485 : : /* Read and validate file */
302 michael@paquier.xyz 1486 :GNC 362 : fxid = AdjustToFullTransactionId(xid);
1487 : 362 : buf = ReadTwoPhaseFile(fxid, true);
5981 simon@2ndQuadrant.co 1488 [ + - ]:CBC 362 : if (buf == NULL)
1489 : 362 : return false;
1490 : :
1491 : : /* Check header also */
5981 simon@2ndQuadrant.co 1492 :UBC 0 : hdr = (TwoPhaseFileHeader *) buf;
1493 : 0 : result = TransactionIdEquals(hdr->xid, xid);
1494 : 0 : pfree(buf);
1495 : :
1496 : 0 : return result;
1497 : : }
1498 : :
1499 : : /*
1500 : : * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1501 : : */
1502 : : void
7626 tgl@sss.pgh.pa.us 1503 :CBC 317 : FinishPreparedTransaction(const char *gid, bool isCommit)
1504 : : {
1505 : : GlobalTransaction gxact;
1506 : : PGPROC *proc;
1507 : : FullTransactionId fxid;
1508 : : TransactionId xid;
1509 : : bool ondisk;
1510 : : char *buf;
1511 : : char *bufptr;
1512 : : TwoPhaseFileHeader *hdr;
1513 : : TransactionId latestXid;
1514 : : TransactionId *children;
1515 : : RelFileLocator *commitrels;
1516 : : RelFileLocator *abortrels;
1517 : : RelFileLocator *delrels;
1518 : : int ndelrels;
1519 : : xl_xact_stats_item *commitstats;
1520 : : xl_xact_stats_item *abortstats;
1521 : : SharedInvalidationMessage *invalmsgs;
1522 : :
1523 : : /*
1524 : : * Validate the GID, and lock the GXACT to ensure that two backends do not
1525 : : * try to commit the same GID at once.
1526 : : */
7627 1527 : 317 : gxact = LockGXact(gid, GetUserId());
803 heikki.linnakangas@i 1528 : 303 : proc = GetPGProcByNumber(gxact->pgprocno);
302 michael@paquier.xyz 1529 :GNC 303 : fxid = gxact->fxid;
1530 : 303 : xid = XidFromFullTransactionId(fxid);
1531 : :
1532 : : /*
1533 : : * Read and validate 2PC state data. State data will typically be stored
1534 : : * in WAL files if the LSN is after the last checkpoint record, or moved
1535 : : * to disk if for some reason they have lived for a long time.
1536 : : */
3758 simon@2ndQuadrant.co 1537 [ + + ]:CBC 303 : if (gxact->ondisk)
302 michael@paquier.xyz 1538 :GNC 25 : buf = ReadTwoPhaseFile(fxid, false);
1539 : : else
3758 simon@2ndQuadrant.co 1540 :CBC 278 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1541 : :
1542 : :
1543 : : /*
1544 : : * Disassemble the header area
1545 : : */
7627 tgl@sss.pgh.pa.us 1546 : 303 : hdr = (TwoPhaseFileHeader *) buf;
1547 [ - + ]: 303 : Assert(TransactionIdEquals(hdr->xid, xid));
1548 : 303 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
3708 simon@2ndQuadrant.co 1549 : 303 : bufptr += MAXALIGN(hdr->gidlen);
7627 tgl@sss.pgh.pa.us 1550 : 303 : children = (TransactionId *) bufptr;
1551 : 303 : bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1399 rhaas@postgresql.org 1552 : 303 : commitrels = (RelFileLocator *) bufptr;
1553 : 303 : bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
1554 : 303 : abortrels = (RelFileLocator *) bufptr;
1555 : 303 : bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
1454 tgl@sss.pgh.pa.us 1556 : 303 : commitstats = (xl_xact_stats_item *) bufptr;
1490 andres@anarazel.de 1557 : 303 : bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
1454 tgl@sss.pgh.pa.us 1558 : 303 : abortstats = (xl_xact_stats_item *) bufptr;
1490 andres@anarazel.de 1559 : 303 : bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
5981 simon@2ndQuadrant.co 1560 : 303 : invalmsgs = (SharedInvalidationMessage *) bufptr;
1561 : 303 : bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1562 : :
1563 : : /* compute latestXid among all children */
6814 tgl@sss.pgh.pa.us 1564 : 303 : latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1565 : :
1566 : : /* Prevent cancel/die interrupt while cleaning up */
2924 teodor@sigaev.ru 1567 : 303 : HOLD_INTERRUPTS();
1568 : :
1569 : : /*
1570 : : * The order of operations here is critical: make the XLOG entry for
1571 : : * commit or abort, then mark the transaction committed or aborted in
1572 : : * pg_xact, then remove its PGPROC from the global ProcArray (which means
1573 : : * TransactionIdIsInProgress will stop saying the prepared xact is in
1574 : : * progress), then run the post-commit or post-abort callbacks. The
1575 : : * callbacks will release the locks the transaction held.
1576 : : */
7627 tgl@sss.pgh.pa.us 1577 [ + + ]: 303 : if (isCommit)
1578 : 255 : RecordTransactionCommitPrepared(xid,
1579 : : hdr->nsubxacts, children,
1580 : : hdr->ncommitrels, commitrels,
1581 : : hdr->ncommitstats,
1582 : : commitstats,
1583 : : hdr->ninvalmsgs, invalmsgs,
2960 simon@2ndQuadrant.co 1584 : 255 : hdr->initfileinval, gid);
1585 : : else
7627 tgl@sss.pgh.pa.us 1586 : 48 : RecordTransactionAbortPrepared(xid,
1587 : : hdr->nsubxacts, children,
1588 : : hdr->nabortrels, abortrels,
1589 : : hdr->nabortstats,
1590 : : abortstats,
1591 : : gid);
1592 : :
5275 rhaas@postgresql.org 1593 : 303 : ProcArrayRemove(proc, latestXid);
1594 : :
1595 : : /*
1596 : : * In case we fail while running the callbacks, mark the gxact invalid so
1597 : : * no one else will try to commit/rollback, and so it will be recycled if
1598 : : * we fail after this point. It is still locked by our backend so it
1599 : : * won't go away yet.
1600 : : *
1601 : : * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1602 : : */
7627 tgl@sss.pgh.pa.us 1603 : 303 : gxact->valid = false;
1604 : :
1605 : : /*
1606 : : * We have to remove any files that were supposed to be dropped. For
1607 : : * consistency with the regular xact.c code paths, must do this before
1608 : : * releasing locks, so do it before running the callbacks.
1609 : : *
1610 : : * NB: this code knows that we couldn't be dropping any temp rels ...
1611 : : */
1612 [ + + ]: 303 : if (isCommit)
1613 : : {
6376 heikki.linnakangas@i 1614 : 255 : delrels = commitrels;
1615 : 255 : ndelrels = hdr->ncommitrels;
1616 : : }
1617 : : else
1618 : : {
1619 : 48 : delrels = abortrels;
1620 : 48 : ndelrels = hdr->nabortrels;
1621 : : }
1622 : :
1623 : : /* Make sure files supposed to be dropped are dropped */
2861 fujii@postgresql.org 1624 : 303 : DropRelationFiles(delrels, ndelrels, false);
1625 : :
1490 andres@anarazel.de 1626 [ + + ]: 303 : if (isCommit)
1627 : 255 : pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
1628 : : else
1629 : 48 : pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);
1630 : :
1631 : : /*
1632 : : * Handle cache invalidation messages.
1633 : : *
1634 : : * Relcache init file invalidation requires processing both before and
1635 : : * after we send the SI messages, only when committing. See
1636 : : * AtEOXact_Inval().
1637 : : */
1727 michael@paquier.xyz 1638 [ + + ]: 303 : if (isCommit)
1639 : : {
1640 [ - + ]: 255 : if (hdr->initfileinval)
1727 michael@paquier.xyz 1641 :UBC 0 : RelationCacheInitFilePreInvalidate();
1727 michael@paquier.xyz 1642 :CBC 255 : SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1643 [ - + ]: 255 : if (hdr->initfileinval)
1727 michael@paquier.xyz 1644 :UBC 0 : RelationCacheInitFilePostInvalidate();
1645 : : }
1646 : :
1647 : : /*
1648 : : * Acquire the two-phase lock. We want to work on the two-phase callbacks
1649 : : * while holding it to avoid potential conflicts with other transactions
1650 : : * attempting to use the same GID, so the lock is released once the shared
1651 : : * memory state is cleared.
1652 : : */
2626 michael@paquier.xyz 1653 :CBC 303 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1654 : :
1655 : : /* And now do the callbacks */
7626 tgl@sss.pgh.pa.us 1656 [ + + ]: 303 : if (isCommit)
302 michael@paquier.xyz 1657 :GNC 255 : ProcessRecords(bufptr, fxid, twophase_postcommit_callbacks);
1658 : : else
1659 : 48 : ProcessRecords(bufptr, fxid, twophase_postabort_callbacks);
1660 : :
1661 : 303 : PredicateLockTwoPhaseFinish(fxid, isCommit);
1662 : :
1663 : : /*
1664 : : * Read this value while holding the two-phase lock, as the on-disk 2PC
1665 : : * file is physically removed after the lock is released.
1666 : : */
581 michael@paquier.xyz 1667 :CBC 303 : ondisk = gxact->ondisk;
1668 : :
1669 : : /* Clear shared memory state */
2626 1670 : 303 : RemoveGXact(gxact);
1671 : :
1672 : : /*
1673 : : * Release the lock as all callbacks are called and shared memory cleanup
1674 : : * is done.
1675 : : */
1676 : 303 : LWLockRelease(TwoPhaseStateLock);
1677 : :
1678 : : /* Count the prepared xact as committed or aborted */
2582 akapila@postgresql.o 1679 : 303 : AtEOXact_PgStat(isCommit, false);
1680 : :
1681 : : /*
1682 : : * And now we can clean up any files we may have left.
1683 : : */
581 michael@paquier.xyz 1684 [ + + ]: 303 : if (ondisk)
302 michael@paquier.xyz 1685 :GNC 25 : RemoveTwoPhaseFile(fxid, true);
1686 : :
4373 heikki.linnakangas@i 1687 :CBC 303 : MyLockedGxact = NULL;
1688 : :
2924 teodor@sigaev.ru 1689 [ - + ]: 303 : RESUME_INTERRUPTS();
1690 : :
7627 tgl@sss.pgh.pa.us 1691 : 303 : pfree(buf);
1692 : 303 : }
1693 : :
1694 : : /*
1695 : : * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1696 : : */
1697 : : static void
302 michael@paquier.xyz 1698 :GNC 336 : ProcessRecords(char *bufptr, FullTransactionId fxid,
1699 : : const TwoPhaseCallback callbacks[])
1700 : : {
1701 : : for (;;)
7627 tgl@sss.pgh.pa.us 1702 :CBC 1490 : {
1703 : 1826 : TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1704 : :
1705 [ - + ]: 1826 : Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1706 [ + + ]: 1826 : if (record->rmid == TWOPHASE_RM_END_ID)
1707 : 336 : break;
1708 : :
1709 : 1490 : bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1710 : :
1711 [ + + ]: 1490 : if (callbacks[record->rmid] != NULL)
302 michael@paquier.xyz 1712 :GNC 1411 : callbacks[record->rmid] (fxid, record->info, bufptr, record->len);
1713 : :
7627 tgl@sss.pgh.pa.us 1714 :CBC 1490 : bufptr += MAXALIGN(record->len);
1715 : : }
1716 : 336 : }
1717 : :
1718 : : /*
1719 : : * Remove the 2PC file.
1720 : : *
1721 : : * If giveWarning is false, do not complain about file-not-present;
1722 : : * this is an expected case during WAL replay.
1723 : : *
1724 : : * This routine is used at early stages at recovery where future and
1725 : : * past orphaned files are checked, hence the FullTransactionId to build
1726 : : * a complete file name fit for the removal.
1727 : : */
1728 : : static void
302 michael@paquier.xyz 1729 :GNC 30 : RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
1730 : : {
1731 : : char path[MAXPGPATH];
1732 : :
1733 : 30 : TwoPhaseFilePath(path, fxid);
7627 tgl@sss.pgh.pa.us 1734 [ - + ]:CBC 30 : if (unlink(path))
7627 tgl@sss.pgh.pa.us 1735 [ # # # # ]:UBC 0 : if (errno != ENOENT || giveWarning)
1736 [ # # ]: 0 : ereport(WARNING,
1737 : : (errcode_for_file_access(),
1738 : : errmsg("could not remove file \"%s\": %m", path)));
7627 tgl@sss.pgh.pa.us 1739 :CBC 30 : }
1740 : :
1741 : : /*
1742 : : * Recreates a state file. This is used in WAL replay and during
1743 : : * checkpoint creation.
1744 : : *
1745 : : * Note: content and len don't include CRC.
1746 : : */
1747 : : static void
302 michael@paquier.xyz 1748 :GNC 25 : RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len)
1749 : : {
1750 : : char path[MAXPGPATH];
1751 : : pg_crc32c statefile_crc;
1752 : : int fd;
1753 : :
1754 : : /* Recompute CRC */
4200 heikki.linnakangas@i 1755 :CBC 25 : INIT_CRC32C(statefile_crc);
1756 : 25 : COMP_CRC32C(statefile_crc, content, len);
1757 : 25 : FIN_CRC32C(statefile_crc);
1758 : :
302 michael@paquier.xyz 1759 :GNC 25 : TwoPhaseFilePath(path, fxid);
1760 : :
4907 heikki.linnakangas@i 1761 :CBC 25 : fd = OpenTransientFile(path,
1762 : : O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
7627 tgl@sss.pgh.pa.us 1763 [ - + ]: 25 : if (fd < 0)
7627 tgl@sss.pgh.pa.us 1764 [ # # ]:UBC 0 : ereport(ERROR,
1765 : : (errcode_for_file_access(),
1766 : : errmsg("could not recreate file \"%s\": %m", path)));
1767 : :
1768 : : /* Write content and CRC */
2830 michael@paquier.xyz 1769 :CBC 25 : errno = 0;
3335 rhaas@postgresql.org 1770 : 25 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
7627 tgl@sss.pgh.pa.us 1771 [ - + ]: 25 : if (write(fd, content, len) != len)
1772 : : {
1773 : : /* if write didn't set errno, assume problem is no disk space */
2575 michael@paquier.xyz 1774 [ # # ]:UBC 0 : if (errno == 0)
1775 : 0 : errno = ENOSPC;
7627 tgl@sss.pgh.pa.us 1776 [ # # ]: 0 : ereport(ERROR,
1777 : : (errcode_for_file_access(),
1778 : : errmsg("could not write file \"%s\": %m", path)));
1779 : : }
4039 heikki.linnakangas@i 1780 [ - + ]:CBC 25 : if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1781 : : {
1782 : : /* if write didn't set errno, assume problem is no disk space */
2575 michael@paquier.xyz 1783 [ # # ]:UBC 0 : if (errno == 0)
1784 : 0 : errno = ENOSPC;
7627 tgl@sss.pgh.pa.us 1785 [ # # ]: 0 : ereport(ERROR,
1786 : : (errcode_for_file_access(),
1787 : : errmsg("could not write file \"%s\": %m", path)));
1788 : : }
3335 rhaas@postgresql.org 1789 :CBC 25 : pgstat_report_wait_end();
1790 : :
1791 : : /*
1792 : : * We must fsync the file because the end-of-replay checkpoint will not do
1793 : : * so, there being no GXACT in shared memory yet to tell it to.
1794 : : */
1795 : 25 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
7627 tgl@sss.pgh.pa.us 1796 [ - + ]: 25 : if (pg_fsync(fd) != 0)
7627 tgl@sss.pgh.pa.us 1797 [ # # ]:UBC 0 : ereport(ERROR,
1798 : : (errcode_for_file_access(),
1799 : : errmsg("could not fsync file \"%s\": %m", path)));
3335 rhaas@postgresql.org 1800 :CBC 25 : pgstat_report_wait_end();
1801 : :
4907 heikki.linnakangas@i 1802 [ - + ]: 25 : if (CloseTransientFile(fd) != 0)
7627 tgl@sss.pgh.pa.us 1803 [ # # ]:UBC 0 : ereport(ERROR,
1804 : : (errcode_for_file_access(),
1805 : : errmsg("could not close file \"%s\": %m", path)));
7627 tgl@sss.pgh.pa.us 1806 :CBC 25 : }
1807 : :
1808 : : /*
1809 : : * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1810 : : *
1811 : : * We must fsync the state file of any GXACT that is valid or has been
1812 : : * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1813 : : * horizon. (If the gxact isn't valid yet, has not been generated in
1814 : : * redo, or has a later LSN, this checkpoint is not responsible for
1815 : : * fsyncing it.)
1816 : : *
1817 : : * This is deliberately run as late as possible in the checkpoint sequence,
1818 : : * because GXACTs ordinarily have short lifespans, and so it is quite
1819 : : * possible that GXACTs that were valid at checkpoint start will no longer
1820 : : * exist if we wait a little bit. With typical checkpoint settings this
1821 : : * will be about 3 minutes for an online checkpoint, so as a result we
1822 : : * expect that there will be no GXACTs that need to be copied to disk.
1823 : : *
1824 : : * If a GXACT remains valid across multiple checkpoints, it will already
1825 : : * be on disk so we don't bother to repeat that write.
1826 : : */
1827 : : void
7625 1828 : 1944 : CheckPointTwoPhase(XLogRecPtr redo_horizon)
1829 : : {
1830 : : int i;
3758 simon@2ndQuadrant.co 1831 : 1944 : int serialized_xacts = 0;
1832 : :
7625 tgl@sss.pgh.pa.us 1833 [ + + ]: 1944 : if (max_prepared_xacts <= 0)
1834 : 1387 : return; /* nothing to do */
1835 : :
1836 : : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1837 : :
1838 : : /*
1839 : : * We are expecting there to be zero GXACTs that need to be copied to
1840 : : * disk, so we perform all I/O while holding TwoPhaseStateLock for
1841 : : * simplicity. This prevents any new xacts from preparing while this
1842 : : * occurs, which shouldn't be a problem since the presence of long-lived
1843 : : * prepared xacts indicates the transaction manager isn't active.
1844 : : *
1845 : : * It's also possible to move I/O out of the lock, but on every error we
1846 : : * should check whether somebody committed our transaction in different
1847 : : * backend. Let's leave this optimization for future, if somebody will
1848 : : * spot that this place cause bottleneck.
1849 : : *
1850 : : * Note that it isn't possible for there to be a GXACT with a
1851 : : * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1852 : : * because of the efforts with delayChkptFlags.
1853 : : */
1854 : 557 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1855 [ + + ]: 590 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1856 : : {
1857 : : /*
1858 : : * Note that we are using gxact not PGPROC so this works in recovery
1859 : : * also
1860 : : */
7507 bruce@momjian.us 1861 : 33 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1862 : :
3318 simon@2ndQuadrant.co 1863 [ + + + - ]: 33 : if ((gxact->valid || gxact->inredo) &&
3758 1864 [ + + ]: 33 : !gxact->ondisk &&
1865 [ + + ]: 29 : gxact->prepare_end_lsn <= redo_horizon)
1866 : : {
1867 : : char *buf;
1868 : : int len;
1869 : :
1870 : 25 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
302 michael@paquier.xyz 1871 :GNC 25 : RecreateTwoPhaseFile(gxact->fxid, buf, len);
3758 simon@2ndQuadrant.co 1872 :CBC 25 : gxact->ondisk = true;
3318 1873 : 25 : gxact->prepare_start_lsn = InvalidXLogRecPtr;
1874 : 25 : gxact->prepare_end_lsn = InvalidXLogRecPtr;
3758 1875 : 25 : pfree(buf);
1876 : 25 : serialized_xacts++;
1877 : : }
1878 : : }
1879 : 557 : LWLockRelease(TwoPhaseStateLock);
1880 : :
1881 : : /*
1882 : : * Flush unconditionally the parent directory to make any information
1883 : : * durable on disk. Two-phase files could have been removed and those
1884 : : * removals need to be made persistent as well as any files newly created
1885 : : * previously since the last checkpoint.
1886 : : */
3326 teodor@sigaev.ru 1887 : 557 : fsync_fname(TWOPHASE_DIR, true);
1888 : :
1889 : : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1890 : :
3758 simon@2ndQuadrant.co 1891 [ + - + + ]: 557 : if (log_checkpoints && serialized_xacts > 0)
1892 [ + - ]: 21 : ereport(LOG,
1893 : : (errmsg_plural("%u two-phase state file was written "
1894 : : "for a long-running prepared transaction",
1895 : : "%u two-phase state files were written "
1896 : : "for long-running prepared transactions",
1897 : : serialized_xacts,
1898 : : serialized_xacts)));
1899 : : }
1900 : :
1901 : : /*
1902 : : * restoreTwoPhaseData
1903 : : *
1904 : : * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1905 : : * This is called once at the beginning of recovery, saving any extra
1906 : : * lookups in the future. Two-phase files that are newer than the
1907 : : * minimum XID horizon are discarded on the way.
1908 : : */
1909 : : void
3318 1910 : 1077 : restoreTwoPhaseData(void)
1911 : : {
1912 : : DIR *cldir;
1913 : : struct dirent *clde;
1914 : :
3247 alvherre@alvh.no-ip. 1915 : 1077 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
3074 tgl@sss.pgh.pa.us 1916 : 1077 : cldir = AllocateDir(TWOPHASE_DIR);
3318 simon@2ndQuadrant.co 1917 [ + + ]: 3247 : while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1918 : : {
888 akorotkov@postgresql 1919 [ + + ]: 2170 : if (strlen(clde->d_name) == 16 &&
1920 [ + - ]: 16 : strspn(clde->d_name, "0123456789ABCDEF") == 16)
1921 : : {
1922 : : FullTransactionId fxid;
1923 : : char *buf;
1924 : :
1925 : 16 : fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
1926 : :
302 michael@paquier.xyz 1927 :GNC 16 : buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
1928 : : true, false, false);
3318 simon@2ndQuadrant.co 1929 [ - + ]:CBC 16 : if (buf == NULL)
3318 simon@2ndQuadrant.co 1930 :UBC 0 : continue;
1931 : :
302 michael@paquier.xyz 1932 :GNC 16 : PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr,
1933 : : InvalidXLogRecPtr, InvalidReplOriginId);
1934 : : }
1935 : : }
3247 alvherre@alvh.no-ip. 1936 :CBC 1077 : LWLockRelease(TwoPhaseStateLock);
3318 simon@2ndQuadrant.co 1937 : 1077 : FreeDir(cldir);
1938 : 1077 : }
1939 : :
1940 : : /*
1941 : : * PrescanPreparedTransactions
1942 : : *
1943 : : * Scan the shared memory entries of TwoPhaseState and determine the range
1944 : : * of valid XIDs present. This is run during database startup, after we
1945 : : * have completed reading WAL. TransamVariables->nextXid has been set to
1946 : : * one more than the highest XID for which evidence exists in WAL.
1947 : : *
1948 : : * We throw away any prepared xacts with main XID beyond nextXid --- if any
1949 : : * are present, it suggests that the DBA has done a PITR recovery to an
1950 : : * earlier point in time without cleaning out pg_twophase. We dare not
1951 : : * try to recover such prepared xacts since they likely depend on database
1952 : : * state that doesn't exist now.
1953 : : *
1954 : : * However, we will advance nextXid beyond any subxact XIDs belonging to
1955 : : * valid prepared xacts. We need to do this since subxact commit doesn't
1956 : : * write a WAL entry, and so there might be no evidence in WAL of those
1957 : : * subxact XIDs.
1958 : : *
1959 : : * On corrupted two-phase files, fail immediately. Keeping around broken
1960 : : * entries and let replay continue causes harm on the system, and a new
1961 : : * backup should be rolled in.
1962 : : *
1963 : : * Our other responsibility is to determine and return the oldest valid XID
1964 : : * among the prepared xacts (if none, return TransamVariables->nextXid).
1965 : : * This is needed to synchronize pg_subtrans startup properly.
1966 : : *
1967 : : * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1968 : : * top-level xids is stored in *xids_p. The number of entries in the array
1969 : : * is returned in *nxids_p.
1970 : : */
1971 : : TransactionId
5981 1972 : 1077 : PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1973 : : {
879 heikki.linnakangas@i 1974 : 1077 : FullTransactionId nextXid = TransamVariables->nextXid;
2093 andres@anarazel.de 1975 : 1077 : TransactionId origNextXid = XidFromFullTransactionId(nextXid);
7627 tgl@sss.pgh.pa.us 1976 : 1077 : TransactionId result = origNextXid;
5981 simon@2ndQuadrant.co 1977 : 1077 : TransactionId *xids = NULL;
1978 : 1077 : int nxids = 0;
1979 : 1077 : int allocsize = 0;
1980 : : int i;
1981 : :
3247 alvherre@alvh.no-ip. 1982 : 1077 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
3318 simon@2ndQuadrant.co 1983 [ + + ]: 1131 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1984 : : {
1985 : : TransactionId xid;
1986 : : char *buf;
1987 : 54 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1988 : :
1989 [ - + ]: 54 : Assert(gxact->inredo);
1990 : :
302 michael@paquier.xyz 1991 :GNC 54 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
1992 : : gxact->prepare_start_lsn,
3275 bruce@momjian.us 1993 :CBC 54 : gxact->ondisk, false, true);
1994 : :
3318 simon@2ndQuadrant.co 1995 [ - + ]: 54 : if (buf == NULL)
3318 simon@2ndQuadrant.co 1996 :UBC 0 : continue;
1997 : :
1998 : : /*
1999 : : * OK, we think this file is valid. Incorporate xid into the
2000 : : * running-minimum result.
2001 : : */
302 michael@paquier.xyz 2002 :GNC 54 : xid = XidFromFullTransactionId(gxact->fxid);
3304 simon@2ndQuadrant.co 2003 [ + + ]:CBC 54 : if (TransactionIdPrecedes(xid, result))
2004 : 46 : result = xid;
2005 : :
3318 2006 [ + + ]: 54 : if (xids_p)
2007 : : {
2008 [ + + ]: 21 : if (nxids == allocsize)
2009 : : {
2010 [ + - ]: 17 : if (nxids == 0)
2011 : : {
2012 : 17 : allocsize = 10;
2013 : 17 : xids = palloc(allocsize * sizeof(TransactionId));
2014 : : }
2015 : : else
2016 : : {
3318 simon@2ndQuadrant.co 2017 :UBC 0 : allocsize = allocsize * 2;
2018 : 0 : xids = repalloc(xids, allocsize * sizeof(TransactionId));
2019 : : }
2020 : : }
3318 simon@2ndQuadrant.co 2021 :CBC 21 : xids[nxids++] = xid;
2022 : : }
2023 : :
2024 : 54 : pfree(buf);
2025 : : }
2026 : 1077 : LWLockRelease(TwoPhaseStateLock);
2027 : :
5981 2028 [ + + ]: 1077 : if (xids_p)
2029 : : {
2030 : 67 : *xids_p = xids;
2031 : 67 : *nxids_p = nxids;
2032 : : }
2033 : :
7627 tgl@sss.pgh.pa.us 2034 : 1077 : return result;
2035 : : }
2036 : :
2037 : : /*
2038 : : * StandbyRecoverPreparedTransactions
2039 : : *
2040 : : * Scan the shared memory entries of TwoPhaseState and setup all the required
2041 : : * information to allow standby queries to treat prepared transactions as still
2042 : : * active.
2043 : : *
2044 : : * This is never called at the end of recovery - we use
2045 : : * RecoverPreparedTransactions() at that point.
2046 : : *
2047 : : * This updates pg_subtrans, so that any subtransactions will be correctly
2048 : : * seen as in-progress in snapshots taken during recovery.
2049 : : */
2050 : : void
3295 simon@2ndQuadrant.co 2051 : 67 : StandbyRecoverPreparedTransactions(void)
2052 : : {
2053 : : int i;
2054 : :
3247 alvherre@alvh.no-ip. 2055 : 67 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
3318 simon@2ndQuadrant.co 2056 [ + + ]: 88 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2057 : : {
2058 : : char *buf;
2059 : 21 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2060 : :
2061 [ - + ]: 21 : Assert(gxact->inredo);
2062 : :
302 michael@paquier.xyz 2063 :GNC 21 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
2064 : : gxact->prepare_start_lsn,
677 heikki.linnakangas@i 2065 :CBC 21 : gxact->ondisk, true, false);
3318 simon@2ndQuadrant.co 2066 [ + - ]: 21 : if (buf != NULL)
3526 2067 : 21 : pfree(buf);
2068 : : }
3318 2069 : 67 : LWLockRelease(TwoPhaseStateLock);
5866 heikki.linnakangas@i 2070 : 67 : }
2071 : :
2072 : : /*
2073 : : * RecoverPreparedTransactions
2074 : : *
2075 : : * Scan the shared memory entries of TwoPhaseState and reload the state for
2076 : : * each prepared transaction (reacquire locks, etc).
2077 : : *
2078 : : * This is run at the end of recovery, but before we allow backends to write
2079 : : * WAL.
2080 : : *
2081 : : * At the end of recovery the way we take snapshots will change. We now need
2082 : : * to mark all running transactions with their full SubTransSetParent() info
2083 : : * to allow normal snapshots to work correctly if snapshots overflow.
2084 : : * We do this here because by definition prepared transactions are the only
2085 : : * type of write transaction still running, so this is necessary and
2086 : : * complete.
2087 : : */
2088 : : void
7627 tgl@sss.pgh.pa.us 2089 : 1010 : RecoverPreparedTransactions(void)
2090 : : {
2091 : : int i;
2092 : :
3247 alvherre@alvh.no-ip. 2093 : 1010 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
3318 simon@2ndQuadrant.co 2094 [ + + ]: 1043 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2095 : : {
2096 : : char *buf;
2097 : 33 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
302 michael@paquier.xyz 2098 :GNC 33 : FullTransactionId fxid = gxact->fxid;
2099 : : char *bufptr;
2100 : : TwoPhaseFileHeader *hdr;
2101 : : TransactionId *subxids;
2102 : : const char *gid;
2103 : :
2104 : : /*
2105 : : * Reconstruct subtrans state for the transaction --- needed because
2106 : : * pg_subtrans is not preserved over a restart. Note that we are
2107 : : * linking all the subtransactions directly to the top-level XID;
2108 : : * there may originally have been a more complex hierarchy, but
2109 : : * there's no need to restore that exactly. It's possible that
2110 : : * SubTransSetParent has been set before, if the prepared transaction
2111 : : * generated xid assignment records.
2112 : : */
2113 : 33 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
2114 : : gxact->prepare_start_lsn,
3275 bruce@momjian.us 2115 :CBC 33 : gxact->ondisk, true, false);
3318 simon@2ndQuadrant.co 2116 [ - + ]: 33 : if (buf == NULL)
3318 simon@2ndQuadrant.co 2117 :UBC 0 : continue;
2118 : :
3318 simon@2ndQuadrant.co 2119 [ + - ]:CBC 33 : ereport(LOG,
2120 : : (errmsg("recovering prepared transaction %u of epoch %u from shared memory",
2121 : : XidFromFullTransactionId(gxact->fxid),
2122 : : EpochFromFullTransactionId(gxact->fxid))));
2123 : :
2124 : 33 : hdr = (TwoPhaseFileHeader *) buf;
302 michael@paquier.xyz 2125 [ - + ]:GNC 33 : Assert(TransactionIdEquals(hdr->xid,
2126 : : XidFromFullTransactionId(gxact->fxid)));
3318 simon@2ndQuadrant.co 2127 :CBC 33 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2128 : 33 : gid = (const char *) bufptr;
2129 : 33 : bufptr += MAXALIGN(hdr->gidlen);
2130 : 33 : subxids = (TransactionId *) bufptr;
2131 : 33 : bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1399 rhaas@postgresql.org 2132 : 33 : bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
2133 : 33 : bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
1490 andres@anarazel.de 2134 : 33 : bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
2135 : 33 : bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
3318 simon@2ndQuadrant.co 2136 : 33 : bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2137 : :
2138 : : /*
2139 : : * Recreate its GXACT and dummy PGPROC. But, check whether it was
2140 : : * added in redo and already has a shmem entry for it.
2141 : : */
302 michael@paquier.xyz 2142 :GNC 33 : MarkAsPreparingGuts(gxact, gxact->fxid, gid,
2143 : : hdr->prepared_at,
2144 : : hdr->owner, hdr->database);
2145 : :
2146 : : /* recovered, so reset the flag for entries generated by redo */
3318 simon@2ndQuadrant.co 2147 :CBC 33 : gxact->inredo = false;
2148 : :
2149 : 33 : GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
3247 alvherre@alvh.no-ip. 2150 : 33 : MarkAsPrepared(gxact, true);
2151 : :
2152 : 33 : LWLockRelease(TwoPhaseStateLock);
2153 : :
2154 : : /*
2155 : : * Recover other state (notably locks) using resource managers.
2156 : : */
302 michael@paquier.xyz 2157 :GNC 33 : ProcessRecords(bufptr, fxid, twophase_recover_callbacks);
2158 : :
2159 : : /*
2160 : : * Release locks held by the standby process after we process each
2161 : : * prepared transaction. As a result, we don't need too many
2162 : : * additional locks at any one time.
2163 : : */
3318 simon@2ndQuadrant.co 2164 [ + + ]:CBC 33 : if (InHotStandby)
302 michael@paquier.xyz 2165 :GNC 7 : StandbyReleaseLockTree(hdr->xid, hdr->nsubxacts, subxids);
2166 : :
2167 : : /*
2168 : : * We're done with recovering this transaction. Clear MyLockedGxact,
2169 : : * like we do in PrepareTransaction() during normal operation.
2170 : : */
3318 simon@2ndQuadrant.co 2171 :CBC 33 : PostPrepare_Twophase();
2172 : :
2173 : 33 : pfree(buf);
2174 : :
3247 alvherre@alvh.no-ip. 2175 : 33 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2176 : : }
2177 : :
2178 : 1010 : LWLockRelease(TwoPhaseStateLock);
3318 simon@2ndQuadrant.co 2179 : 1010 : }
2180 : :
2181 : : /*
2182 : : * ProcessTwoPhaseBuffer
2183 : : *
2184 : : * Given a FullTransactionId, read it either from disk or read it directly
2185 : : * via shmem xlog record pointer using the provided "prepare_start_lsn".
2186 : : *
2187 : : * If setParent is true, set up subtransaction parent linkages.
2188 : : *
2189 : : * If setNextXid is true, set TransamVariables->nextXid to the newest
2190 : : * value scanned.
2191 : : */
2192 : : static char *
302 michael@paquier.xyz 2193 :GNC 124 : ProcessTwoPhaseBuffer(FullTransactionId fxid,
2194 : : XLogRecPtr prepare_start_lsn,
2195 : : bool fromdisk,
2196 : : bool setParent, bool setNextXid)
2197 : : {
879 heikki.linnakangas@i 2198 :CBC 124 : FullTransactionId nextXid = TransamVariables->nextXid;
2199 : : TransactionId *subxids;
2200 : : char *buf;
2201 : : TwoPhaseFileHeader *hdr;
2202 : : int i;
2203 : :
3247 alvherre@alvh.no-ip. 2204 [ - + ]: 124 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2205 : :
3318 simon@2ndQuadrant.co 2206 [ + + ]: 124 : if (!fromdisk)
180 alvherre@kurilemu.de 2207 [ - + ]:GNC 74 : Assert(XLogRecPtrIsValid(prepare_start_lsn));
2208 : :
2209 : : /* Already processed? */
302 michael@paquier.xyz 2210 [ + - - + ]: 248 : if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) ||
2211 : 124 : TransactionIdDidAbort(XidFromFullTransactionId(fxid)))
2212 : : {
3318 simon@2ndQuadrant.co 2213 [ # # ]:UBC 0 : if (fromdisk)
2214 : : {
2215 [ # # ]: 0 : ereport(WARNING,
2216 : : (errmsg("removing stale two-phase state file for transaction %u of epoch %u",
2217 : : XidFromFullTransactionId(fxid),
2218 : : EpochFromFullTransactionId(fxid))));
302 michael@paquier.xyz 2219 :UNC 0 : RemoveTwoPhaseFile(fxid, true);
2220 : : }
2221 : : else
2222 : : {
491 michael@paquier.xyz 2223 [ # # ]:UBC 0 : ereport(WARNING,
2224 : : (errmsg("removing stale two-phase state from memory for transaction %u of epoch %u",
2225 : : XidFromFullTransactionId(fxid),
2226 : : EpochFromFullTransactionId(fxid))));
302 michael@paquier.xyz 2227 :UNC 0 : PrepareRedoRemoveFull(fxid, true);
2228 : : }
491 michael@paquier.xyz 2229 :UBC 0 : return NULL;
2230 : : }
2231 : :
2232 : : /* Reject XID if too new */
302 michael@paquier.xyz 2233 [ - + ]:GNC 124 : if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
2234 : : {
3318 simon@2ndQuadrant.co 2235 [ # # ]:UBC 0 : if (fromdisk)
2236 : : {
2237 [ # # ]: 0 : ereport(WARNING,
2238 : : (errmsg("removing future two-phase state file for transaction %u of epoch %u",
2239 : : XidFromFullTransactionId(fxid),
2240 : : EpochFromFullTransactionId(fxid))));
302 michael@paquier.xyz 2241 :UNC 0 : RemoveTwoPhaseFile(fxid, true);
2242 : : }
2243 : : else
2244 : : {
3318 simon@2ndQuadrant.co 2245 [ # # ]:UBC 0 : ereport(WARNING,
2246 : : (errmsg("removing future two-phase state from memory for transaction %u of epoch %u",
2247 : : XidFromFullTransactionId(fxid),
2248 : : EpochFromFullTransactionId(fxid))));
302 michael@paquier.xyz 2249 :UNC 0 : PrepareRedoRemoveFull(fxid, true);
2250 : : }
3318 simon@2ndQuadrant.co 2251 :UBC 0 : return NULL;
2252 : : }
2253 : :
3318 simon@2ndQuadrant.co 2254 [ + + ]:CBC 124 : if (fromdisk)
2255 : : {
2256 : : /* Read and validate file */
302 michael@paquier.xyz 2257 :GNC 50 : buf = ReadTwoPhaseFile(fxid, false);
2258 : : }
2259 : : else
2260 : : {
2261 : : /* Read xlog data */
3318 simon@2ndQuadrant.co 2262 :CBC 74 : XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2263 : : }
2264 : :
2265 : : /* Deconstruct header */
2266 : 124 : hdr = (TwoPhaseFileHeader *) buf;
302 michael@paquier.xyz 2267 [ - + ]:GNC 124 : if (!TransactionIdEquals(hdr->xid, XidFromFullTransactionId(fxid)))
2268 : : {
3318 simon@2ndQuadrant.co 2269 [ # # ]:UBC 0 : if (fromdisk)
2797 michael@paquier.xyz 2270 [ # # ]: 0 : ereport(ERROR,
2271 : : (errcode(ERRCODE_DATA_CORRUPTED),
2272 : : errmsg("corrupted two-phase state file for transaction %u of epoch %u",
2273 : : XidFromFullTransactionId(fxid),
2274 : : EpochFromFullTransactionId(fxid))));
2275 : : else
2276 [ # # ]: 0 : ereport(ERROR,
2277 : : (errcode(ERRCODE_DATA_CORRUPTED),
2278 : : errmsg("corrupted two-phase state in memory for transaction %u of epoch %u",
2279 : : XidFromFullTransactionId(fxid),
2280 : : EpochFromFullTransactionId(fxid))));
2281 : : }
2282 : :
2283 : : /*
2284 : : * Examine subtransaction XIDs ... they should all follow main XID, and
2285 : : * they may force us to advance nextXid.
2286 : : */
3318 simon@2ndQuadrant.co 2287 :CBC 124 : subxids = (TransactionId *) (buf +
2288 : 124 : MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2289 : 124 : MAXALIGN(hdr->gidlen));
2290 [ + + ]: 1911 : for (i = 0; i < hdr->nsubxacts; i++)
2291 : : {
2292 : 1787 : TransactionId subxid = subxids[i];
2293 : :
302 michael@paquier.xyz 2294 [ - + ]:GNC 1787 : Assert(TransactionIdFollows(subxid, XidFromFullTransactionId(fxid)));
2295 : :
2296 : : /* update nextXid if needed */
2595 tmunro@postgresql.or 2297 [ + + ]:CBC 1787 : if (setNextXid)
2298 : 823 : AdvanceNextFullTransactionIdPastXid(subxid);
2299 : :
3318 simon@2ndQuadrant.co 2300 [ + + ]: 1787 : if (setParent)
302 michael@paquier.xyz 2301 :GNC 823 : SubTransSetParent(subxid, XidFromFullTransactionId(fxid));
2302 : : }
2303 : :
3318 simon@2ndQuadrant.co 2304 :CBC 124 : return buf;
2305 : : }
2306 : :
2307 : :
2308 : : /*
2309 : : * RecordTransactionCommitPrepared
2310 : : *
2311 : : * This is basically the same as RecordTransactionCommit (q.v. if you change
2312 : : * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a
2313 : : * race condition.
2314 : : *
2315 : : * We know the transaction made at least one XLOG entry (its PREPARE),
2316 : : * so it is never possible to optimize out the commit record.
 *
 * xid is the XID being committed and children is an array of nchildren
 * XIDs that are committed together with it (both are handed to
 * TransactionTreeSetCommitTsData and TransactionIdCommitTree).
 * rels/nrels, stats/nstats, invalmsgs/ninvalmsgs, initfileinval and gid
 * are passed through unchanged to XactLogCommitRecord.
 *
 * Order of operations inside the critical section: set
 * DELAY_CHKPT_IN_COMMIT on MyProc, take the commit timestamp, emit the
 * commit WAL record, advance the replication origin if one is in use,
 * record the commit timestamp, flush WAL up to the commit record, mark
 * the transaction tree committed in pg_xact, and clear
 * DELAY_CHKPT_IN_COMMIT.  The wait for synchronous replication happens
 * after the critical section has ended.
2317 : : */
2318 : : static void
7627 tgl@sss.pgh.pa.us 2319 : 255 : RecordTransactionCommitPrepared(TransactionId xid,
2320 : : int nchildren,
2321 : : TransactionId *children,
2322 : : int nrels,
2323 : : RelFileLocator *rels,
2324 : : int nstats,
2325 : : xl_xact_stats_item *stats,
2326 : : int ninvalmsgs,
2327 : : SharedInvalidationMessage *invalmsgs,
2328 : : bool initfileinval,
2329 : : const char *gid)
2330 : : {
2331 : : XLogRecPtr recptr;
2332 : : TimestampTz committs;
2333 : : bool replorigin;
2334 : :
2335 : : /*
2336 : : * Are we using the replication origins feature? Or, in other words, are
2337 : : * we replaying remote actions?
2338 : : */
97 msawada@postgresql.o 2339 [ + + ]:GNC 278 : replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
2340 [ + - ]: 23 : replorigin_xact_state.origin != DoNotReplicateId);
2341 : :
2342 : : /* Load the injection point before entering the critical section */
/* NOTE(review): presumably because loading may allocate or fail, which is not acceptable inside a critical section; confirm. */
239 akapila@postgresql.o 2343 : 255 : INJECTION_POINT_LOAD("commit-after-delay-checkpoint");
2344 : :
7627 tgl@sss.pgh.pa.us 2345 :CBC 255 : START_CRIT_SECTION();
2346 : :
2347 : : /* See notes in RecordTransactionCommit */
286 akapila@postgresql.o 2348 [ - + ]:GNC 255 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
2349 : 255 : MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
2350 : :
239 2351 : 255 : INJECTION_POINT_CACHED("commit-after-delay-checkpoint", NULL);
2352 : :
2353 : : /*
2354 : : * Ensure that the DELAY_CHKPT_IN_COMMIT flag write is globally visible before
2355 : : * commit time is written.
2356 : : */
286 2357 : 255 : pg_write_barrier();
2358 : :
2359 : : /*
2360 : : * Note it is important to set committs value after marking ourselves as
2361 : : * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because
2362 : : * we want to ensure all transactions that have acquired commit timestamp
2363 : : * are finished before we allow the logical replication client to advance
2364 : : * its xid which is used to hold back dead rows for conflict detection.
2365 : : * See comments atop worker.c.
2366 : : */
2367 : 255 : committs = GetCurrentTimestamp();
2368 : :
2369 : : /*
2370 : : * Emit the XLOG commit record. Note that we mark 2PC commits as
2371 : : * potentially having AccessExclusiveLocks since we don't know whether or
2372 : : * not they do.
2373 : : */
3871 alvherre@alvh.no-ip. 2374 :CBC 255 : recptr = XactLogCommitRecord(committs,
2375 : : nchildren, children, nrels, rels,
2376 : : nstats, stats,
2377 : : ninvalmsgs, invalmsgs,
2378 : : initfileinval,
3240 tgl@sss.pgh.pa.us 2379 : 255 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2380 : : xid, gid);
2381 : :
2382 : :
3871 alvherre@alvh.no-ip. 2383 [ + + ]: 255 : if (replorigin)
2384 : : /* Move LSNs forward for this replication origin */
97 msawada@postgresql.o 2385 :GNC 23 : replorigin_session_advance(replorigin_xact_state.origin_lsn,
2386 : : XactLastRecEnd);
2387 : :
2388 : : /*
2389 : : * Record commit timestamp. The value comes from plain commit timestamp
2390 : : * if replorigin is not enabled, or replorigin already set a value for us
2391 : : * in replorigin_xact_state.origin_timestamp otherwise.
2392 : : *
2393 : : * We don't need to WAL-log anything here, as the commit record written
2394 : : * above already contains the data.
2395 : : */
2396 [ + + - + ]: 255 : if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
2397 : 232 : replorigin_xact_state.origin_timestamp = committs;
2398 : :
3871 alvherre@alvh.no-ip. 2399 :CBC 255 : TransactionTreeSetCommitTsData(xid, nchildren, children,
2400 : : replorigin_xact_state.origin_timestamp,
97 msawada@postgresql.o 2401 :GNC 255 : replorigin_xact_state.origin);
2402 : :
2403 : : /*
2404 : : * We don't currently try to sleep before flush here ... nor is there any
2405 : : * support for async commit of a prepared xact (the very idea is probably
2406 : : * a contradiction)
2407 : : */
2408 : :
2409 : : /* Flush XLOG to disk */
7627 tgl@sss.pgh.pa.us 2410 :CBC 255 : XLogFlush(recptr);
2411 : :
2412 : : /* Mark the transaction committed in pg_xact */
6406 alvherre@alvh.no-ip. 2413 : 255 : TransactionIdCommitTree(xid, nchildren, children);
2414 : :
2415 : : /* Checkpoint can proceed now */
286 akapila@postgresql.o 2416 :GNC 255 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
2417 : :
7627 tgl@sss.pgh.pa.us 2418 [ - + ]:CBC 255 : END_CRIT_SECTION();
2419 : :
2420 : : /*
2421 : : * Wait for synchronous replication, if required.
2422 : : *
2423 : : * Note that at this stage we have marked clog, but still show as running
2424 : : * in the procarray and continue to hold locks.
2425 : : */
3689 rhaas@postgresql.org 2426 : 255 : SyncRepWaitForLSN(recptr, true);
7627 tgl@sss.pgh.pa.us 2427 : 255 : }
2428 : :
2429 : : /*
2430 : : * RecordTransactionAbortPrepared
2431 : : *
2432 : : * This is basically the same as RecordTransactionAbort.
2433 : : *
2434 : : * We know the transaction made at least one XLOG entry (its PREPARE),
2435 : : * so it is never possible to optimize out the abort record.
 *
 * xid is the XID being aborted and children is an array of nchildren
 * XIDs aborted together with it (both are handed to
 * TransactionIdAbortTree).  rels/nrels, stats/nstats and gid are passed
 * through unchanged to XactLogAbortRecord.
 *
 * Raises PANIC if pg_xact already reports xid as committed.  Otherwise
 * the abort record is emitted and flushed, and the transaction tree is
 * marked aborted, all inside one critical section; the wait for
 * synchronous replication happens after the critical section has ended.
2436 : : */
2437 : : static void
2438 : 48 : RecordTransactionAbortPrepared(TransactionId xid,
2439 : : int nchildren,
2440 : : TransactionId *children,
2441 : : int nrels,
2442 : : RelFileLocator *rels,
2443 : : int nstats,
2444 : : xl_xact_stats_item *stats,
2445 : : const char *gid)
2446 : : {
2447 : : XLogRecPtr recptr;
2448 : : bool replorigin;
2449 : :
2450 : : /*
2451 : : * Are we using the replication origins feature? Or, in other words, are
2452 : : * we replaying remote actions?
2453 : : */
97 msawada@postgresql.o 2454 [ + + ]:GNC 54 : replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
2455 [ + - ]: 6 : replorigin_xact_state.origin != DoNotReplicateId);
2456 : :
2457 : : /*
2458 : : * Catch the scenario where we aborted partway through
2459 : : * RecordTransactionCommitPrepared ...
2460 : : */
7627 tgl@sss.pgh.pa.us 2461 [ - + ]:CBC 48 : if (TransactionIdDidCommit(xid))
7627 tgl@sss.pgh.pa.us 2462 [ # # ]:UBC 0 : elog(PANIC, "cannot abort transaction %u, it was already committed",
2463 : : xid);
2464 : :
7627 tgl@sss.pgh.pa.us 2465 :CBC 48 : START_CRIT_SECTION();
2466 : :
2467 : : /*
2468 : : * Emit the XLOG abort record. Note that we mark 2PC aborts as
2469 : : * potentially having AccessExclusiveLocks since we don't know whether or
2470 : : * not they do.
2471 : : */
4069 andres@anarazel.de 2472 : 48 : recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2473 : : nchildren, children,
2474 : : nrels, rels,
2475 : : nstats, stats,
3240 tgl@sss.pgh.pa.us 2476 : 48 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2477 : : xid, gid);
2478 : :
1884 akapila@postgresql.o 2479 [ + + ]: 48 : if (replorigin)
2480 : : /* Move LSNs forward for this replication origin */
97 msawada@postgresql.o 2481 :GNC 6 : replorigin_session_advance(replorigin_xact_state.origin_lsn,
2482 : : XactLastRecEnd);
2483 : :
2484 : : /* Always flush, since we're about to remove the 2PC state file */
7627 tgl@sss.pgh.pa.us 2485 :CBC 48 : XLogFlush(recptr);
2486 : :
2487 : : /*
2488 : : * Mark the transaction aborted in clog. This is not absolutely necessary
2489 : : * but we may as well do it while we are here.
2490 : : */
6406 alvherre@alvh.no-ip. 2491 : 48 : TransactionIdAbortTree(xid, nchildren, children);
2492 : :
7627 tgl@sss.pgh.pa.us 2493 [ - + ]: 48 : END_CRIT_SECTION();
2494 : :
2495 : : /*
2496 : : * Wait for synchronous replication, if required.
2497 : : *
2498 : : * Note that at this stage we have marked clog, but still show as running
2499 : : * in the procarray and continue to hold locks.
2500 : : */
3689 rhaas@postgresql.org 2501 : 48 : SyncRepWaitForLSN(recptr, false);
7627 tgl@sss.pgh.pa.us 2502 : 48 : }
2503 : :
2504 : : /*
2505 : : * PrepareRedoAdd
2506 : : *
2507 : : * Store pointers to the start/end of the WAL record along with the xid in
2508 : : * a gxact entry in shared memory TwoPhaseState structure. If caller
2509 : : * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2510 : : * data, the entry is marked as located on disk.
 *
 * fxid may be invalid, in which case it is derived from the XID stored
 * in the header at the start of buf and from TransamVariables->nextXid.
 * buf points to the two-phase data: a TwoPhaseFileHeader followed, after
 * MAXALIGN padding, by the GID string.  start_lsn and end_lsn are stored
 * in the new entry as prepare_start_lsn and prepare_end_lsn.  When
 * origin_id is not InvalidReplOriginId, the progress of that replication
 * origin is advanced using hdr->origin_lsn and end_lsn.
 *
 * Caller must hold TwoPhaseStateLock in exclusive mode and recovery must
 * be in progress; both are asserted.
2511 : : */
2512 : : void
302 michael@paquier.xyz 2513 :GNC 97 : PrepareRedoAdd(FullTransactionId fxid, char *buf,
2514 : : XLogRecPtr start_lsn, XLogRecPtr end_lsn,
2515 : : ReplOriginId origin_id)
2516 : : {
3318 simon@2ndQuadrant.co 2517 :CBC 97 : TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2518 : : char *bufptr;
2519 : : const char *gid;
2520 : : GlobalTransaction gxact;
2521 : :
3247 alvherre@alvh.no-ip. 2522 [ - + ]: 97 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
3318 simon@2ndQuadrant.co 2523 [ - + ]: 97 : Assert(RecoveryInProgress());
2524 : :
/* No full XID supplied: rebuild it from the header XID and nextXid */
302 michael@paquier.xyz 2525 [ + + ]:GNC 97 : if (!FullTransactionIdIsValid(fxid))
2526 : : {
2527 [ - + ]: 81 : Assert(InRecovery);
2528 : 81 : fxid = FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
2529 : : hdr->xid);
2530 : : }
2531 : :
/* The GID string starts right after the MAXALIGN-padded header */
3318 simon@2ndQuadrant.co 2532 :CBC 97 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2533 : 97 : gid = (const char *) bufptr;
2534 : :
2535 : : /*
2536 : : * Reserve the GID for the given transaction in the redo code path.
2537 : : *
2538 : : * This creates a gxact struct and puts it into the active array.
2539 : : *
2540 : : * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2541 : : * shared memory. Hence, we only fill up the bare minimum contents here.
2542 : : * The gxact also gets marked with gxact->inredo set to true to indicate
2543 : : * that it got added in the redo phase.
2544 : : */
2545 : :
2546 : : /*
2547 : : * In the event of a crash while a checkpoint was running, it may be
2548 : : * possible that some two-phase data found its way to disk while its
2549 : : * corresponding record needs to be replayed in the follow-up recovery. As
2550 : : * the 2PC data was on disk, it has already been restored at the beginning
2551 : : * of recovery with restoreTwoPhaseData(), so skip this record to avoid
2552 : : * duplicates in TwoPhaseState. If a consistent state has been reached,
2553 : : * the record is added to TwoPhaseState and it should have no
2554 : : * corresponding file in pg_twophase.
 *
 * Accordingly, finding the file is reported as a WARNING before
 * consistency is reached and as an ERROR afterwards.
2555 : : */
180 alvherre@kurilemu.de 2556 [ + + ]:GNC 97 : if (XLogRecPtrIsValid(start_lsn))
2557 : : {
2558 : : char path[MAXPGPATH];
2559 : :
302 michael@paquier.xyz 2560 [ - + ]: 81 : Assert(InRecovery);
2561 : 81 : TwoPhaseFilePath(path, fxid);
2562 : :
1022 michael@paquier.xyz 2563 [ - + ]:CBC 81 : if (access(path, F_OK) == 0)
2564 : : {
1022 michael@paquier.xyz 2565 [ # # # # ]:UBC 0 : ereport(reachedConsistency ? ERROR : WARNING,
2566 : : (errmsg("could not recover two-phase state file for transaction %u",
2567 : : hdr->xid),
2568 : : errdetail("Two-phase state file has been found in WAL record %X/%08X, but this transaction has already been restored from disk.",
2569 : : LSN_FORMAT_ARGS(start_lsn))));
2570 : 0 : return;
2571 : : }
2572 : :
/* access() failed: a missing file is the expected case, anything else is an error */
1022 michael@paquier.xyz 2573 [ - + ]:CBC 81 : if (errno != ENOENT)
1022 michael@paquier.xyz 2574 [ # # ]:UBC 0 : ereport(ERROR,
2575 : : (errcode_for_file_access(),
2576 : : errmsg("could not access file \"%s\": %m", path)));
2577 : : }
2578 : :
2579 : : /* Get a free gxact from the freelist */
3318 simon@2ndQuadrant.co 2580 [ - + ]:CBC 97 : if (TwoPhaseState->freeGXacts == NULL)
3318 simon@2ndQuadrant.co 2581 [ # # ]:UBC 0 : ereport(ERROR,
2582 : : (errcode(ERRCODE_OUT_OF_MEMORY),
2583 : : errmsg("maximum number of prepared transactions reached"),
2584 : : errhint("Increase \"max_prepared_transactions\" (currently %d).",
2585 : : max_prepared_xacts)));
3318 simon@2ndQuadrant.co 2586 :CBC 97 : gxact = TwoPhaseState->freeGXacts;
2587 : 97 : TwoPhaseState->freeGXacts = gxact->next;
2588 : :
2589 : 97 : gxact->prepared_at = hdr->prepared_at;
2590 : 97 : gxact->prepare_start_lsn = start_lsn;
2591 : 97 : gxact->prepare_end_lsn = end_lsn;
302 michael@paquier.xyz 2592 :GNC 97 : gxact->fxid = fxid;
3318 simon@2ndQuadrant.co 2593 :CBC 97 : gxact->owner = hdr->owner;
793 heikki.linnakangas@i 2594 : 97 : gxact->locking_backend = INVALID_PROC_NUMBER;
3318 simon@2ndQuadrant.co 2595 : 97 : gxact->valid = false;
180 alvherre@kurilemu.de 2596 :GNC 97 : gxact->ondisk = !XLogRecPtrIsValid(start_lsn);
3275 bruce@momjian.us 2597 :CBC 97 : gxact->inredo = true; /* yes, added in redo */
/* NOTE(review): unbounded copy; relies on the GID in buf fitting in gxact->gid. Confirm the length is validated where the data is produced. */
3318 simon@2ndQuadrant.co 2598 : 97 : strcpy(gxact->gid, gid);
2599 : :
2600 : : /* And insert it into the active array */
2601 [ - + ]: 97 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2602 : 97 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2603 : :
97 msawada@postgresql.o 2604 [ + + ]:GNC 97 : if (origin_id != InvalidReplOriginId)
2605 : : {
2606 : : /* recover apply progress */
2960 simon@2ndQuadrant.co 2607 :CBC 13 : replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2608 : : false /* backward */ , false /* WAL */ );
2609 : : }
2610 : :
302 michael@paquier.xyz 2611 [ - + ]:GNC 97 : elog(DEBUG2, "added 2PC data in shared memory for transaction %u of epoch %u",
2612 : : XidFromFullTransactionId(gxact->fxid),
2613 : : EpochFromFullTransactionId(gxact->fxid));
2614 : : }
2615 : :
2616 : : /*
2617 : : * PrepareRedoRemoveFull
2618 : : *
2619 : : * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2620 : : * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2621 : : *
2622 : : * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2623 : : * is updated.
 *
 * The entry is looked up by comparing fxid with the fxid stored in each
 * active gxact.  If none matches, the function returns without doing
 * anything.  giveWarning is passed on to RemoveTwoPhaseFile and is only
 * used when the matching entry is marked as being on disk.
2624 : : */
2625 : : static void
2626 : 71 : PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning)
2627 : : {
3318 simon@2ndQuadrant.co 2628 :CBC 71 : GlobalTransaction gxact = NULL;
2629 : : int i;
3304 2630 : 71 : bool found = false;
2631 : :
3247 alvherre@alvh.no-ip. 2632 [ - + ]: 71 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
3318 simon@2ndQuadrant.co 2633 [ - + ]: 71 : Assert(RecoveryInProgress());
2634 : :
/* Linear search of the active array; gxact is left pointing at the match */
2635 [ + + ]: 71 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2636 : : {
2637 : 62 : gxact = TwoPhaseState->prepXacts[i];
2638 : :
302 michael@paquier.xyz 2639 [ + - ]:GNC 62 : if (FullTransactionIdEquals(gxact->fxid, fxid))
2640 : : {
3318 simon@2ndQuadrant.co 2641 [ - + ]:CBC 62 : Assert(gxact->inredo);
3304 2642 : 62 : found = true;
3318 2643 : 62 : break;
2644 : : }
2645 : : }
2646 : :
2647 : : /*
2648 : : * Just leave if there is nothing, this is expected during WAL replay.
2649 : : */
3304 2650 [ + + ]: 71 : if (!found)
3318 2651 : 9 : return;
2652 : :
2653 : : /*
2654 : : * And now we can clean up any files we may have left.
2655 : : */
/* NOTE(review): the message below ends with a trailing space; left as is because it is runtime text. */
302 michael@paquier.xyz 2656 [ - + ]:GNC 62 : elog(DEBUG2, "removing 2PC data for transaction %u of epoch %u ",
2657 : : XidFromFullTransactionId(fxid),
2658 : : EpochFromFullTransactionId(fxid));
2659 : :
3318 simon@2ndQuadrant.co 2660 [ + + ]:CBC 62 : if (gxact->ondisk)
302 michael@paquier.xyz 2661 :GNC 5 : RemoveTwoPhaseFile(fxid, giveWarning);
2662 : :
3318 simon@2ndQuadrant.co 2663 :CBC 62 : RemoveGXact(gxact);
2664 : : }
2665 : :
2666 : : /*
2667 : : * Wrapper of PrepareRedoRemoveFull(), for TransactionIds.
 *
 * The FullTransactionId is derived from xid and TransamVariables->nextXid
 * with FullTransactionIdFromAllowableAt, then handed to
 * PrepareRedoRemoveFull together with giveWarning.  The requirements of
 * PrepareRedoRemoveFull apply here as well: it asserts that
 * TwoPhaseStateLock is held in exclusive mode and that recovery is in
 * progress.
2668 : : */
2669 : : void
302 michael@paquier.xyz 2670 :GNC 71 : PrepareRedoRemove(TransactionId xid, bool giveWarning)
2671 : : {
2672 : : FullTransactionId fxid =
2673 : 71 : FullTransactionIdFromAllowableAt(TransamVariables->nextXid, xid);
2674 : :
2675 : 71 : PrepareRedoRemoveFull(fxid, giveWarning);
2676 : 71 : }
2677 : :
2678 : : /*
2679 : : * LookupGXact
2680 : : * Check if the prepared transaction with the given GID, lsn and timestamp
2681 : : * exists.
2682 : : *
2683 : : * Note that we always compare with the LSN where prepare ends because that is
2684 : : * what is stored as origin_lsn in the 2PC file.
2685 : : *
2686 : : * This function is primarily used to check if the prepared transaction
2687 : : * received from the upstream (remote node) already exists. Checking only GID
2688 : : * is not sufficient because a different prepared xact with the same GID can
2689 : : * exist on the same node. So, we are ensuring to match origin_lsn and
2690 : : * origin_timestamp of prepared xact to avoid the possibility of a match of
2691 : : * prepared xact from two different nodes.
 *
 * Returns true if a valid gxact has the given gid and its two-phase
 * header carries origin_lsn equal to prepare_end_lsn and
 * origin_timestamp equal to origin_prepare_timestamp; false otherwise.
 * TwoPhaseStateLock is held in shared mode for the whole scan.
2692 : : */
2693 : : bool
1756 akapila@postgresql.o 2694 :CBC 5 : LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
2695 : : TimestampTz origin_prepare_timestamp)
2696 : : {
2697 : : int i;
2698 : 5 : bool found = false;
2699 : :
2700 : 5 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2701 [ + - ]: 5 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2702 : : {
2703 : 5 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2704 : :
2705 : : /* Ignore not-yet-valid GIDs. */
2706 [ + - + - ]: 5 : if (gxact->valid && strcmp(gxact->gid, gid) == 0)
2707 : : {
2708 : : char *buf;
2709 : : TwoPhaseFileHeader *hdr;
2710 : :
2711 : : /*
2712 : : * We are not expecting collisions of GXACTs (same gid) between
2713 : : * publisher and subscribers, so we perform all I/O while holding
2714 : : * TwoPhaseStateLock for simplicity.
2715 : : *
2716 : : * To move the I/O out of the lock, we need to ensure that no
2717 : : * other backend commits the prepared xact in the meantime. We can
2718 : : * do this optimization if we encounter many collisions in GID
2719 : : * between publisher and subscriber.
2720 : : */
2721 [ - + ]: 5 : if (gxact->ondisk)
302 michael@paquier.xyz 2722 :UNC 0 : buf = ReadTwoPhaseFile(gxact->fxid, false);
2723 : : else
2724 : : {
1756 akapila@postgresql.o 2725 [ - + ]:CBC 5 : Assert(gxact->prepare_start_lsn);
2726 : 5 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
2727 : : }
2728 : :
/* Either way buf now starts with the two-phase header */
2729 : 5 : hdr = (TwoPhaseFileHeader *) buf;
2730 : :
2731 [ + - ]: 5 : if (hdr->origin_lsn == prepare_end_lsn &&
2732 [ + - ]: 5 : hdr->origin_timestamp == origin_prepare_timestamp)
2733 : : {
2734 : 5 : found = true;
2735 : 5 : pfree(buf);
2736 : 5 : break;
2737 : : }
2738 : :
/* Same GID but different origin data: release the buffer and keep scanning */
1756 akapila@postgresql.o 2739 :UBC 0 : pfree(buf);
2740 : : }
2741 : : }
1756 akapila@postgresql.o 2742 :CBC 5 : LWLockRelease(TwoPhaseStateLock);
2743 : 5 : return found;
2744 : : }
2745 : :
2746 : : /*
2747 : : * TwoPhaseTransactionGid
2748 : : * Form the prepared transaction GID for two_phase transactions.
2749 : : *
2750 : : * Return the GID in the supplied buffer.
 *
 * The GID has the form pg_gid_<subid>_<xid>, with both numbers printed
 * as unsigned decimals.  gid_res is the output buffer and szgid its size
 * in bytes; the result is written with snprintf, so it is truncated if
 * the buffer is too small.
 *
 * subid must be a valid OID (asserted).  An invalid xid raises an ERROR
 * with ERRCODE_PROTOCOL_VIOLATION.
2751 : : */
2752 : : void
650 2753 : 53 : TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
2754 : : {
2755 [ - + ]: 53 : Assert(OidIsValid(subid));
2756 : :
2757 [ - + ]: 53 : if (!TransactionIdIsValid(xid))
650 akapila@postgresql.o 2758 [ # # ]:UBC 0 : ereport(ERROR,
2759 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2760 : : errmsg_internal("invalid two-phase transaction ID")));
2761 : :
650 akapila@postgresql.o 2762 :CBC 53 : snprintf(gid_res, szgid, "pg_gid_%u_%u", subid, xid);
2763 : 53 : }
2764 : :
2765 : : /*
2766 : : * IsTwoPhaseTransactionGidForSubid
2767 : : * Check whether the given GID (as formed by TwoPhaseTransactionGid) is
2768 : : * for the specified 'subid'.
 *
 * Returns true only if gid is exactly the string TwoPhaseTransactionGid
 * produces for subid and the XID embedded in gid.
2769 : : */
2770 : : static bool
650 akapila@postgresql.o 2771 :UBC 0 : IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
2772 : : {
2773 : : int ret;
2774 : : Oid subid_from_gid;
2775 : : TransactionId xid_from_gid;
2776 : : char gid_tmp[GIDSIZE];
2777 : :
2778 : : /* Extract the subid and xid from the given GID */
2779 : 0 : ret = sscanf(gid, "pg_gid_%u_%u", &subid_from_gid, &xid_from_gid);
2780 : :
2781 : : /*
2782 : : * Check that the given GID has expected format, and at least the subid
2783 : : * matches.
2784 : : */
2785 [ # # # # ]: 0 : if (ret != 2 || subid != subid_from_gid)
2786 : 0 : return false;
2787 : :
2788 : : /*
2789 : : * Reconstruct a temporary GID based on the subid and xid extracted from
2790 : : * the given GID and check whether the temporary GID and the given GID
2791 : : * match.
 *
 * This is needed because sscanf reports two conversions as soon as both
 * numbers parse: trailing characters after the second number, or
 * non-canonical spellings of the numbers, would otherwise be accepted.
2792 : : */
2793 : 0 : TwoPhaseTransactionGid(subid, xid_from_gid, gid_tmp, sizeof(gid_tmp));
2794 : :
2795 : 0 : return strcmp(gid, gid_tmp) == 0;
2796 : : }
2797 : :
2798 : : /*
2799 : : * LookupGXactBySubid
2800 : : * Check if the prepared transaction done by apply worker exists.
 *
 * Returns true if any valid gxact has a GID that
 * IsTwoPhaseTransactionGidForSubid accepts for subid, false otherwise.
 * TwoPhaseStateLock is held in shared mode for the whole scan.
2801 : : */
2802 : : bool
650 akapila@postgresql.o 2803 :CBC 1 : LookupGXactBySubid(Oid subid)
2804 : : {
2805 : 1 : bool found = false;
2806 : :
2807 : 1 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2808 [ - + ]: 1 : for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
2809 : : {
650 akapila@postgresql.o 2810 :UBC 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2811 : :
2812 : : /* Ignore not-yet-valid GIDs. */
2813 [ # # # # ]: 0 : if (gxact->valid &&
2814 : 0 : IsTwoPhaseTransactionGidForSubid(subid, gxact->gid))
2815 : : {
2816 : 0 : found = true;
2817 : 0 : break;
2818 : : }
2819 : : }
650 akapila@postgresql.o 2820 :CBC 1 : LWLockRelease(TwoPhaseStateLock);
2821 : :
2822 : 1 : return found;
2823 : : }
2824 : :
2825 : : /*
2826 : : * TwoPhaseGetOldestXidInCommit
2827 : : * Return the oldest transaction ID from prepared transactions that are
2828 : : * currently in the commit critical section.
2829 : : *
2830 : : * This function only considers transactions in the currently connected
2831 : : * database. If no matching transactions are found, it returns
2832 : : * InvalidTransactionId.
 *
 * An entry is considered only if it is valid, has a locking backend,
 * that backend belongs to MyDatabaseId, and that backend has
 * DELAY_CHKPT_IN_COMMIT set in its delayChkptFlags.  Among those, the
 * XID that precedes all others per TransactionIdPrecedes is returned.
 * TwoPhaseStateLock is held in shared mode for the whole scan.
2833 : : */
2834 : : TransactionId
239 akapila@postgresql.o 2835 :GNC 1819 : TwoPhaseGetOldestXidInCommit(void)
2836 : : {
2837 : 1819 : TransactionId oldestRunningXid = InvalidTransactionId;
2838 : :
2839 : 1819 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2840 : :
2841 [ + + ]: 3583 : for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
2842 : : {
2843 : 1764 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2844 : : PGPROC *commitproc;
2845 : : TransactionId xid;
2846 : :
2847 [ - + ]: 1764 : if (!gxact->valid)
239 akapila@postgresql.o 2848 :UNC 0 : continue;
2849 : :
/* No backend is currently working on this prepared transaction */
239 akapila@postgresql.o 2850 [ - + ]:GNC 1764 : if (gxact->locking_backend == INVALID_PROC_NUMBER)
239 akapila@postgresql.o 2851 :UNC 0 : continue;
2852 : :
2853 : : /*
2854 : : * Get the backend that is handling the transaction. It's safe to
2855 : : * access this backend while holding TwoPhaseStateLock, as the backend
2856 : : * can only be destroyed after either removing or unlocking the
2857 : : * current global transaction, both of which require an exclusive
2858 : : * TwoPhaseStateLock.
2859 : : */
239 akapila@postgresql.o 2860 :GNC 1764 : commitproc = GetPGProcByNumber(gxact->locking_backend);
2861 : :
/* Skip transactions handled by a backend of another database */
2862 [ - + ]: 1764 : if (MyDatabaseId != commitproc->databaseId)
239 akapila@postgresql.o 2863 :UNC 0 : continue;
2864 : :
/* Skip backends that are not inside the commit critical section */
239 akapila@postgresql.o 2865 [ - + ]:GNC 1764 : if ((commitproc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0)
239 akapila@postgresql.o 2866 :UNC 0 : continue;
2867 : :
239 akapila@postgresql.o 2868 :GNC 1764 : xid = XidFromFullTransactionId(gxact->fxid);
2869 : :
/* Keep the first candidate, then any candidate that precedes the current one */
2870 [ - + - - ]: 1764 : if (!TransactionIdIsValid(oldestRunningXid) ||
239 akapila@postgresql.o 2871 :UNC 0 : TransactionIdPrecedes(xid, oldestRunningXid))
239 akapila@postgresql.o 2872 :GNC 1764 : oldestRunningXid = xid;
2873 : : }
2874 : :
2875 : 1819 : LWLockRelease(TwoPhaseStateLock);
2876 : :
2877 : 1819 : return oldestRunningXid;
2878 : : }
|