Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * snapbuild.c
4 : : *
5 : : * Infrastructure for building historic catalog snapshots based on contents
6 : : * of the WAL, for the purpose of decoding heapam.c style values in the
7 : : * WAL.
8 : : *
9 : : * NOTES:
10 : : *
11 : : * We build snapshots which can *only* be used to read catalog contents and we
12 : : * do so by reading and interpreting the WAL stream. The aim is to build a
13 : : * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14 : : * at the time the XLogRecord was generated.
15 : : *
16 : : * To build the snapshots we reuse the infrastructure built for Hot
17 : : * Standby. The in-memory snapshots we build look different than HS' because
18 : : * we have different needs. To successfully decode data from the WAL we only
19 : : * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20 : : * tables since the data we decode is wholly contained in the WAL
21 : : * records. Also, our snapshots need to be different in comparison to normal
22 : : * MVCC ones because in contrast to those we cannot fully rely on the clog and
23 : : * pg_subtrans for information about committed transactions because they might
24 : : * commit in the future from the POV of the WAL entry we're currently
25 : : * decoding. This definition has the advantage that we only need to prevent
26 : : * removal of catalog rows, while normal table's rows can still be
27 : : * removed. This is achieved by using the replication slot mechanism.
28 : : *
29 : : * As the percentage of transactions modifying the catalog normally is fairly
30 : : * small in comparison to ones only manipulating user data, we keep track of
31 : : * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32 : : * track of all running transactions like it's done in a normal snapshot. Note
33 : : * that we're generally only looking at transactions that have acquired an
34 : : * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35 : : * that we consider committed, everything else is considered aborted/in
36 : : * progress. That also allows us not to care about subtransactions before they
37 : : * have committed which means this module, in contrast to HS, doesn't have to
38 : : * care about suboverflowed subtransactions and similar.
39 : : *
40 : : * One complexity of doing this is that to e.g. handle mixed DDL/DML
41 : : * transactions we need Snapshots that see intermediate versions of the
42 : : * catalog in a transaction. During normal operation this is achieved by using
43 : : * CommandIds/cmin/cmax. The problem with that however is that for space
44 : : * efficiency reasons, the cmin and cmax are not included in WAL records. We
45 : : * cannot read the cmin/cmax from the tuple itself, either, because it is
46 : : * reset on crash recovery. Even if we could, we could not decode combocids
47 : : * which are only tracked in the original backend's memory. To work around
48 : : * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
49 : : * catalog row is modified, which includes the cmin and cmax of the
50 : : * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
51 : : * reorder buffer, and use them at visibility checks instead of the cmin/cmax
52 : : * on the tuple itself. Check the reorderbuffer.c's comment above
53 : : * ResolveCminCmaxDuringDecoding() for details.
54 : : *
55 : : * To facilitate all this we need our own visibility routine, as the normal
56 : : * ones are optimized for different usecases.
57 : : *
58 : : * To replace the normal catalog snapshots with decoding ones use the
59 : : * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
60 : : *
61 : : *
62 : : *
63 : : * The snapbuild machinery is starting up in several stages, as illustrated
64 : : * by the following graph describing the SnapBuild->state transitions:
65 : : *
66 : : * +-------------------------+
67 : : * +----| START |-------------+
68 : : * | +-------------------------+ |
69 : : * | | |
70 : : * | | |
71 : : * | running_xacts #1 |
72 : : * | | |
73 : : * | | |
74 : : * | v |
75 : : * | +-------------------------+ v
76 : : * | | BUILDING_SNAPSHOT |------------>|
77 : : * | +-------------------------+ |
78 : : * | | |
79 : : * | | |
80 : : * | running_xacts #2, xacts from #1 finished |
81 : : * | | |
82 : : * | | |
83 : : * | v |
84 : : * | +-------------------------+ v
85 : : * | | FULL_SNAPSHOT |------------>|
86 : : * | +-------------------------+ |
87 : : * | | |
88 : : * running_xacts | saved snapshot
89 : : * with zero xacts | at running_xacts's lsn
90 : : * | | |
91 : : * | running_xacts with xacts from #2 finished |
92 : : * | | |
93 : : * | v |
94 : : * | +-------------------------+ |
95 : : * +--->|SNAPBUILD_CONSISTENT |<------------+
96 : : * +-------------------------+
97 : : *
98 : : * Initially the machinery is in the START stage. When an xl_running_xacts
99 : : * record is read that is sufficiently new (above the safe xmin horizon),
100 : : * there's a state transition. If there were no running xacts when the
101 : : * xl_running_xacts record was generated, we'll directly go into CONSISTENT
102 : : * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
103 : : * snapshot means that all transactions that start henceforth can be decoded
104 : : * in their entirety, but transactions that started previously can't. In
105 : : * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
106 : : * running transactions have committed or aborted.
107 : : *
108 : : * Only transactions that commit after CONSISTENT state has been reached will
109 : : * be replayed, even though they might have started while still in
110 : : * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
111 : : * changes have been exported, but all the following ones will be. That point
112 : : * is a convenient point to initialize replication from, which is why we
113 : : * export a snapshot at that point, which *can* be used to read normal data.
114 : : *
115 : : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
116 : : *
117 : : * IDENTIFICATION
118 : : * src/backend/replication/logical/snapbuild.c
119 : : *
120 : : *-------------------------------------------------------------------------
121 : : */
122 : :
123 : : #include "postgres.h"
124 : :
125 : : #include <sys/stat.h>
126 : : #include <unistd.h>
127 : :
128 : : #include "access/heapam_xlog.h"
129 : : #include "access/transam.h"
130 : : #include "access/xact.h"
131 : : #include "common/file_utils.h"
132 : : #include "miscadmin.h"
133 : : #include "pgstat.h"
134 : : #include "replication/logical.h"
135 : : #include "replication/reorderbuffer.h"
136 : : #include "replication/snapbuild.h"
137 : : #include "replication/snapbuild_internal.h"
138 : : #include "storage/fd.h"
139 : : #include "storage/lmgr.h"
140 : : #include "storage/proc.h"
141 : : #include "storage/procarray.h"
142 : : #include "storage/standby.h"
143 : : #include "utils/builtins.h"
144 : : #include "utils/memutils.h"
145 : : #include "utils/snapmgr.h"
146 : : #include "utils/snapshot.h"
147 : : #include "utils/wait_event.h"
148 : :
149 : :
150 : : /*
151 : : * Starting a transaction -- which we need to do while exporting a snapshot --
152 : : * removes knowledge about the previously used resowner, so we save it here.
153 : : */
154 : : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
155 : : static bool ExportInProgress = false;
156 : :
157 : : /* ->committed and ->catchange manipulation */
158 : : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
159 : :
160 : : /* snapshot building/manipulation/distribution functions */
161 : : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
162 : :
163 : : static void SnapBuildFreeSnapshot(Snapshot snap);
164 : :
165 : : static void SnapBuildSnapIncRefcount(Snapshot snap);
166 : :
167 : : static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
168 : :
169 : : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
170 : : uint32 xinfo);
171 : :
172 : : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
173 : : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn,
174 : : xl_running_xacts *running);
175 : : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
176 : :
177 : : /* serialization functions */
178 : : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
179 : : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
180 : : static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
181 : :
182 : : /*
183 : : * Allocate a new snapshot builder.
184 : : *
185 : : * xmin_horizon is the xid >= which we can be sure no catalog rows have been
186 : : * removed, start_lsn is the LSN >= we want to replay commits.
187 : : */
188 : : SnapBuild *
4471 rhaas@postgresql.org 189 :CBC 1192 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
190 : : TransactionId xmin_horizon,
191 : : XLogRecPtr start_lsn,
192 : : bool need_full_snapshot,
193 : : bool in_slot_creation,
194 : : XLogRecPtr two_phase_at)
195 : : {
196 : : MemoryContext context;
197 : : MemoryContext oldcontext;
198 : : SnapBuild *builder;
199 : :
200 : : /* allocate memory in own context, to have better accountability */
201 : 1192 : context = AllocSetContextCreate(CurrentMemoryContext,
202 : : "snapshot builder context",
203 : : ALLOCSET_DEFAULT_SIZES);
204 : 1192 : oldcontext = MemoryContextSwitchTo(context);
205 : :
171 michael@paquier.xyz 206 :GNC 1192 : builder = palloc0_object(SnapBuild);
207 : :
4471 rhaas@postgresql.org 208 :CBC 1192 : builder->state = SNAPBUILD_START;
209 : 1192 : builder->context = context;
210 : 1192 : builder->reorder = reorder;
211 : : /* Other struct members initialized by zeroing via palloc0 above */
212 : :
213 : 1192 : builder->committed.xcnt = 0;
3265 tgl@sss.pgh.pa.us 214 : 1192 : builder->committed.xcnt_space = 128; /* arbitrary number */
4471 rhaas@postgresql.org 215 : 1192 : builder->committed.xip =
85 msawada@postgresql.o 216 :GNC 1192 : palloc0_array(TransactionId, builder->committed.xcnt_space);
4471 rhaas@postgresql.org 217 :CBC 1192 : builder->committed.includes_all_transactions = true;
218 : :
1388 akapila@postgresql.o 219 : 1192 : builder->catchange.xcnt = 0;
220 : 1192 : builder->catchange.xip = NULL;
221 : :
4471 rhaas@postgresql.org 222 : 1192 : builder->initial_xmin_horizon = xmin_horizon;
4377 andres@anarazel.de 223 : 1192 : builder->start_decoding_at = start_lsn;
688 msawada@postgresql.o 224 : 1192 : builder->in_slot_creation = in_slot_creation;
3320 andres@anarazel.de 225 : 1192 : builder->building_full_snapshot = need_full_snapshot;
1781 akapila@postgresql.o 226 : 1192 : builder->two_phase_at = two_phase_at;
227 : :
4471 rhaas@postgresql.org 228 : 1192 : MemoryContextSwitchTo(oldcontext);
229 : :
230 : 1192 : return builder;
231 : : }
232 : :
233 : : /*
234 : : * Free a snapshot builder.
235 : : */
236 : : void
237 : 944 : FreeSnapshotBuilder(SnapBuild *builder)
238 : : {
239 : 944 : MemoryContext context = builder->context;
240 : :
241 : : /* free snapshot explicitly, that contains some error checking */
242 [ + + ]: 944 : if (builder->snapshot != NULL)
243 : : {
244 : 235 : SnapBuildSnapDecRefcount(builder->snapshot);
245 : 235 : builder->snapshot = NULL;
246 : : }
247 : :
248 : : /* other resources are deallocated via memory context reset */
249 : 944 : MemoryContextDelete(context);
250 : 944 : }
251 : :
252 : : /*
253 : : * Free an unreferenced snapshot that has previously been built by us.
254 : : */
255 : : static void
256 : 1632 : SnapBuildFreeSnapshot(Snapshot snap)
257 : : {
258 : : /* make sure we don't get passed an external snapshot */
2686 andres@anarazel.de 259 [ - + ]: 1632 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
260 : :
261 : : /* make sure nobody modified our snapshot */
4471 rhaas@postgresql.org 262 [ - + ]: 1632 : Assert(snap->curcid == FirstCommandId);
263 [ - + ]: 1632 : Assert(!snap->suboverflowed);
264 [ - + ]: 1632 : Assert(!snap->takenDuringRecovery);
4062 heikki.linnakangas@i 265 [ - + ]: 1632 : Assert(snap->regd_count == 0);
266 : :
267 : : /* slightly more likely, so it's checked even without c-asserts */
4471 rhaas@postgresql.org 268 [ - + ]: 1632 : if (snap->copied)
4471 rhaas@postgresql.org 269 [ # # ]:UBC 0 : elog(ERROR, "cannot free a copied snapshot");
270 : :
4471 rhaas@postgresql.org 271 [ - + ]:CBC 1632 : if (snap->active_count)
4471 rhaas@postgresql.org 272 [ # # ]:UBC 0 : elog(ERROR, "cannot free an active snapshot");
273 : :
4471 rhaas@postgresql.org 274 :CBC 1632 : pfree(snap);
275 : 1632 : }
276 : :
277 : : /*
278 : : * In which state of snapshot building are we?
279 : : */
280 : : SnapBuildState
281 : 1968355 : SnapBuildCurrentState(SnapBuild *builder)
282 : : {
283 : 1968355 : return builder->state;
284 : : }
285 : :
286 : : /*
287 : : * Return the LSN at which the two-phase decoding was first enabled.
288 : : */
289 : : XLogRecPtr
1781 akapila@postgresql.o 290 : 35 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
291 : : {
292 : 35 : return builder->two_phase_at;
293 : : }
294 : :
295 : : /*
296 : : * Set the LSN at which two-phase decoding is enabled.
297 : : */
298 : : void
299 : 8 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
300 : : {
301 : 8 : builder->two_phase_at = ptr;
1916 302 : 8 : }
303 : :
304 : : /*
305 : : * Should the contents of transaction ending at 'ptr' be decoded?
306 : : */
307 : : bool
4471 rhaas@postgresql.org 308 : 375296 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
309 : : {
4377 andres@anarazel.de 310 : 375296 : return ptr < builder->start_decoding_at;
311 : : }
312 : :
313 : : /*
314 : : * Increase refcount of a snapshot.
315 : : *
316 : : * This is used when handing out a snapshot to some external resource or when
317 : : * adding a Snapshot as builder->snapshot.
318 : : */
319 : : static void
4471 rhaas@postgresql.org 320 : 7150 : SnapBuildSnapIncRefcount(Snapshot snap)
321 : : {
322 : 7150 : snap->active_count++;
323 : 7150 : }
324 : :
325 : : /*
326 : : * Decrease refcount of a snapshot and free if the refcount reaches zero.
327 : : *
328 : : * Externally visible, so that external resources that have been handed an
329 : : * IncRef'ed Snapshot can adjust its refcount easily.
330 : : */
331 : : void
332 : 6865 : SnapBuildSnapDecRefcount(Snapshot snap)
333 : : {
334 : : /* make sure we don't get passed an external snapshot */
2686 andres@anarazel.de 335 [ - + ]: 6865 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
336 : :
337 : : /* make sure nobody modified our snapshot */
4471 rhaas@postgresql.org 338 [ - + ]: 6865 : Assert(snap->curcid == FirstCommandId);
339 [ - + ]: 6865 : Assert(!snap->suboverflowed);
340 [ - + ]: 6865 : Assert(!snap->takenDuringRecovery);
341 : :
4062 heikki.linnakangas@i 342 [ - + ]: 6865 : Assert(snap->regd_count == 0);
343 : :
344 [ - + ]: 6865 : Assert(snap->active_count > 0);
345 : :
346 : : /* slightly more likely, so it's checked even without casserts */
4471 rhaas@postgresql.org 347 [ - + ]: 6865 : if (snap->copied)
4471 rhaas@postgresql.org 348 [ # # ]:UBC 0 : elog(ERROR, "cannot free a copied snapshot");
349 : :
4471 rhaas@postgresql.org 350 :CBC 6865 : snap->active_count--;
4062 heikki.linnakangas@i 351 [ + + ]: 6865 : if (snap->active_count == 0)
4471 rhaas@postgresql.org 352 : 1632 : SnapBuildFreeSnapshot(snap);
353 : 6865 : }
354 : :
355 : : /*
356 : : * Build a new snapshot, based on currently committed catalog-modifying
357 : : * transactions.
358 : : *
359 : : * In-progress transactions with catalog access are *not* allowed to modify
360 : : * these snapshots; they have to copy them and fill in appropriate ->curcid
361 : : * and ->subxip/subxcnt values.
362 : : */
363 : : static Snapshot
3272 andres@anarazel.de 364 : 2116 : SnapBuildBuildSnapshot(SnapBuild *builder)
365 : : {
366 : : Snapshot snapshot;
367 : : Size ssize;
368 : :
4471 rhaas@postgresql.org 369 [ - + ]: 2116 : Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
370 : :
371 : 2116 : ssize = sizeof(SnapshotData)
372 : 2116 : + sizeof(TransactionId) * builder->committed.xcnt
373 : 2116 : + sizeof(TransactionId) * 1 /* toplevel xid */ ;
374 : :
375 : 2116 : snapshot = MemoryContextAllocZero(builder->context, ssize);
376 : :
2686 andres@anarazel.de 377 : 2116 : snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
378 : :
379 : : /*
380 : : * We misuse the original meaning of SnapshotData's xip and subxip fields
381 : : * to make the more fitting for our needs.
382 : : *
383 : : * In the 'xip' array we store transactions that have to be treated as
384 : : * committed. Since we will only ever look at tuples from transactions
385 : : * that have modified the catalog it's more efficient to store those few
386 : : * that exist between xmin and xmax (frequently there are none).
387 : : *
388 : : * Snapshots that are used in transactions that have modified the catalog
389 : : * also use the 'subxip' array to store their toplevel xid and all the
390 : : * subtransaction xids so we can recognize when we need to treat rows as
391 : : * visible that are not in xip but still need to be visible. Subxip only
392 : : * gets filled when the transaction is copied into the context of a
393 : : * catalog modifying transaction since we otherwise share a snapshot
394 : : * between transactions. As long as a txn hasn't modified the catalog it
395 : : * doesn't need to treat any uncommitted rows as visible, so there is no
396 : : * need for those xids.
397 : : *
398 : : * Both arrays are qsort'ed so that we can use bsearch() on them.
399 : : */
4471 rhaas@postgresql.org 400 [ - + ]: 2116 : Assert(TransactionIdIsNormal(builder->xmin));
401 [ - + ]: 2116 : Assert(TransactionIdIsNormal(builder->xmax));
402 : :
403 : 2116 : snapshot->xmin = builder->xmin;
404 : 2116 : snapshot->xmax = builder->xmax;
405 : :
406 : : /* store all transactions to be treated as committed by this snapshot */
407 : 2116 : snapshot->xip =
408 : 2116 : (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
409 : 2116 : snapshot->xcnt = builder->committed.xcnt;
410 : 2116 : memcpy(snapshot->xip,
411 : 2116 : builder->committed.xip,
412 : 2116 : builder->committed.xcnt * sizeof(TransactionId));
413 : :
414 : : /* sort so we can bsearch() */
415 : 2116 : qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
416 : :
417 : : /*
418 : : * Initially, subxip is empty, i.e. it's a snapshot to be used by
419 : : * transactions that don't modify the catalog. Will be filled by
420 : : * ReorderBufferCopySnap() if necessary.
421 : : */
422 : 2116 : snapshot->subxcnt = 0;
423 : 2116 : snapshot->subxip = NULL;
424 : :
425 : 2116 : snapshot->suboverflowed = false;
426 : 2116 : snapshot->takenDuringRecovery = false;
427 : 2116 : snapshot->copied = false;
428 : 2116 : snapshot->curcid = FirstCommandId;
429 : 2116 : snapshot->active_count = 0;
4062 heikki.linnakangas@i 430 : 2116 : snapshot->regd_count = 0;
2112 andres@anarazel.de 431 : 2116 : snapshot->snapXactCompletionCount = 0;
432 : :
4471 rhaas@postgresql.org 433 : 2116 : return snapshot;
434 : : }
435 : :
436 : : /*
437 : : * Build the initial slot snapshot and convert it to a normal snapshot that
438 : : * is understood by HeapTupleSatisfiesMVCC.
439 : : *
440 : : * The snapshot will be usable directly in current transaction or exported
441 : : * for loading in different transaction.
442 : : */
443 : : Snapshot
3352 tgl@sss.pgh.pa.us 444 : 219 : SnapBuildInitialSnapshot(SnapBuild *builder)
445 : : {
446 : : Snapshot snap;
447 : : TransactionId xid;
448 : : TransactionId safeXid;
449 : : TransactionId *newxip;
4471 rhaas@postgresql.org 450 : 219 : int newxcnt = 0;
451 : :
3352 tgl@sss.pgh.pa.us 452 [ - + ]: 219 : Assert(XactIsoLevel == XACT_REPEATABLE_READ);
1286 akapila@postgresql.o 453 [ - + ]: 219 : Assert(builder->building_full_snapshot);
454 : :
455 : : /* don't allow older snapshots */
1107 tgl@sss.pgh.pa.us 456 : 219 : InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */
1286 akapila@postgresql.o 457 [ - + ]: 219 : if (HaveRegisteredOrActiveSnapshot())
1286 akapila@postgresql.o 458 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
1286 akapila@postgresql.o 459 [ - + ]:CBC 219 : Assert(!HistoricSnapshotActive());
460 : :
4471 rhaas@postgresql.org 461 [ - + ]: 219 : if (builder->state != SNAPBUILD_CONSISTENT)
3355 peter_e@gmx.net 462 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
463 : :
4471 rhaas@postgresql.org 464 [ - + ]:CBC 219 : if (!builder->committed.includes_all_transactions)
3355 peter_e@gmx.net 465 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
466 : :
467 : : /* so we don't overwrite the existing value */
2116 andres@anarazel.de 468 [ - + ]:CBC 219 : if (TransactionIdIsValid(MyProc->xmin))
2116 andres@anarazel.de 469 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
470 : :
3272 andres@anarazel.de 471 :CBC 219 : snap = SnapBuildBuildSnapshot(builder);
472 : :
473 : : /*
474 : : * We know that snap->xmin is alive, enforced by the logical xmin
475 : : * mechanism. Due to that we can do this without locks, we're only
476 : : * changing our own value.
477 : : *
478 : : * Building an initial snapshot is expensive and an unenforced xmin
479 : : * horizon would have bad consequences, therefore always double-check that
480 : : * the horizon is enforced.
481 : : */
1286 akapila@postgresql.o 482 : 219 : LWLockAcquire(ProcArrayLock, LW_SHARED);
483 : 219 : safeXid = GetOldestSafeDecodingTransactionId(false);
484 : 219 : LWLockRelease(ProcArrayLock);
485 : :
486 [ - + ]: 219 : if (TransactionIdFollows(safeXid, snap->xmin))
1286 akapila@postgresql.o 487 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
488 : : safeXid, snap->xmin);
489 : :
2116 andres@anarazel.de 490 :CBC 219 : MyProc->xmin = snap->xmin;
491 : :
492 : : /* allocate in transaction context */
171 michael@paquier.xyz 493 :GNC 219 : newxip = palloc_array(TransactionId, GetMaxSnapshotXidCount());
494 : :
495 : : /*
496 : : * snapbuild.c builds transactions in an "inverted" manner, which means it
497 : : * stores committed transactions in ->xip, not ones in progress. Build a
498 : : * classical snapshot by marking all non-committed transactions as
499 : : * in-progress. This can be expensive.
500 : : */
4471 rhaas@postgresql.org 501 [ + - - + :CBC 219 : for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
- + ]
502 : : {
503 : : void *test;
504 : :
505 : : /*
506 : : * Check whether transaction committed using the decoding snapshot
507 : : * meaning of ->xip.
508 : : */
4471 rhaas@postgresql.org 509 :UBC 0 : test = bsearch(&xid, snap->xip, snap->xcnt,
510 : : sizeof(TransactionId), xidComparator);
511 : :
512 [ # # ]: 0 : if (test == NULL)
513 : : {
514 [ # # ]: 0 : if (newxcnt >= GetMaxSnapshotXidCount())
3355 peter_e@gmx.net 515 [ # # ]: 0 : ereport(ERROR,
516 : : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
517 : : errmsg("initial slot snapshot too large")));
518 : :
4471 rhaas@postgresql.org 519 : 0 : newxip[newxcnt++] = xid;
520 : : }
521 : :
522 [ # # ]: 0 : TransactionIdAdvance(xid);
523 : : }
524 : :
525 : : /* adjust remaining snapshot fields as needed */
2656 michael@paquier.xyz 526 :CBC 219 : snap->snapshot_type = SNAPSHOT_MVCC;
4471 rhaas@postgresql.org 527 : 219 : snap->xcnt = newxcnt;
528 : 219 : snap->xip = newxip;
529 : :
3355 peter_e@gmx.net 530 : 219 : return snap;
531 : : }
532 : :
533 : : /*
534 : : * Export a snapshot so it can be set in another session with SET TRANSACTION
535 : : * SNAPSHOT.
536 : : *
537 : : * For that we need to start a transaction in the current backend as the
538 : : * importing side checks whether the source transaction is still open to make
539 : : * sure the xmin horizon hasn't advanced since then.
540 : : */
541 : : const char *
542 : 1 : SnapBuildExportSnapshot(SnapBuild *builder)
543 : : {
544 : : Snapshot snap;
545 : : char *snapname;
546 : :
547 [ - + ]: 1 : if (IsTransactionOrTransactionBlock())
3355 peter_e@gmx.net 548 [ # # ]:UBC 0 : elog(ERROR, "cannot export a snapshot from within a transaction");
549 : :
3355 peter_e@gmx.net 550 [ - + ]:CBC 1 : if (SavedResourceOwnerDuringExport)
3355 peter_e@gmx.net 551 [ # # ]:UBC 0 : elog(ERROR, "can only export one snapshot at a time");
552 : :
3355 peter_e@gmx.net 553 :CBC 1 : SavedResourceOwnerDuringExport = CurrentResourceOwner;
554 : 1 : ExportInProgress = true;
555 : :
556 : 1 : StartTransactionCommand();
557 : :
558 : : /* There doesn't seem to a nice API to set these */
559 : 1 : XactIsoLevel = XACT_REPEATABLE_READ;
560 : 1 : XactReadOnly = true;
561 : :
3352 tgl@sss.pgh.pa.us 562 : 1 : snap = SnapBuildInitialSnapshot(builder);
563 : :
564 : : /*
565 : : * now that we've built a plain snapshot, make it active and use the
566 : : * normal mechanisms for exporting it
567 : : */
4471 rhaas@postgresql.org 568 : 1 : snapname = ExportSnapshot(snap);
569 : :
570 [ + - ]: 1 : ereport(LOG,
571 : : (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
572 : : "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
573 : : snap->xcnt,
574 : : snapname, snap->xcnt)));
575 : 1 : return snapname;
576 : : }
577 : :
578 : : /*
579 : : * Ensure there is a snapshot and if not build one for current transaction.
580 : : */
581 : : Snapshot
1342 akapila@postgresql.o 582 : 8 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
583 : : {
3706 simon@2ndQuadrant.co 584 [ - + ]: 8 : Assert(builder->state == SNAPBUILD_CONSISTENT);
585 : :
586 : : /* only build a new snapshot if we don't have a prebuilt one */
587 [ + + ]: 8 : if (builder->snapshot == NULL)
588 : : {
3272 andres@anarazel.de 589 : 1 : builder->snapshot = SnapBuildBuildSnapshot(builder);
590 : : /* increase refcount for the snapshot builder */
3706 simon@2ndQuadrant.co 591 : 1 : SnapBuildSnapIncRefcount(builder->snapshot);
592 : : }
593 : :
594 : 8 : return builder->snapshot;
595 : : }
596 : :
597 : : /*
598 : : * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
599 : : * any. Aborts the previously started transaction and resets the resource
600 : : * owner back to its original value.
601 : : */
602 : : void
3941 andres@anarazel.de 603 : 5831 : SnapBuildClearExportedSnapshot(void)
604 : : {
605 : : ResourceOwner tmpResOwner;
606 : :
607 : : /* nothing exported, that is the usual case */
4471 rhaas@postgresql.org 608 [ + + ]: 5831 : if (!ExportInProgress)
609 : 5830 : return;
610 : :
611 [ - + ]: 1 : if (!IsTransactionState())
4471 rhaas@postgresql.org 612 [ # # ]:UBC 0 : elog(ERROR, "clearing exported snapshot in wrong transaction state");
613 : :
614 : : /*
615 : : * AbortCurrentTransaction() takes care of resetting the snapshot state,
616 : : * so remember SavedResourceOwnerDuringExport.
617 : : */
1685 michael@paquier.xyz 618 :CBC 1 : tmpResOwner = SavedResourceOwnerDuringExport;
619 : :
620 : : /* make sure nothing could have ever happened */
4471 rhaas@postgresql.org 621 : 1 : AbortCurrentTransaction();
622 : :
1685 michael@paquier.xyz 623 : 1 : CurrentResourceOwner = tmpResOwner;
624 : : }
625 : :
626 : : /*
627 : : * Clear snapshot export state during transaction abort.
628 : : */
629 : : void
630 : 35386 : SnapBuildResetExportedSnapshotState(void)
631 : : {
4471 rhaas@postgresql.org 632 : 35386 : SavedResourceOwnerDuringExport = NULL;
633 : 35386 : ExportInProgress = false;
634 : 35386 : }
635 : :
636 : : /*
637 : : * Handle the effects of a single heap change, appropriate to the current state
638 : : * of the snapshot builder and returns whether changes made at (xid, lsn) can
639 : : * be decoded.
640 : : */
641 : : bool
642 : 1398908 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
643 : : {
644 : : /*
645 : : * We can't handle data in transactions if we haven't built a snapshot
646 : : * yet, so don't store them.
647 : : */
648 [ - + ]: 1398908 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
4471 rhaas@postgresql.org 649 :UBC 0 : return false;
650 : :
651 : : /*
652 : : * No point in keeping track of changes in transactions that we don't have
653 : : * enough information about to decode. This means that they started before
654 : : * we got into the SNAPBUILD_FULL_SNAPSHOT state.
655 : : */
4471 rhaas@postgresql.org 656 [ + + - + ]:CBC 1398911 : if (builder->state < SNAPBUILD_CONSISTENT &&
1930 andres@anarazel.de 657 : 3 : TransactionIdPrecedes(xid, builder->next_phase_at))
4471 rhaas@postgresql.org 658 :UBC 0 : return false;
659 : :
660 : : /*
661 : : * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
662 : : * be needed to decode the change we're currently processing.
663 : : */
3738 andres@anarazel.de 664 [ + + ]:CBC 1398908 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
665 : : {
666 : : /* only build a new snapshot if we don't have a prebuilt one */
4471 rhaas@postgresql.org 667 [ + + ]: 3794 : if (builder->snapshot == NULL)
668 : : {
3272 andres@anarazel.de 669 : 442 : builder->snapshot = SnapBuildBuildSnapshot(builder);
670 : : /* increase refcount for the snapshot builder */
4471 rhaas@postgresql.org 671 : 442 : SnapBuildSnapIncRefcount(builder->snapshot);
672 : : }
673 : :
674 : : /*
675 : : * Increase refcount for the transaction we're handing the snapshot
676 : : * out to.
677 : : */
678 : 3794 : SnapBuildSnapIncRefcount(builder->snapshot);
679 : 3794 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
680 : : builder->snapshot);
681 : : }
682 : :
683 : 1398908 : return true;
684 : : }
685 : :
686 : : /*
687 : : * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
688 : : * This implies that a transaction has done some form of write to system
689 : : * catalogs.
690 : : */
691 : : void
692 : 27192 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
693 : : XLogRecPtr lsn, xl_heap_new_cid *xlrec)
694 : : {
695 : : CommandId cid;
696 : :
697 : : /*
698 : : * we only log new_cid's if a catalog tuple was modified, so mark the
699 : : * transaction as containing catalog modifications
700 : : */
4407 bruce@momjian.us 701 : 27192 : ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
702 : :
4471 rhaas@postgresql.org 703 : 27192 : ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
704 : : xlrec->target_locator, xlrec->target_tid,
705 : : xlrec->cmin, xlrec->cmax,
706 : : xlrec->combocid);
707 : :
708 : : /* figure out new command id */
709 [ + + ]: 27192 : if (xlrec->cmin != InvalidCommandId &&
710 [ + + ]: 22876 : xlrec->cmax != InvalidCommandId)
711 : 3311 : cid = Max(xlrec->cmin, xlrec->cmax);
712 [ + + ]: 23881 : else if (xlrec->cmax != InvalidCommandId)
713 : 4316 : cid = xlrec->cmax;
714 [ + - ]: 19565 : else if (xlrec->cmin != InvalidCommandId)
715 : 19565 : cid = xlrec->cmin;
716 : : else
717 : : {
4407 bruce@momjian.us 718 :UBC 0 : cid = InvalidCommandId; /* silence compiler */
4471 rhaas@postgresql.org 719 [ # # ]: 0 : elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
720 : : }
721 : :
4471 rhaas@postgresql.org 722 :CBC 27192 : ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
723 : 27192 : }
724 : :
725 : : /*
726 : : * Add a new Snapshot and invalidation messages to all transactions we're
727 : : * decoding that currently are in-progress so they can see new catalog contents
728 : : * made by the transaction that just committed. This is necessary because those
729 : : * in-progress transactions will use the new catalog's contents from here on
730 : : * (at the very least everything they do needs to be compatible with newer
731 : : * catalog contents).
732 : : */
733 : : static void
415 akapila@postgresql.o 734 : 1446 : SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
735 : : {
736 : : dlist_iter txn_i;
737 : : ReorderBufferTXN *txn;
738 : :
739 : : /*
740 : : * Iterate through all toplevel transactions. This can include
741 : : * subtransactions which we just don't yet know to be that, but that's
742 : : * fine, they will just get an unnecessary snapshot and invalidations
743 : : * queued.
744 : : */
4471 rhaas@postgresql.org 745 [ + - + + ]: 2927 : dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
746 : : {
747 : 1481 : txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
748 : :
749 [ - + ]: 1481 : Assert(TransactionIdIsValid(txn->xid));
750 : :
751 : : /*
752 : : * If we don't have a base snapshot yet, there are no changes in this
753 : : * transaction which in turn implies we don't yet need a snapshot at
754 : : * all. We'll add a snapshot when the first change gets queued.
755 : : *
756 : : * Similarly, we don't need to add invalidations to a transaction
757 : : * whose base snapshot is not yet set. Once a base snapshot is built,
758 : : * it will include the xids of committed transactions that have
759 : : * modified the catalog, thus reflecting the new catalog contents. The
760 : : * existing catalog cache will have already been invalidated after
761 : : * processing the invalidations in the transaction that modified
762 : : * catalogs, ensuring that a fresh cache is constructed during
763 : : * decoding.
764 : : *
765 : : * NB: This works correctly even for subtransactions because
766 : : * ReorderBufferAssignChild() takes care to transfer the base snapshot
767 : : * to the top-level transaction, and while iterating the changequeue
768 : : * we'll get the change from the subtxn.
769 : : */
770 [ + + ]: 1481 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
771 : 2 : continue;
772 : :
773 : : /*
774 : : * We don't need to add snapshot or invalidations to prepared
775 : : * transactions as they should not see the new catalog contents.
776 : : */
472 msawada@postgresql.o 777 [ + + ]: 1479 : if (rbtxn_is_prepared(txn))
1972 akapila@postgresql.o 778 : 28 : continue;
779 : :
327 alvherre@kurilemu.de 780 [ + + ]:GNC 1451 : elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
781 : : txn->xid, LSN_FORMAT_ARGS(lsn));
782 : :
783 : : /*
784 : : * increase the snapshot's refcount for the transaction we are handing
785 : : * it out to
786 : : */
4471 rhaas@postgresql.org 787 :CBC 1451 : SnapBuildSnapIncRefcount(builder->snapshot);
788 : 1451 : ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
789 : : builder->snapshot);
790 : :
791 : : /*
792 : : * Add invalidation messages to the reorder buffer of in-progress
793 : : * transactions except the current committed transaction, for which we
794 : : * will execute invalidations at the end.
795 : : *
796 : : * It is required, otherwise, we will end up using the stale catcache
797 : : * contents built by the current transaction even after its decoding,
798 : : * which should have been invalidated due to concurrent catalog
799 : : * changing transaction.
800 : : *
801 : : * Distribute only the invalidation messages generated by the current
802 : : * committed transaction. Invalidation messages received from other
803 : : * transactions would have already been propagated to the relevant
804 : : * in-progress transactions. This transaction would have processed
805 : : * those invalidations, ensuring that subsequent transactions observe
806 : : * a consistent cache state.
807 : : */
415 akapila@postgresql.o 808 [ + + ]: 1451 : if (txn->xid != xid)
809 : : {
810 : : uint32 ninvalidations;
811 : 33 : SharedInvalidationMessage *msgs = NULL;
812 : :
813 : 33 : ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
814 : : xid, &msgs);
815 : :
816 [ + + ]: 33 : if (ninvalidations > 0)
817 : : {
818 [ - + ]: 29 : Assert(msgs != NULL);
819 : :
348 msawada@postgresql.o 820 : 29 : ReorderBufferAddDistributedInvalidations(builder->reorder,
821 : : txn->xid, lsn,
822 : : ninvalidations, msgs);
823 : : }
824 : : }
825 : : }
4471 rhaas@postgresql.org 826 : 1446 : }
827 : :
828 : : /*
829 : : * Keep track of a new catalog changing transaction that has committed.
830 : : */
831 : : static void
832 : 1455 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
833 : : {
834 [ - + ]: 1455 : Assert(TransactionIdIsValid(xid));
835 : :
836 [ + + ]: 1455 : if (builder->committed.xcnt == builder->committed.xcnt_space)
837 : : {
4471 rhaas@postgresql.org 838 :GBC 1 : builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
839 : :
840 [ - + ]: 1 : elog(DEBUG1, "increasing space for committed transactions to %u",
841 : : (uint32) builder->committed.xcnt_space);
842 : :
85 msawada@postgresql.o 843 :GNC 1 : builder->committed.xip = repalloc_array(builder->committed.xip,
844 : : TransactionId,
845 : : builder->committed.xcnt_space);
846 : : }
847 : :
848 : : /*
849 : : * TODO: It might make sense to keep the array sorted here instead of
850 : : * doing it every time we build a new snapshot. On the other hand this
851 : : * gets called repeatedly when a transaction with subtransactions commits.
852 : : */
4471 rhaas@postgresql.org 853 :CBC 1455 : builder->committed.xip[builder->committed.xcnt++] = xid;
854 : 1455 : }
855 : :
856 : : /*
857 : : * Remove knowledge about transactions we treat as committed or containing catalog
858 : : * changes that are smaller than ->xmin. Those won't ever get checked via
859 : : * the ->committed or ->catchange array, respectively. The committed xids will
860 : : * get checked via the clog machinery.
861 : : *
862 : : * We can ideally remove the transaction from catchange array once it is
863 : : * finished (committed/aborted) but that could be costly as we need to maintain
864 : : * the xids order in the array.
865 : : */
866 : : static void
1388 akapila@postgresql.o 867 : 504 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
868 : : {
869 : : int off;
870 : : TransactionId *workspace;
4471 rhaas@postgresql.org 871 : 504 : int surviving_xids = 0;
872 : :
873 : : /* not ready yet */
874 [ - + ]: 504 : if (!TransactionIdIsNormal(builder->xmin))
4471 rhaas@postgresql.org 875 :UBC 0 : return;
876 : :
877 : : /* TODO: Neater algorithm than just copying and iterating? */
878 : : workspace =
4471 rhaas@postgresql.org 879 :CBC 504 : MemoryContextAlloc(builder->context,
880 : 504 : builder->committed.xcnt * sizeof(TransactionId));
881 : :
882 : : /* copy xids that still are interesting to workspace */
883 [ + + ]: 769 : for (off = 0; off < builder->committed.xcnt; off++)
884 : : {
885 [ + - - + : 265 : if (NormalTransactionIdPrecedes(builder->committed.xip[off],
+ + ]
886 : : builder->xmin))
887 : : ; /* remove */
888 : : else
889 : 1 : workspace[surviving_xids++] = builder->committed.xip[off];
890 : : }
891 : :
892 : : /* copy workspace back to persistent state */
893 : 504 : memcpy(builder->committed.xip, workspace,
894 : : surviving_xids * sizeof(TransactionId));
895 : :
896 [ - + ]: 504 : elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
897 : : (uint32) builder->committed.xcnt, (uint32) surviving_xids,
898 : : builder->xmin, builder->xmax);
899 : 504 : builder->committed.xcnt = surviving_xids;
900 : :
901 : 504 : pfree(workspace);
902 : :
903 : : /*
904 : : * Purge xids in ->catchange as well. The purged array must also be sorted
905 : : * in xidComparator order.
906 : : */
1370 akapila@postgresql.o 907 [ + + ]: 504 : if (builder->catchange.xcnt > 0)
908 : : {
909 : : /*
910 : : * Since catchange.xip is sorted, we find the lower bound of xids that
911 : : * are still interesting.
912 : : */
913 [ + + ]: 7 : for (off = 0; off < builder->catchange.xcnt; off++)
914 : : {
915 [ + + ]: 5 : if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
916 : : builder->xmin))
917 : 1 : break;
918 : : }
919 : :
920 : 3 : surviving_xids = builder->catchange.xcnt - off;
921 : :
922 [ + + ]: 3 : if (surviving_xids > 0)
923 : : {
924 : 1 : memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
925 : : surviving_xids * sizeof(TransactionId));
926 : : }
927 : : else
928 : : {
929 : 2 : pfree(builder->catchange.xip);
930 : 2 : builder->catchange.xip = NULL;
931 : : }
932 : :
933 [ - + ]: 3 : elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
934 : : (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
935 : : builder->xmin, builder->xmax);
936 : 3 : builder->catchange.xcnt = surviving_xids;
937 : : }
938 : : }
939 : :
940 : : /*
941 : : * Handle everything that needs to be done when a transaction commits
942 : : */
943 : : void
4471 rhaas@postgresql.org 944 : 3644 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
945 : : int nsubxacts, TransactionId *subxacts, uint32 xinfo)
946 : : {
947 : : int nxact;
948 : :
3304 andres@anarazel.de 949 : 3644 : bool needs_snapshot = false;
950 : 3644 : bool needs_timetravel = false;
4471 rhaas@postgresql.org 951 : 3644 : bool sub_needs_timetravel = false;
952 : :
953 : 3644 : TransactionId xmax = xid;
954 : :
955 : : /*
956 : : * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
957 : : * will they be part of a snapshot. So we don't need to record anything.
958 : : */
3304 andres@anarazel.de 959 [ + - ]: 3644 : if (builder->state == SNAPBUILD_START ||
960 [ - + - - ]: 3644 : (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1930 andres@anarazel.de 961 :UBC 0 : TransactionIdPrecedes(xid, builder->next_phase_at)))
962 : : {
963 : : /* ensure that only commits after this are getting replayed */
3304 964 [ # # ]: 0 : if (builder->start_decoding_at <= lsn)
965 : 0 : builder->start_decoding_at = lsn + 1;
966 : 0 : return;
967 : : }
968 : :
4471 rhaas@postgresql.org 969 [ + + ]:CBC 3644 : if (builder->state < SNAPBUILD_CONSISTENT)
970 : : {
971 : : /* ensure that only commits after this are getting replayed */
4377 andres@anarazel.de 972 [ + + ]: 5 : if (builder->start_decoding_at <= lsn)
973 : 2 : builder->start_decoding_at = lsn + 1;
974 : :
975 : : /*
976 : : * If building an exportable snapshot, force xid to be tracked, even
977 : : * if the transaction didn't modify the catalog.
978 : : */
3304 979 [ - + ]: 5 : if (builder->building_full_snapshot)
980 : : {
3304 andres@anarazel.de 981 :UBC 0 : needs_timetravel = true;
982 : : }
983 : : }
984 : :
4471 rhaas@postgresql.org 985 [ + + ]:CBC 4868 : for (nxact = 0; nxact < nsubxacts; nxact++)
986 : : {
987 : 1224 : TransactionId subxid = subxacts[nxact];
988 : :
989 : : /*
990 : : * Add subtransaction to base snapshot if catalog modifying, we don't
991 : : * distinguish to toplevel transactions there.
992 : : */
1388 akapila@postgresql.o 993 [ + + ]: 1224 : if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
994 : : {
3304 andres@anarazel.de 995 : 9 : sub_needs_timetravel = true;
996 : 9 : needs_snapshot = true;
997 : :
998 [ - + ]: 9 : elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
999 : : xid, subxid);
1000 : :
4471 rhaas@postgresql.org 1001 : 9 : SnapBuildAddCommittedTxn(builder, subxid);
1002 : :
1003 [ + - - + : 9 : if (NormalTransactionIdFollows(subxid, xmax))
+ - ]
1004 : 9 : xmax = subxid;
1005 : : }
1006 : :
1007 : : /*
1008 : : * If we're forcing timetravel we also need visibility information
1009 : : * about subtransaction, so keep track of subtransaction's state, even
1010 : : * if not catalog modifying. Don't need to distribute a snapshot in
1011 : : * that case.
1012 : : */
3304 andres@anarazel.de 1013 [ - + ]: 1215 : else if (needs_timetravel)
1014 : : {
4471 rhaas@postgresql.org 1015 :UBC 0 : SnapBuildAddCommittedTxn(builder, subxid);
1016 [ # # # # : 0 : if (NormalTransactionIdFollows(subxid, xmax))
# # ]
1017 : 0 : xmax = subxid;
1018 : : }
1019 : : }
1020 : :
1021 : : /* if top-level modified catalog, it'll need a snapshot */
1388 akapila@postgresql.o 1022 [ + + ]:CBC 3644 : if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
1023 : : {
3304 andres@anarazel.de 1024 [ + + ]: 1445 : elog(DEBUG2, "found top level transaction %u, with catalog changes",
1025 : : xid);
1026 : 1445 : needs_snapshot = true;
1027 : 1445 : needs_timetravel = true;
4471 rhaas@postgresql.org 1028 : 1445 : SnapBuildAddCommittedTxn(builder, xid);
1029 : : }
3304 andres@anarazel.de 1030 [ + + ]: 2199 : else if (sub_needs_timetravel)
1031 : : {
1032 : : /* track toplevel txn as well, subxact alone isn't meaningful */
1318 akapila@postgresql.o 1033 [ - + ]: 1 : elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
1034 : : xid);
1035 : 1 : needs_timetravel = true;
4471 rhaas@postgresql.org 1036 : 1 : SnapBuildAddCommittedTxn(builder, xid);
1037 : : }
3304 andres@anarazel.de 1038 [ - + ]: 2198 : else if (needs_timetravel)
1039 : : {
3304 andres@anarazel.de 1040 [ # # ]:UBC 0 : elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1041 : :
4471 rhaas@postgresql.org 1042 : 0 : SnapBuildAddCommittedTxn(builder, xid);
1043 : : }
1044 : :
3304 andres@anarazel.de 1045 [ + + ]:CBC 3644 : if (!needs_timetravel)
1046 : : {
1047 : : /* record that we cannot export a general snapshot anymore */
1048 : 2198 : builder->committed.includes_all_transactions = false;
1049 : : }
1050 : :
1051 [ + + - + ]: 3644 : Assert(!needs_snapshot || needs_timetravel);
1052 : :
1053 : : /*
1054 : : * Adjust xmax of the snapshot builder, we only do that for committed,
1055 : : * catalog modifying, transactions, everything else isn't interesting for
1056 : : * us since we'll never look at the respective rows.
1057 : : */
1058 [ + + ]: 3644 : if (needs_timetravel &&
1059 [ + - + + ]: 2892 : (!TransactionIdIsValid(builder->xmax) ||
1060 : 1446 : TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1061 : : {
1062 : 1445 : builder->xmax = xmax;
1063 [ - + ]: 1445 : TransactionIdAdvance(builder->xmax);
1064 : : }
1065 : :
1066 : : /* if there's any reason to build a historic snapshot, do so now */
1067 [ + + ]: 3644 : if (needs_snapshot)
1068 : : {
1069 : : /*
1070 : : * If we haven't built a complete snapshot yet there's no need to hand
1071 : : * it out, it wouldn't (and couldn't) be used anyway.
1072 : : */
4471 rhaas@postgresql.org 1073 [ - + ]: 1446 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
4471 rhaas@postgresql.org 1074 :UBC 0 : return;
1075 : :
1076 : : /*
1077 : : * Decrease the snapshot builder's refcount of the old snapshot, note
1078 : : * that it still will be used if it has been handed out to the
1079 : : * reorderbuffer earlier.
1080 : : */
4471 rhaas@postgresql.org 1081 [ + - ]:CBC 1446 : if (builder->snapshot)
1082 : 1446 : SnapBuildSnapDecRefcount(builder->snapshot);
1083 : :
3272 andres@anarazel.de 1084 : 1446 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1085 : :
1086 : : /* we might need to execute invalidations, add snapshot */
4471 rhaas@postgresql.org 1087 [ + + ]: 1446 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1088 : : {
1089 : 8 : SnapBuildSnapIncRefcount(builder->snapshot);
1090 : 8 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1091 : : builder->snapshot);
1092 : : }
1093 : :
1094 : : /* refcount of the snapshot builder for the new snapshot */
1095 : 1446 : SnapBuildSnapIncRefcount(builder->snapshot);
1096 : :
1097 : : /*
1098 : : * Add a new catalog snapshot and invalidations messages to all
1099 : : * currently running transactions.
1100 : : */
415 akapila@postgresql.o 1101 : 1446 : SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
1102 : : }
1103 : : }
1104 : :
1105 : : /*
1106 : : * Check the reorder buffer and the snapshot to see if the given transaction has
1107 : : * modified catalogs.
1108 : : */
1109 : : static inline bool
1388 1110 : 4868 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
1111 : : uint32 xinfo)
1112 : : {
1113 [ + + ]: 4868 : if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1114 : 1450 : return true;
1115 : :
1116 : : /*
1117 : : * The transactions that have changed catalogs must have invalidation
1118 : : * info.
1119 : : */
1120 [ + + ]: 3418 : if (!(xinfo & XACT_XINFO_HAS_INVALS))
1121 : 3410 : return false;
1122 : :
1123 : : /* Check the catchange XID array */
1124 [ + + + - ]: 12 : return ((builder->catchange.xcnt > 0) &&
1125 : 4 : (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
1126 : : sizeof(TransactionId), xidComparator) != NULL));
1127 : : }
1128 : :
1129 : : /* -----------------------------------
1130 : : * Snapshot building functions dealing with xlog records
1131 : : * -----------------------------------
1132 : : */
1133 : :
1134 : : /*
1135 : : * Process a running xacts record, and use its information to first build a
1136 : : * historic snapshot and later to release resources that aren't needed
1137 : : * anymore.
1138 : : */
1139 : : void
7 alvherre@kurilemu.de 1140 : 1649 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1141 : : {
1142 : : ReorderBufferTXN *txn;
1143 : : TransactionId xmin;
1144 : :
1145 : : /*
1146 : : * If we're not consistent yet, inspect the record to see whether it
1147 : : * allows to get closer to being consistent. If we are consistent, dump
1148 : : * our snapshot so others or we, after a restart, can use it.
1149 : : */
4471 rhaas@postgresql.org 1150 [ + + ]: 1649 : if (builder->state < SNAPBUILD_CONSISTENT)
1151 : : {
1152 : : /* returns false if there's no point in performing cleanup just yet */
1153 [ + + ]: 1167 : if (!SnapBuildFindSnapshot(builder, lsn, running))
1154 : 1143 : return;
1155 : : }
1156 : : else
1157 : 482 : SnapBuildSerialize(builder, lsn);
1158 : :
1159 : : /*
1160 : : * Update range of interesting xids based on the running xacts
1161 : : * information. We don't increase ->xmax using it, because once we are in
1162 : : * a consistent state we can do that ourselves and much more efficiently
1163 : : * so, because we only need to do it for catalog transactions since we
1164 : : * only ever look at those.
1165 : : *
1166 : : * NB: We only increase xmax when a catalog modifying transaction commits
1167 : : * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1168 : : * xmin, which looks odd but is correct and actually more efficient, since
1169 : : * we hit fast paths in heapam_visibility.c.
1170 : : */
1171 : 504 : builder->xmin = running->oldestRunningXid;
1172 : :
1173 : : /* Remove transactions we don't need to keep track off anymore */
1388 akapila@postgresql.o 1174 : 504 : SnapBuildPurgeOlderTxn(builder);
1175 : :
1176 : : /*
1177 : : * Advance the xmin limit for the current replication slot, to allow
1178 : : * vacuum to clean up the tuples this slot has been protecting.
1179 : : *
1180 : : * The reorderbuffer might have an xmin among the currently running
1181 : : * snapshots; use it if so. If not, we need only consider the snapshots
1182 : : * we'll produce later, which can't be less than the oldest running xid in
1183 : : * the record we're reading now.
1184 : : */
2895 alvherre@alvh.no-ip. 1185 : 504 : xmin = ReorderBufferGetOldestXmin(builder->reorder);
1186 [ + + ]: 504 : if (xmin == InvalidTransactionId)
1187 : 458 : xmin = running->oldestRunningXid;
1188 [ - + ]: 504 : elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1189 : : builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1190 : 504 : LogicalIncreaseXminForSlot(lsn, xmin);
1191 : :
1192 : : /*
1193 : : * Also tell the slot where we can restart decoding from. We don't want to
1194 : : * do that after every commit because changing that implies an fsync of
1195 : : * the logical slot's state file, so we only do it every time we see a
1196 : : * running xacts record.
1197 : : *
1198 : : * Do so by looking for the oldest in progress transaction (determined by
1199 : : * the first LSN of any of its relevant records). Every transaction
1200 : : * remembers the last location we stored the snapshot to disk before its
1201 : : * beginning. That point is where we can restart from.
1202 : : */
1203 : :
1204 : : /*
1205 : : * Can't know about a serialized snapshot's location if we're not
1206 : : * consistent.
1207 : : */
4471 rhaas@postgresql.org 1208 [ + + ]: 504 : if (builder->state < SNAPBUILD_CONSISTENT)
1209 : 17 : return;
1210 : :
1211 : 487 : txn = ReorderBufferGetOldestTXN(builder->reorder);
1212 : :
1213 : : /*
1214 : : * oldest ongoing txn might have started when we didn't yet serialize
1215 : : * anything because we hadn't reached a consistent state yet.
1216 : : */
205 alvherre@kurilemu.de 1217 [ + + + + ]:GNC 487 : if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn))
4471 rhaas@postgresql.org 1218 :CBC 24 : LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1219 : :
1220 : : /*
1221 : : * No in-progress transaction, can reuse the last serialized snapshot if
1222 : : * we have one.
1223 : : */
1224 [ + + ]: 463 : else if (txn == NULL &&
205 alvherre@kurilemu.de 1225 [ + + ]:GNC 432 : XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) &&
1226 [ + - ]: 430 : XLogRecPtrIsValid(builder->last_serialized_snapshot))
4471 rhaas@postgresql.org 1227 :CBC 430 : LogicalIncreaseRestartDecodingForSlot(lsn,
1228 : : builder->last_serialized_snapshot);
1229 : : }
1230 : :
1231 : :
1232 : : /*
1233 : : * Build the start of a snapshot that's capable of decoding the catalog.
1234 : : *
1235 : : * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1236 : : * consistent.
1237 : : *
1238 : : * Returns true if there is a point in performing internal maintenance/cleanup
1239 : : * using the xl_running_xacts record.
1240 : : */
1241 : : static bool
1242 : 1167 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1243 : : {
1244 : : /* ---
1245 : : * Build catalog decoding snapshot incrementally using information about
1246 : : * the currently running transactions. There are several ways to do that:
1247 : : *
1248 : : * a) There were no running transactions when the xl_running_xacts record
1249 : : * was inserted, jump to CONSISTENT immediately. We might find such a
1250 : : * state while waiting on c)'s sub-states.
1251 : : *
1252 : : * b) This (in a previous run) or another decoding slot serialized a
1253 : : * snapshot to disk that we can use. Can't use this method while finding
1254 : : * the start point for decoding changes as the restart LSN would be an
1255 : : * arbitrary LSN but we need to find the start point to extract changes
1256 : : * where we won't see the data for partial transactions. Also, we cannot
1257 : : * use this method when a slot needs a full snapshot for export or direct
1258 : : * use, as that snapshot will only contain catalog modifying transactions.
1259 : : *
1260 : : * c) First incrementally build a snapshot for catalog tuples
1261 : : * (BUILDING_SNAPSHOT), that requires all, already in-progress,
1262 : : * transactions to finish. Every transaction starting after that
1263 : : * (FULL_SNAPSHOT state), has enough information to be decoded. But
1264 : : * for older running transactions no viable snapshot exists yet, so
1265 : : * CONSISTENT will only be reached once all of those have finished.
1266 : : * ---
1267 : : */
1268 : :
1269 : : /*
1270 : : * xl_running_xacts record is older than what we can use, we might not
1271 : : * have all necessary catalog rows anymore.
1272 : : */
1273 [ + + ]: 1167 : if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1274 [ + - - + : 503 : NormalTransactionIdPrecedes(running->oldestRunningXid,
- + ]
1275 : : builder->initial_xmin_horizon))
1276 : : {
4471 rhaas@postgresql.org 1277 [ # # ]:UBC 0 : ereport(DEBUG1,
1278 : : errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
1279 : : LSN_FORMAT_ARGS(lsn)),
1280 : : errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1281 : : builder->initial_xmin_horizon, running->oldestRunningXid));
1282 : :
1283 : :
3304 andres@anarazel.de 1284 : 0 : SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1285 : :
4471 rhaas@postgresql.org 1286 : 0 : return true;
1287 : : }
1288 : :
1289 : : /*
1290 : : * a) No transaction were running, we can jump to consistent.
1291 : : *
1292 : : * This is not affected by races around xl_running_xacts, because we can
1293 : : * miss transaction commits, but currently not transactions starting.
1294 : : *
1295 : : * NB: We might have already started to incrementally assemble a snapshot,
1296 : : * so we need to be careful to deal with that.
1297 : : */
3304 andres@anarazel.de 1298 [ + + ]:CBC 1167 : if (running->oldestRunningXid == running->nextXid)
1299 : : {
205 alvherre@kurilemu.de 1300 [ + + ]:GNC 1135 : if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
4377 andres@anarazel.de 1301 [ + + ]:CBC 639 : builder->start_decoding_at <= lsn)
1302 : : /* can decode everything after this */
1303 : 497 : builder->start_decoding_at = lsn + 1;
1304 : :
1305 : : /* As no transactions were running xmin/xmax can be trivially set. */
3265 tgl@sss.pgh.pa.us 1306 : 1135 : builder->xmin = running->nextXid; /* < are finished */
1307 : 1135 : builder->xmax = running->nextXid; /* >= are running */
1308 : :
1309 : : /* so we can safely use the faster comparisons */
4471 rhaas@postgresql.org 1310 [ - + ]: 1135 : Assert(TransactionIdIsNormal(builder->xmin));
1311 [ - + ]: 1135 : Assert(TransactionIdIsNormal(builder->xmax));
1312 : :
1313 : 1135 : builder->state = SNAPBUILD_CONSISTENT;
1930 andres@anarazel.de 1314 : 1135 : builder->next_phase_at = InvalidTransactionId;
1315 : :
50 fujii@postgresql.org 1316 [ + + + + ]:GNC 1135 : ereport(LogicalDecodingLogLevel(),
1317 : : errmsg("logical decoding found consistent point at %X/%08X",
1318 : : LSN_FORMAT_ARGS(lsn)),
1319 : : errdetail("There are no running transactions."));
1320 : :
4471 rhaas@postgresql.org 1321 :CBC 1135 : return false;
1322 : : }
1323 : :
1324 : : /*
1325 : : * b) valid on disk state and while neither building full snapshot nor
1326 : : * creating a slot.
1327 : : */
3320 andres@anarazel.de 1328 [ + + ]: 32 : else if (!builder->building_full_snapshot &&
688 msawada@postgresql.o 1329 [ + + + + ]: 50 : !builder->in_slot_creation &&
3320 andres@anarazel.de 1330 : 19 : SnapBuildRestore(builder, lsn))
1331 : : {
1332 : : /* there won't be any state to cleanup */
4471 rhaas@postgresql.org 1333 : 8 : return false;
1334 : : }
1335 : :
1336 : : /*
1337 : : * c) transition from START to BUILDING_SNAPSHOT.
1338 : : *
1339 : : * In START state, and a xl_running_xacts record with running xacts is
1340 : : * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
1341 : : * record xl_running_xacts->nextXid. Once all running xacts have finished
1342 : : * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
1343 : : * might look that we could use xl_running_xacts's ->xids information to
1344 : : * get there quicker, but that is problematic because transactions marked
1345 : : * as running, might already have inserted their commit record - it's
1346 : : * infeasible to change that with locking.
1347 : : */
3304 andres@anarazel.de 1348 [ + + ]: 24 : else if (builder->state == SNAPBUILD_START)
1349 : : {
1350 : 13 : builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
1930 1351 : 13 : builder->next_phase_at = running->nextXid;
1352 : :
1353 : : /*
1354 : : * Start with an xmin/xmax that's correct for future, when all the
1355 : : * currently running transactions have finished. We'll update both
1356 : : * while waiting for the pending transactions to finish.
1357 : : */
3265 tgl@sss.pgh.pa.us 1358 : 13 : builder->xmin = running->nextXid; /* < are finished */
1359 : 13 : builder->xmax = running->nextXid; /* >= are running */
1360 : :
1361 : : /* so we can safely use the faster comparisons */
4471 rhaas@postgresql.org 1362 [ - + ]: 13 : Assert(TransactionIdIsNormal(builder->xmin));
1363 [ - + ]: 13 : Assert(TransactionIdIsNormal(builder->xmax));
1364 : :
1365 [ + - ]: 13 : ereport(LOG,
1366 : : errmsg("logical decoding found initial starting point at %X/%08X",
1367 : : LSN_FORMAT_ARGS(lsn)),
1368 : : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1369 : : running->xcnt, running->nextXid));
1370 : :
3304 andres@anarazel.de 1371 : 13 : SnapBuildWaitSnapshot(running, running->nextXid);
1372 : : }
1373 : :
1374 : : /*
1375 : : * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1376 : : *
1377 : : * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1378 : : * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
1379 : : * means all transactions starting afterwards have enough information to
1380 : : * be decoded. Switch to FULL_SNAPSHOT.
1381 : : */
1382 [ + + + + ]: 17 : else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1930 1383 : 6 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1384 : : running->oldestRunningXid))
1385 : : {
3304 1386 : 5 : builder->state = SNAPBUILD_FULL_SNAPSHOT;
1930 1387 : 5 : builder->next_phase_at = running->nextXid;
1388 : :
3304 1389 [ + - ]: 5 : ereport(LOG,
1390 : : errmsg("logical decoding found initial consistent point at %X/%08X",
1391 : : LSN_FORMAT_ARGS(lsn)),
1392 : : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1393 : : running->xcnt, running->nextXid));
1394 : :
1395 : 5 : SnapBuildWaitSnapshot(running, running->nextXid);
1396 : : }
1397 : :
1398 : : /*
1399 : : * c) transition from FULL_SNAPSHOT to CONSISTENT.
1400 : : *
1401 : : * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
1402 : : * >= than nextXid from when we switched to FULL_SNAPSHOT. This means all
1403 : : * transactions that are currently in progress have a catalog snapshot,
1404 : : * and all their changes have been collected. Switch to CONSISTENT.
1405 : : */
1406 [ + + + - ]: 11 : else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1930 1407 : 5 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1408 : : running->oldestRunningXid))
1409 : : {
3304 1410 : 5 : builder->state = SNAPBUILD_CONSISTENT;
1930 1411 : 5 : builder->next_phase_at = InvalidTransactionId;
1412 : :
50 fujii@postgresql.org 1413 [ + - - + ]:GNC 5 : ereport(LogicalDecodingLogLevel(),
1414 : : errmsg("logical decoding found consistent point at %X/%08X",
1415 : : LSN_FORMAT_ARGS(lsn)),
1416 : : errdetail("There are no old transactions anymore."));
1417 : : }
1418 : :
1419 : : /*
1420 : : * We already started to track running xacts and need to wait for all
1421 : : * in-progress ones to finish. We fall through to the normal processing of
1422 : : * records so incremental cleanup can be performed.
1423 : : */
4471 rhaas@postgresql.org 1424 :CBC 22 : return true;
1425 : : }
1426 : :
1427 : : /* ---
1428 : : * Iterate through the xids in 'running' and wait for every one that does not
1429 : : * follow 'cutoff' to finish. Then, if possible, log a new xl_running_xacts record.
1430 : : *
1431 : : * This isn't required for the correctness of decoding, but it serves to:
1432 : : * a) allow isolationtester to notice that we're currently waiting for
1433 : : * something.
1434 : : * b) log a new xl_running_xacts record where it'd be helpful, without having
1435 : : * to wait for bgwriter or checkpointer.
1436 : : * ---
1437 : : */
1438 : : static void
3304 andres@anarazel.de 1439 : 18 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1440 : : {
1441 : : int off;
1442 : :
1443 [ + + ]: 34 : for (off = 0; off < running->xcnt; off++)
1444 : : {
1445 : 18 : TransactionId xid = running->xids[off];
1446 : :
1447 : : /*
1448 : : * Upper layers should ensure that we never need to wait on ourselves.
1449 : : * Check anyway, since failing to do so would either result in an
1450 : : * endless wait or an Assert() failure.
1451 : : */
1452 [ - + ]: 18 : if (TransactionIdIsCurrentTransactionId(xid))
3304 andres@anarazel.de 1453 [ # # ]:UBC 0 : elog(ERROR, "waiting for ourselves");
1454 : :
3304 andres@anarazel.de 1455 [ - + ]:CBC 18 : if (TransactionIdFollows(xid, cutoff)) /* newer than the cutoff: no need to wait */
3304 andres@anarazel.de 1456 :UBC 0 : continue;
1457 : :
3304 andres@anarazel.de 1458 :CBC 18 : XactLockTableWait(xid, NULL, NULL, XLTW_None);
1459 : : }
1460 : :
1461 : : /*
1462 : : * All transactions we needed to wait for have finished - try to ensure there is
1463 : : * another xl_running_xacts record in a timely manner, without having to
1464 : : * wait for bgwriter or checkpointer to log one. During recovery we can't
1465 : : * enforce that, so we'll have to wait.
1466 : : */
1467 [ + - ]: 16 : if (!RecoveryInProgress())
1468 : : {
7 alvherre@kurilemu.de 1469 : 16 : LogStandbySnapshot();
1470 : : }
3304 andres@anarazel.de 1471 : 16 : }
1472 : :
1473 : : #define SnapBuildOnDiskConstantSize \
1474 : : offsetof(SnapBuildOnDisk, builder) /* bytes of SnapBuildOnDisk that precede the 'builder' member */
1475 : : #define SnapBuildOnDiskNotChecksummedSize \
1476 : : offsetof(SnapBuildOnDisk, version) /* leading bytes (before 'version') that are left out of the CRC */
1477 : :
1478 : : #define SNAPBUILD_MAGIC 0x51A1E001 /* value written to, and expected in, the 'magic' field */
1479 : : #define SNAPBUILD_VERSION 6 /* value written to, and expected in, the 'version' field */
1480 : :
1481 : : /*
1482 : : * Store/Load a snapshot from disk, depending on the snapshot builder's state: try to restore while below SNAPBUILD_CONSISTENT, serialize otherwise.
1483 : : *
1484 : : * Supposed to be used by external (i.e. not snapbuild.c) code that has just read
1485 : : * a record that's a potential location for a serialized snapshot.
1486 : : */
1487 : : void
4471 rhaas@postgresql.org 1488 : 79 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1489 : : {
1490 [ - + ]: 79 : if (builder->state < SNAPBUILD_CONSISTENT)
4471 rhaas@postgresql.org 1491 :UBC 0 : SnapBuildRestore(builder, lsn);
1492 : : else
4471 rhaas@postgresql.org 1493 :CBC 79 : SnapBuildSerialize(builder, lsn);
1494 : 79 : }
1495 : :
1496 : : /*
1497 : : * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1498 : : * been done by another decoding process. Returns early if the builder is not yet consistent.
1499 : : */
1500 : : static void
1501 : 561 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1502 : : {
1503 : : Size needed_length;
1963 akapila@postgresql.o 1504 : 561 : SnapBuildOnDisk *ondisk = NULL;
1388 1505 : 561 : TransactionId *catchange_xip = NULL;
1506 : : MemoryContext old_ctx;
1507 : : size_t catchange_xcnt;
1508 : : char *ondisk_c;
1509 : : int fd;
1510 : : char tmppath[MAXPGPATH];
1511 : : char path[MAXPGPATH];
1512 : : int ret;
1513 : : struct stat stat_buf;
1514 : : Size sz;
1515 : :
205 alvherre@kurilemu.de 1516 [ - + ]:GNC 561 : Assert(XLogRecPtrIsValid(lsn));
1517 [ + + - + ]: 561 : Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) ||
1518 : : builder->last_serialized_snapshot <= lsn);
1519 : :
1520 : : /*
1521 : : * There is no point in serializing if we cannot continue to work immediately after
1522 : : * restoring the snapshot.
1523 : : */
4471 rhaas@postgresql.org 1524 [ - + ]:CBC 561 : if (builder->state < SNAPBUILD_CONSISTENT)
4471 rhaas@postgresql.org 1525 :UBC 0 : return;
1526 : :
1527 : : /* consistent snapshots have no next phase */
1930 andres@anarazel.de 1528 [ - + ]:CBC 561 : Assert(builder->next_phase_at == InvalidTransactionId);
1529 : :
1530 : : /*
1531 : : * We identify snapshots by the LSN they are valid for. We don't need to
1532 : : * include timelines in the name as each LSN maps to exactly one timeline
1533 : : * unless the user used pg_resetwal or similar. If a user did so, there's
1534 : : * no hope of continuing to decode anyway.
1535 : : */
638 michael@paquier.xyz 1536 : 561 : sprintf(path, "%s/%X-%X.snap",
1537 : : PG_LOGICAL_SNAPSHOTS_DIR,
1922 peter@eisentraut.org 1538 : 561 : LSN_FORMAT_ARGS(lsn));
1539 : :
1540 : : /*
1541 : : * First check whether some other backend has already written the snapshot
1542 : : * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1543 : : * as a valid state. Everything else is an unexpected error.
1544 : : */
4471 rhaas@postgresql.org 1545 : 561 : ret = stat(path, &stat_buf);
1546 : :
1547 [ + + - + ]: 561 : if (ret != 0 && errno != ENOENT)
4471 rhaas@postgresql.org 1548 [ # # ]:UBC 0 : ereport(ERROR,
1549 : : (errcode_for_file_access(),
1550 : : errmsg("could not stat file \"%s\": %m", path)));
1551 : :
4471 rhaas@postgresql.org 1552 [ + + ]:CBC 561 : else if (ret == 0)
1553 : : {
1554 : : /*
1555 : : * Somebody else has already serialized to this point; don't overwrite,
1556 : : * but remember the location, so we don't need to read old data again.
1557 : : *
1558 : : * To be sure it has been synced to disk after the rename() from the
1559 : : * tempfile filename to the real filename, we just repeat the fsync.
1560 : : * That ought to be cheap because in most scenarios it should already
1561 : : * be safely on disk.
1562 : : */
1563 : 243 : fsync_fname(path, false);
638 michael@paquier.xyz 1564 : 243 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1565 : :
4471 rhaas@postgresql.org 1566 : 243 : builder->last_serialized_snapshot = lsn;
1567 : 243 : goto out;
1568 : : }
1569 : :
1570 : : /*
1571 : : * There is an obvious race condition here between the time we stat(2) the
1572 : : * file and us writing the file. But we rename the file into place
1573 : : * atomically and all files created need to contain the same data anyway,
1574 : : * so this is perfectly fine, although a bit of a resource waste. Locking
1575 : : * seems like pointless complication.
1576 : : */
1577 [ + + ]: 318 : elog(DEBUG1, "serializing snapshot to %s", path);
1578 : :
1579 : : /* to make sure only we will write to this tempfile, include pid */
638 michael@paquier.xyz 1580 : 318 : sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
1581 : : PG_LOGICAL_SNAPSHOTS_DIR,
1922 peter@eisentraut.org 1582 : 318 : LSN_FORMAT_ARGS(lsn), MyProcPid);
1583 : :
1584 : : /*
1585 : : * Unlink the temporary file if it already exists; it must be left over from before a
1586 : : * crash/error, since we won't enter this function twice from within a
1587 : : * single decoding slot/backend and the temporary file contains the pid of
1588 : : * the current process.
1589 : : */
4471 rhaas@postgresql.org 1590 [ + - - + ]: 318 : if (unlink(tmppath) != 0 && errno != ENOENT)
4471 rhaas@postgresql.org 1591 [ # # ]:UBC 0 : ereport(ERROR,
1592 : : (errcode_for_file_access(),
1593 : : errmsg("could not remove file \"%s\": %m", tmppath)));
1594 : :
1388 akapila@postgresql.o 1595 :CBC 318 : old_ctx = MemoryContextSwitchTo(builder->context);
1596 : :
1597 : : /* Get the catalog modifying transactions that are not yet committed */
1598 : 318 : catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
1305 drowley@postgresql.o 1599 : 318 : catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
1600 : :
4471 rhaas@postgresql.org 1601 : 318 : needed_length = sizeof(SnapBuildOnDisk) +
1388 akapila@postgresql.o 1602 : 318 : sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
1603 : :
1604 : 318 : ondisk_c = palloc0(needed_length);
4471 rhaas@postgresql.org 1605 : 318 : ondisk = (SnapBuildOnDisk *) ondisk_c;
1606 : 318 : ondisk->magic = SNAPBUILD_MAGIC;
1607 : 318 : ondisk->version = SNAPBUILD_VERSION;
1608 : 318 : ondisk->length = needed_length;
4225 heikki.linnakangas@i 1609 : 318 : INIT_CRC32C(ondisk->checksum);
1610 : 318 : COMP_CRC32C(ondisk->checksum,
1611 : : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1612 : : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
4471 rhaas@postgresql.org 1613 : 318 : ondisk_c += sizeof(SnapBuildOnDisk); /* advance past the struct to where the xid arrays are copied */
1614 : :
1615 : 318 : memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1616 : : /* NULL-ify memory-only data */
1617 : 318 : ondisk->builder.context = NULL;
1618 : 318 : ondisk->builder.snapshot = NULL;
1619 : 318 : ondisk->builder.reorder = NULL;
1620 : 318 : ondisk->builder.committed.xip = NULL;
1388 akapila@postgresql.o 1621 : 318 : ondisk->builder.catchange.xip = NULL;
1622 : : /* catchange.xcnt is updated only in the on-disk copy, not in 'builder' */
1623 : 318 : ondisk->builder.catchange.xcnt = catchange_xcnt;
1624 : :
4225 heikki.linnakangas@i 1625 : 318 : COMP_CRC32C(ondisk->checksum,
1626 : : &ondisk->builder,
1627 : : sizeof(SnapBuild));
1628 : :
1629 : : /* copy committed xacts */
1388 akapila@postgresql.o 1630 [ + + ]: 318 : if (builder->committed.xcnt > 0)
1631 : : {
1632 : 55 : sz = sizeof(TransactionId) * builder->committed.xcnt;
1633 : 55 : memcpy(ondisk_c, builder->committed.xip, sz);
1634 : 55 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1635 : 55 : ondisk_c += sz;
1636 : : }
1637 : :
1638 : : /* copy catalog modifying xacts */
1639 [ + + ]: 318 : if (catchange_xcnt > 0)
1640 : : {
1641 : 8 : sz = sizeof(TransactionId) * catchange_xcnt;
1642 : 8 : memcpy(ondisk_c, catchange_xip, sz);
1643 : 8 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1644 : 8 : ondisk_c += sz;
1645 : : }
1646 : :
4217 andres@anarazel.de 1647 : 318 : FIN_CRC32C(ondisk->checksum);
1648 : :
1649 : : /* we have valid data now, open tempfile and write it there */
4471 rhaas@postgresql.org 1650 : 318 : fd = OpenTransientFile(tmppath,
1651 : : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1652 [ - + ]: 318 : if (fd < 0)
4471 rhaas@postgresql.org 1653 [ # # ]:UBC 0 : ereport(ERROR,
1654 : : (errcode_for_file_access(),
1655 : : errmsg("could not open file \"%s\": %m", tmppath)));
1656 : :
2855 michael@paquier.xyz 1657 :CBC 318 : errno = 0;
3360 rhaas@postgresql.org 1658 : 318 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
4471 1659 [ - + ]: 318 : if ((write(fd, ondisk, needed_length)) != needed_length)
1660 : : {
2896 michael@paquier.xyz 1661 :UBC 0 : int save_errno = errno;
1662 : :
4471 rhaas@postgresql.org 1663 : 0 : CloseTransientFile(fd);
1664 : :
1665 : : /* if write didn't set errno, assume problem is no disk space */
2896 michael@paquier.xyz 1666 [ # # ]: 0 : errno = save_errno ? save_errno : ENOSPC;
4471 rhaas@postgresql.org 1667 [ # # ]: 0 : ereport(ERROR,
1668 : : (errcode_for_file_access(),
1669 : : errmsg("could not write to file \"%s\": %m", tmppath)));
1670 : : }
3360 rhaas@postgresql.org 1671 :CBC 318 : pgstat_report_wait_end();
1672 : :
1673 : : /*
1674 : : * fsync the file before renaming so that even if we crash after this we
1675 : : * have either a fully valid file or nothing.
1676 : : *
1677 : : * It's safe to just ERROR on fsync() here because we'll retry the whole
1678 : : * operation including the writes.
1679 : : *
1680 : : * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1681 : : * some noticeable overhead since it's performed synchronously during
1682 : : * decoding?
1683 : : */
1684 : 318 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
4471 1685 [ - + ]: 318 : if (pg_fsync(fd) != 0)
1686 : : {
2896 michael@paquier.xyz 1687 :UBC 0 : int save_errno = errno;
1688 : :
4471 rhaas@postgresql.org 1689 : 0 : CloseTransientFile(fd);
2896 michael@paquier.xyz 1690 : 0 : errno = save_errno;
4471 rhaas@postgresql.org 1691 [ # # ]: 0 : ereport(ERROR,
1692 : : (errcode_for_file_access(),
1693 : : errmsg("could not fsync file \"%s\": %m", tmppath)));
1694 : : }
3360 rhaas@postgresql.org 1695 :CBC 318 : pgstat_report_wait_end();
1696 : :
2520 peter@eisentraut.org 1697 [ - + ]: 318 : if (CloseTransientFile(fd) != 0)
2639 michael@paquier.xyz 1698 [ # # ]:UBC 0 : ereport(ERROR,
1699 : : (errcode_for_file_access(),
1700 : : errmsg("could not close file \"%s\": %m", tmppath)));
1701 : :
638 michael@paquier.xyz 1702 :CBC 318 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1703 : :
1704 : : /*
1705 : : * We may overwrite the work from some other backend, but that's ok, our
1706 : : * snapshot is valid as well, we'll just have done some superfluous work.
1707 : : */
4471 rhaas@postgresql.org 1708 [ - + ]: 318 : if (rename(tmppath, path) != 0)
1709 : : {
4471 rhaas@postgresql.org 1710 [ # # ]:UBC 0 : ereport(ERROR,
1711 : : (errcode_for_file_access(),
1712 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
1713 : : tmppath, path)));
1714 : : }
1715 : :
1716 : : /* make sure the renamed file and its directory entry are persisted */
4471 rhaas@postgresql.org 1717 :CBC 318 : fsync_fname(path, false);
638 michael@paquier.xyz 1718 : 318 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1719 : :
1720 : : /*
1721 : : * Now there's no way we can lose the dumped state anymore, remember this
1722 : : * as a serialization point.
1723 : : */
4471 rhaas@postgresql.org 1724 : 318 : builder->last_serialized_snapshot = lsn;
1725 : :
1388 akapila@postgresql.o 1726 : 318 : MemoryContextSwitchTo(old_ctx);
1727 : :
4471 rhaas@postgresql.org 1728 : 561 : out:
1729 : 561 : ReorderBufferSetRestartPoint(builder->reorder,
1730 : : builder->last_serialized_snapshot);
1731 : : /* be tidy: free the buffers allocated above, if any */
1963 akapila@postgresql.o 1732 [ + + ]: 561 : if (ondisk)
1733 : 318 : pfree(ondisk);
1388 1734 [ + + ]: 561 : if (catchange_xip)
1735 : 8 : pfree(catchange_xip);
1736 : : }
1737 : :
1738 : : /*
1739 : : * Restore the logical snapshot file contents for 'lsn' to 'ondisk', verifying the magic number, version and checksum.
1740 : : *
1741 : : * 'context' is the memory context where the catalog modifying/committed xid
1742 : : * arrays will live.
1743 : : * If 'missing_ok' is true, returns false instead of throwing an error if the file is not found. Returns true on success.
1744 : : */
1745 : : bool
445 msawada@postgresql.o 1746 : 21 : SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
1747 : : MemoryContext context, bool missing_ok)
1748 : : {
1749 : : int fd;
1750 : : pg_crc32c checksum;
1751 : : Size sz;
1752 : : char path[MAXPGPATH];
1753 : :
1754 : 21 : sprintf(path, "%s/%X-%X.snap",
1755 : : PG_LOGICAL_SNAPSHOTS_DIR,
1756 : 21 : LSN_FORMAT_ARGS(lsn));
1757 : :
3171 peter_e@gmx.net 1758 : 21 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1759 : :
593 msawada@postgresql.o 1760 [ + + ]: 21 : if (fd < 0)
1761 : : {
1762 [ + - + - ]: 11 : if (missing_ok && errno == ENOENT)
1763 : 11 : return false;
1764 : :
4471 rhaas@postgresql.org 1765 [ # # ]:UBC 0 : ereport(ERROR,
1766 : : (errcode_for_file_access(),
1767 : : errmsg("could not open file \"%s\": %m", path)));
1768 : : }
1769 : :
1770 : : /* ----
1771 : : * Make sure the snapshot has been stored safely to disk, that's normally
1772 : : * cheap.
1773 : : * Note that we do not need PANIC here, nobody will be able to use the
1774 : : * slot without fsyncing, and saving it won't succeed without an fsync()
1775 : : * either...
1776 : : * ----
1777 : : */
4471 rhaas@postgresql.org 1778 :CBC 10 : fsync_fname(path, false);
638 michael@paquier.xyz 1779 : 10 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1780 : :
1781 : : /* read statically sized portion of snapshot */
613 peter@eisentraut.org 1782 : 10 : SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
1783 : :
593 msawada@postgresql.o 1784 [ - + ]: 10 : if (ondisk->magic != SNAPBUILD_MAGIC)
4471 rhaas@postgresql.org 1785 [ # # ]:UBC 0 : ereport(ERROR,
1786 : : (errcode(ERRCODE_DATA_CORRUPTED),
1787 : : errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1788 : : path, ondisk->magic, SNAPBUILD_MAGIC)));
1789 : :
593 msawada@postgresql.o 1790 [ - + ]:CBC 10 : if (ondisk->version != SNAPBUILD_VERSION)
4471 rhaas@postgresql.org 1791 [ # # ]:UBC 0 : ereport(ERROR,
1792 : : (errcode(ERRCODE_DATA_CORRUPTED),
1793 : : errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1794 : : path, ondisk->version, SNAPBUILD_VERSION)));
1795 : :
4225 heikki.linnakangas@i 1796 :CBC 10 : INIT_CRC32C(checksum);
1797 : 10 : COMP_CRC32C(checksum,
1798 : : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1799 : : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1800 : :
1801 : : /* read the SnapBuild struct itself */
613 peter@eisentraut.org 1802 : 10 : SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
593 msawada@postgresql.o 1803 : 10 : COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
1804 : :
1805 : : /* restore committed xacts information */
1806 [ + + ]: 10 : if (ondisk->builder.committed.xcnt > 0)
1807 : : {
1808 : 5 : sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
1809 : 5 : ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
613 peter@eisentraut.org 1810 : 5 : SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
593 msawada@postgresql.o 1811 : 5 : COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
1812 : : }
1813 : :
1814 : : /* restore catalog modifying xacts information */
1815 [ + + ]: 10 : if (ondisk->builder.catchange.xcnt > 0)
1816 : : {
1817 : 4 : sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
1818 : 4 : ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
613 peter@eisentraut.org 1819 : 4 : SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
593 msawada@postgresql.o 1820 : 4 : COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
1821 : : }
1822 : :
2520 peter@eisentraut.org 1823 [ - + ]: 10 : if (CloseTransientFile(fd) != 0)
2639 michael@paquier.xyz 1824 [ # # ]:UBC 0 : ereport(ERROR,
1825 : : (errcode_for_file_access(),
1826 : : errmsg("could not close file \"%s\": %m", path)));
1827 : :
4217 andres@anarazel.de 1828 :CBC 10 : FIN_CRC32C(checksum);
1829 : :
1830 : : /* verify the checksum of what we've read against the one stored in the file */
593 msawada@postgresql.o 1831 [ - + ]: 10 : if (!EQ_CRC32C(checksum, ondisk->checksum))
4471 rhaas@postgresql.org 1832 [ # # ]:UBC 0 : ereport(ERROR,
1833 : : (errcode(ERRCODE_DATA_CORRUPTED),
1834 : : errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1835 : : path, checksum, ondisk->checksum)));
1836 : :
593 msawada@postgresql.o 1837 :CBC 10 : return true;
1838 : : }
1839 : :
1840 : : /*
1841 : : * Restore a snapshot into 'builder' if previously one has been stored at the
1842 : : * location indicated by 'lsn'. Returns true if successful, false otherwise (builder already consistent, no file found, or the stored snapshot is not usable).
1843 : : */
1844 : : static bool
1845 : 19 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1846 : : {
1847 : : SnapBuildOnDisk ondisk;
1848 : :
1849 : : /* no point in loading a snapshot if we're already there */
1850 [ - + ]: 19 : if (builder->state == SNAPBUILD_CONSISTENT)
593 msawada@postgresql.o 1851 :UBC 0 : return false;
1852 : :
1853 : : /* validate and restore the snapshot to 'ondisk' */
445 msawada@postgresql.o 1854 [ + + ]:CBC 19 : if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
593 1855 : 11 : return false;
1856 : :
1857 : : /*
1858 : : * ok, we now have a sensible snapshot here, figure out if it has more
1859 : : * information than we have.
1860 : : */
1861 : :
1862 : : /*
1863 : : * We are only interested in consistent snapshots for now, comparing
1864 : : * whether one incomplete snapshot is more "advanced" seems to be
1865 : : * unnecessarily complex.
1866 : : */
4471 rhaas@postgresql.org 1867 [ - + ]: 8 : if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
4471 rhaas@postgresql.org 1868 :UBC 0 : goto snapshot_not_interesting;
1869 : :
1870 : : /*
1871 : : * Don't use a snapshot that requires an xmin that we cannot guarantee to
1872 : : * be available.
1873 : : */
4471 rhaas@postgresql.org 1874 [ - + ]:CBC 8 : if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
4471 rhaas@postgresql.org 1875 :UBC 0 : goto snapshot_not_interesting;
1876 : :
1877 : : /*
1878 : : * Consistent snapshots have no next phase. Reset next_phase_at as it is
1879 : : * possible that an old value may remain.
1880 : : */
1930 andres@anarazel.de 1881 [ - + ]:CBC 8 : Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
1061 dgustafsson@postgres 1882 : 8 : builder->next_phase_at = InvalidTransactionId;
1883 : :
1884 : : /* ok, we think the snapshot is sensible, copy over everything important */
4471 rhaas@postgresql.org 1885 : 8 : builder->xmin = ondisk.builder.xmin;
1886 : 8 : builder->xmax = ondisk.builder.xmax;
1887 : 8 : builder->state = ondisk.builder.state;
1888 : :
1889 : 8 : builder->committed.xcnt = ondisk.builder.committed.xcnt;
1890 : : /* We only allocated/stored xcnt, not xcnt_space xids! */
1891 : : /* don't overwrite the preallocated xip if the file contained no committed xids */
1892 [ + + ]: 8 : if (builder->committed.xcnt > 0)
1893 : : {
1894 : 3 : pfree(builder->committed.xip);
1895 : 3 : builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1896 : 3 : builder->committed.xip = ondisk.builder.committed.xip;
1897 : : }
1898 : 8 : ondisk.builder.committed.xip = NULL;
1899 : :
1900 : : /* set catalog modifying transactions, releasing any previous array first */
1388 akapila@postgresql.o 1901 [ - + ]: 8 : if (builder->catchange.xip)
1388 akapila@postgresql.o 1902 :UBC 0 : pfree(builder->catchange.xip);
1388 akapila@postgresql.o 1903 :CBC 8 : builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
1904 : 8 : builder->catchange.xip = ondisk.builder.catchange.xip;
1905 : 8 : ondisk.builder.catchange.xip = NULL;
1906 : :
1907 : : /* our snapshot is not interesting anymore, build a new one */
4471 rhaas@postgresql.org 1908 [ - + ]: 8 : if (builder->snapshot != NULL)
1909 : : {
4471 rhaas@postgresql.org 1910 :UBC 0 : SnapBuildSnapDecRefcount(builder->snapshot);
1911 : : }
3272 andres@anarazel.de 1912 :CBC 8 : builder->snapshot = SnapBuildBuildSnapshot(builder);
4471 rhaas@postgresql.org 1913 : 8 : SnapBuildSnapIncRefcount(builder->snapshot);
1914 : :
1915 : 8 : ReorderBufferSetRestartPoint(builder->reorder, lsn);
1916 : :
1917 [ - + ]: 8 : Assert(builder->state == SNAPBUILD_CONSISTENT);
1918 : :
50 fujii@postgresql.org 1919 [ + + + + ]:GNC 8 : ereport(LogicalDecodingLogLevel(),
1920 : : errmsg("logical decoding found consistent point at %X/%08X",
1921 : : LSN_FORMAT_ARGS(lsn)),
1922 : : errdetail("Logical decoding will begin using saved snapshot."));
4471 rhaas@postgresql.org 1923 :CBC 8 : return true;
1924 : :
4471 rhaas@postgresql.org 1925 :UBC 0 : snapshot_not_interesting:
1926 [ # # ]: 0 : if (ondisk.builder.committed.xip != NULL)
1927 : 0 : pfree(ondisk.builder.committed.xip);
1388 akapila@postgresql.o 1928 [ # # ]: 0 : if (ondisk.builder.catchange.xip != NULL)
1929 : 0 : pfree(ondisk.builder.catchange.xip);
4471 rhaas@postgresql.org 1930 : 0 : return false;
1931 : : }
1932 : :
1933 : : /*
1934 : : * Read 'size' bytes of the serialized snapshot from 'fd' to 'dest'. If fewer bytes are read or read() fails, close 'fd' and raise an ERROR naming 'path'.
1935 : : */
1936 : : static void
613 peter@eisentraut.org 1937 :CBC 29 : SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
1938 : : {
1939 : : int readBytes;
1940 : :
1388 akapila@postgresql.o 1941 : 29 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1942 : 29 : readBytes = read(fd, dest, size);
1943 : 29 : pgstat_report_wait_end();
1944 [ - + ]: 29 : if (readBytes != size)
1945 : : {
1388 akapila@postgresql.o 1946 :UBC 0 : int save_errno = errno; /* saved before CloseTransientFile() runs, restored for %m below */
1947 : :
1948 : 0 : CloseTransientFile(fd);
1949 : :
1950 [ # # ]: 0 : if (readBytes < 0)
1951 : : {
1952 : 0 : errno = save_errno;
1953 [ # # ]: 0 : ereport(ERROR,
1954 : : (errcode_for_file_access(),
1955 : : errmsg("could not read file \"%s\": %m", path)));
1956 : : }
1957 : : else
1958 [ # # ]: 0 : ereport(ERROR,
1959 : : (errcode(ERRCODE_DATA_CORRUPTED),
1960 : : errmsg("could not read file \"%s\": read %d of %zu",
1961 : : path, readBytes, size)));
1962 : : }
1388 akapila@postgresql.o 1963 :CBC 29 : }
1964 : :
1965 : : /*
1966 : : * Remove all serialized snapshots that are not required anymore because no
1967 : : * slot can need them. This doesn't actually have to run during a checkpoint,
1968 : : * but it's a convenient point to schedule this.
1969 : : *
1970 : : * NB: We run this during checkpoints even if logical decoding is disabled so
1971 : : * we clean up old snapshot files at some point after it got disabled.
1972 : : */
1973 : : void
4471 rhaas@postgresql.org 1974 : 1950 : CheckPointSnapBuild(void)
1975 : : {
1976 : : XLogRecPtr cutoff;
1977 : : XLogRecPtr redo;
1978 : : DIR *snap_dir;
1979 : : struct dirent *snap_de;
1980 : : char path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
1981 : :
1982 : : /*
1983 : : * We start off with a minimum of the last redo pointer. No new
1984 : : * replication slot will start before that, so that's a safe upper bound
1985 : : * for removal.
1986 : : */
1987 : 1950 : redo = GetRedoRecPtr();
1988 : :
1989 : : /* now check for the restart ptrs from existing slots */
1990 : 1950 : cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1991 : :
1992 : : /* use whichever is older: the redo pointer or the slots' restart lsn */
1993 [ + + ]: 1950 : if (redo < cutoff)
1994 : 1 : cutoff = redo;
1995 : :
638 michael@paquier.xyz 1996 : 1950 : snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
1997 [ + + ]: 6145 : while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
1998 : : {
1999 : : uint32 hi;
2000 : : uint32 lo;
2001 : : XLogRecPtr lsn;
2002 : : PGFileType de_type;
2003 : :
4471 rhaas@postgresql.org 2004 [ + + ]: 4195 : if (strcmp(snap_de->d_name, ".") == 0 ||
2005 [ + + ]: 2245 : strcmp(snap_de->d_name, "..") == 0)
2006 : 3900 : continue;
2007 : :
638 michael@paquier.xyz 2008 : 295 : snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
1366 2009 : 295 : de_type = get_dirent_type(path, snap_de, false, DEBUG1);
2010 : :
2011 [ + - - + ]: 295 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
2012 : : {
4471 rhaas@postgresql.org 2013 [ # # ]:UBC 0 : elog(DEBUG1, "only regular files expected: %s", path);
2014 : 0 : continue;
2015 : : }
2016 : :
2017 : : /*
2018 : : * Temporary filenames from SnapBuildSerialize() include the LSN and
2019 : : * everything but are postfixed by .$pid.tmp. We can just remove them
2020 : : * the same as other files because there can be none that are
2021 : : * currently being written that are older than cutoff.
2022 : : *
2023 : : * We just log a message if a file doesn't fit the pattern, it's
2024 : : * probably some editor's lock/state file or similar...
2025 : : */
4471 rhaas@postgresql.org 2026 [ - + ]:CBC 295 : if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2027 : : {
4471 rhaas@postgresql.org 2028 [ # # ]:UBC 0 : ereport(LOG,
2029 : : (errmsg("could not parse file name \"%s\"", path)));
2030 : 0 : continue;
2031 : : }
2032 : :
4471 rhaas@postgresql.org 2033 :CBC 295 : lsn = ((uint64) hi) << 32 | lo; /* recombine the two 32-bit halves parsed from the file name */
2034 : :
2035 : : /* check whether we still need it */
205 alvherre@kurilemu.de 2036 [ + + + + ]:GNC 295 : if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
2037 : : {
4471 rhaas@postgresql.org 2038 [ + + ]:CBC 192 : elog(DEBUG1, "removing snapbuild snapshot %s", path);
2039 : :
2040 : : /*
2041 : : * It's not particularly harmful, though strange, if we can't
2042 : : * remove the file here. Don't prevent the checkpoint from
2043 : : * completing, that'd be a cure worse than the disease.
2044 : : */
2045 [ - + ]: 192 : if (unlink(path) < 0)
2046 : : {
4471 rhaas@postgresql.org 2047 [ # # ]:UBC 0 : ereport(LOG,
2048 : : (errcode_for_file_access(),
2049 : : errmsg("could not remove file \"%s\": %m",
2050 : : path)));
2051 : 0 : continue;
2052 : : }
2053 : : }
2054 : : }
4471 rhaas@postgresql.org 2055 :CBC 1950 : FreeDir(snap_dir);
2056 : 1950 : }
2057 : :
2058 : : /*
2059 : : * Check if a logical snapshot at the specified point has been serialized. Returns true if the snapshot file for 'lsn' exists; raises an ERROR if stat() fails for any reason other than ENOENT.
2060 : : */
2061 : : bool
787 akapila@postgresql.o 2062 : 12 : SnapBuildSnapshotExists(XLogRecPtr lsn)
2063 : : {
2064 : : char path[MAXPGPATH];
2065 : : int ret;
2066 : : struct stat stat_buf;
2067 : :
289 fujii@postgresql.org 2068 : 12 : sprintf(path, "%s/%X-%X.snap",
2069 : : PG_LOGICAL_SNAPSHOTS_DIR,
787 akapila@postgresql.o 2070 : 12 : LSN_FORMAT_ARGS(lsn));
2071 : :
2072 : 12 : ret = stat(path, &stat_buf);
2073 : :
2074 [ + + - + ]: 12 : if (ret != 0 && errno != ENOENT)
787 akapila@postgresql.o 2075 [ # # ]:UBC 0 : ereport(ERROR,
2076 : : (errcode_for_file_access(),
2077 : : errmsg("could not stat file \"%s\": %m", path)));
2078 : :
787 akapila@postgresql.o 2079 :CBC 12 : return ret == 0;
2080 : : }
|