Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * snapbuild.c
4 : : *
5 : : * Infrastructure for building historic catalog snapshots based on contents
6 : : * of the WAL, for the purpose of decoding heapam.c style values in the
7 : : * WAL.
8 : : *
9 : : * NOTES:
10 : : *
11 : : * We build snapshots which can *only* be used to read catalog contents and we
12 : : * do so by reading and interpreting the WAL stream. The aim is to build a
13 : : * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14 : : * at the time the XLogRecord was generated.
15 : : *
16 : : * To build the snapshots we reuse the infrastructure built for Hot
17 : : * Standby. The in-memory snapshots we build look different than HS' because
18 : : * we have different needs. To successfully decode data from the WAL we only
19 : : * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20 : : * tables since the data we decode is wholly contained in the WAL
21 : : * records. Also, our snapshots need to be different in comparison to normal
22 : : * MVCC ones because in contrast to those we cannot fully rely on the clog and
23 : : * pg_subtrans for information about committed transactions because they might
24 : : * commit in the future from the POV of the WAL entry we're currently
25 : : * decoding. This definition has the advantage that we only need to prevent
26 : : * removal of catalog rows, while normal table's rows can still be
27 : : * removed. This is achieved by using the replication slot mechanism.
28 : : *
29 : : * As the percentage of transactions modifying the catalog normally is fairly
30 : : * small in comparisons to ones only manipulating user data, we keep track of
31 : : * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32 : : * track of all running transactions like it's done in a normal snapshot. Note
33 : : * that we're generally only looking at transactions that have acquired an
34 : : * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35 : : * that we consider committed, everything else is considered aborted/in
36 : : * progress. That also allows us not to care about subtransactions before they
37 : : * have committed which means this module, in contrast to HS, doesn't have to
38 : : * care about suboverflowed subtransactions and similar.
39 : : *
40 : : * One complexity of doing this is that to e.g. handle mixed DDL/DML
41 : : * transactions we need Snapshots that see intermediate versions of the
42 : : * catalog in a transaction. During normal operation this is achieved by using
43 : : * CommandIds/cmin/cmax. The problem with that however is that for space
44 : : * efficiency reasons, the cmin and cmax are not included in WAL records. We
45 : : * cannot read the cmin/cmax from the tuple itself, either, because it is
46 : : * reset on crash recovery. Even if we could, we could not decode combocids
47 : : * which are only tracked in the original backend's memory. To work around
48 : : * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
49 : : * catalog row is modified, which includes the cmin and cmax of the
50 : : * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
51 : : * reorder buffer, and use them at visibility checks instead of the cmin/cmax
52 : : * on the tuple itself. Check the reorderbuffer.c's comment above
53 : : * ResolveCminCmaxDuringDecoding() for details.
54 : : *
55 : : * To facilitate all this we need our own visibility routine, as the normal
56 : : * ones are optimized for different usecases.
57 : : *
58 : : * To replace the normal catalog snapshots with decoding ones use the
59 : : * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
60 : : *
61 : : *
62 : : *
63 : : * The snapbuild machinery is starting up in several stages, as illustrated
64 : : * by the following graph describing the SnapBuild->state transitions:
65 : : *
66 : : * +-------------------------+
67 : : * +----| START |-------------+
68 : : * | +-------------------------+ |
69 : : * | | |
70 : : * | | |
71 : : * | running_xacts #1 |
72 : : * | | |
73 : : * | | |
74 : : * | v |
75 : : * | +-------------------------+ v
76 : : * | | BUILDING_SNAPSHOT |------------>|
77 : : * | +-------------------------+ |
78 : : * | | |
79 : : * | | |
80 : : * | running_xacts #2, xacts from #1 finished |
81 : : * | | |
82 : : * | | |
83 : : * | v |
84 : : * | +-------------------------+ v
85 : : * | | FULL_SNAPSHOT |------------>|
86 : : * | +-------------------------+ |
87 : : * | | |
88 : : * running_xacts | saved snapshot
89 : : * with zero xacts | at running_xacts's lsn
90 : : * | | |
91 : : * | running_xacts with xacts from #2 finished |
92 : : * | | |
93 : : * | v |
94 : : * | +-------------------------+ |
95 : : * +--->|SNAPBUILD_CONSISTENT |<------------+
96 : : * +-------------------------+
97 : : *
98 : : * Initially the machinery is in the START stage. When an xl_running_xacts
99 : : * record is read that is sufficiently new (above the safe xmin horizon),
100 : : * there's a state transition. If there were no running xacts when the
101 : : * xl_running_xacts record was generated, we'll directly go into CONSISTENT
102 : : * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
103 : : * snapshot means that all transactions that start henceforth can be decoded
104 : : * in their entirety, but transactions that started previously can't. In
105 : : * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
106 : : * running transactions have committed or aborted.
107 : : *
108 : : * Only transactions that commit after CONSISTENT state has been reached will
109 : : * be replayed, even though they might have started while still in
110 : : * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
111 : : * changes has been exported, but all the following ones will be. That point
112 : : * is a convenient point to initialize replication from, which is why we
113 : : * export a snapshot at that point, which *can* be used to read normal data.
114 : : *
115 : : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
116 : : *
117 : : * IDENTIFICATION
118 : : * src/backend/replication/logical/snapbuild.c
119 : : *
120 : : *-------------------------------------------------------------------------
121 : : */
122 : :
123 : : #include "postgres.h"
124 : :
125 : : #include <sys/stat.h>
126 : : #include <unistd.h>
127 : :
128 : : #include "access/heapam_xlog.h"
129 : : #include "access/transam.h"
130 : : #include "access/xact.h"
131 : : #include "common/file_utils.h"
132 : : #include "miscadmin.h"
133 : : #include "pgstat.h"
134 : : #include "replication/logical.h"
135 : : #include "replication/reorderbuffer.h"
136 : : #include "replication/snapbuild.h"
137 : : #include "replication/snapbuild_internal.h"
138 : : #include "storage/fd.h"
139 : : #include "storage/lmgr.h"
140 : : #include "storage/proc.h"
141 : : #include "storage/procarray.h"
142 : : #include "storage/standby.h"
143 : : #include "utils/builtins.h"
144 : : #include "utils/memutils.h"
145 : : #include "utils/snapmgr.h"
146 : : #include "utils/snapshot.h"
147 : : #include "utils/wait_event.h"
148 : :
149 : :
150 : : /*
151 : : * Starting a transaction -- which we need to do while exporting a snapshot --
152 : : * removes knowledge about the previously used resowner, so we save it here.
153 : : */
154 : : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
155 : : static bool ExportInProgress = false;
156 : :
157 : : /* ->committed and ->catchange manipulation */
158 : : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
159 : :
160 : : /* snapshot building/manipulation/distribution functions */
161 : : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
162 : :
163 : : static void SnapBuildFreeSnapshot(Snapshot snap);
164 : :
165 : : static void SnapBuildSnapIncRefcount(Snapshot snap);
166 : :
167 : : static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
168 : :
169 : : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
170 : : uint32 xinfo);
171 : :
172 : : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
173 : : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
174 : : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
175 : :
176 : : /* serialization functions */
177 : : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
178 : : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
179 : : static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
180 : :
181 : : /*
182 : : * Allocate a new snapshot builder.
183 : : *
184 : : * xmin_horizon is the xid >= which we can be sure no catalog rows have been
185 : : * removed, start_lsn is the LSN >= we want to replay commits.
186 : : */
187 : : SnapBuild *
4395 rhaas@postgresql.org 188 :CBC 1163 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
189 : : TransactionId xmin_horizon,
190 : : XLogRecPtr start_lsn,
191 : : bool need_full_snapshot,
192 : : bool in_slot_creation,
193 : : XLogRecPtr two_phase_at)
194 : : {
195 : : MemoryContext context;
196 : : MemoryContext oldcontext;
197 : : SnapBuild *builder;
198 : :
199 : : /* allocate memory in own context, to have better accountability */
200 : 1163 : context = AllocSetContextCreate(CurrentMemoryContext,
201 : : "snapshot builder context",
202 : : ALLOCSET_DEFAULT_SIZES);
203 : 1163 : oldcontext = MemoryContextSwitchTo(context);
204 : :
95 michael@paquier.xyz 205 :GNC 1163 : builder = palloc0_object(SnapBuild);
206 : :
4395 rhaas@postgresql.org 207 :CBC 1163 : builder->state = SNAPBUILD_START;
208 : 1163 : builder->context = context;
209 : 1163 : builder->reorder = reorder;
210 : : /* Other struct members initialized by zeroing via palloc0 above */
211 : :
212 : 1163 : builder->committed.xcnt = 0;
3189 tgl@sss.pgh.pa.us 213 : 1163 : builder->committed.xcnt_space = 128; /* arbitrary number */
4395 rhaas@postgresql.org 214 : 1163 : builder->committed.xip =
9 msawada@postgresql.o 215 :GNC 1163 : palloc0_array(TransactionId, builder->committed.xcnt_space);
4395 rhaas@postgresql.org 216 :CBC 1163 : builder->committed.includes_all_transactions = true;
217 : :
1312 akapila@postgresql.o 218 : 1163 : builder->catchange.xcnt = 0;
219 : 1163 : builder->catchange.xip = NULL;
220 : :
4395 rhaas@postgresql.org 221 : 1163 : builder->initial_xmin_horizon = xmin_horizon;
4301 andres@anarazel.de 222 : 1163 : builder->start_decoding_at = start_lsn;
612 msawada@postgresql.o 223 : 1163 : builder->in_slot_creation = in_slot_creation;
3244 andres@anarazel.de 224 : 1163 : builder->building_full_snapshot = need_full_snapshot;
1705 akapila@postgresql.o 225 : 1163 : builder->two_phase_at = two_phase_at;
226 : :
4395 rhaas@postgresql.org 227 : 1163 : MemoryContextSwitchTo(oldcontext);
228 : :
229 : 1163 : return builder;
230 : : }
231 : :
232 : : /*
233 : : * Free a snapshot builder.
234 : : */
235 : : void
236 : 922 : FreeSnapshotBuilder(SnapBuild *builder)
237 : : {
238 : 922 : MemoryContext context = builder->context;
239 : :
240 : : /* free snapshot explicitly, that contains some error checking */
241 [ + + ]: 922 : if (builder->snapshot != NULL)
242 : : {
243 : 221 : SnapBuildSnapDecRefcount(builder->snapshot);
244 : 221 : builder->snapshot = NULL;
245 : : }
246 : :
247 : : /* other resources are deallocated via memory context reset */
248 : 922 : MemoryContextDelete(context);
249 : 922 : }
250 : :
251 : : /*
252 : : * Free an unreferenced snapshot that has previously been built by us.
253 : : */
254 : : static void
255 : 1594 : SnapBuildFreeSnapshot(Snapshot snap)
256 : : {
257 : : /* make sure we don't get passed an external snapshot */
2610 andres@anarazel.de 258 [ - + ]: 1594 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
259 : :
260 : : /* make sure nobody modified our snapshot */
4395 rhaas@postgresql.org 261 [ - + ]: 1594 : Assert(snap->curcid == FirstCommandId);
262 [ - + ]: 1594 : Assert(!snap->suboverflowed);
263 [ - + ]: 1594 : Assert(!snap->takenDuringRecovery);
3986 heikki.linnakangas@i 264 [ - + ]: 1594 : Assert(snap->regd_count == 0);
265 : :
266 : : /* slightly more likely, so it's checked even without c-asserts */
4395 rhaas@postgresql.org 267 [ - + ]: 1594 : if (snap->copied)
4395 rhaas@postgresql.org 268 [ # # ]:UBC 0 : elog(ERROR, "cannot free a copied snapshot");
269 : :
4395 rhaas@postgresql.org 270 [ - + ]:CBC 1594 : if (snap->active_count)
4395 rhaas@postgresql.org 271 [ # # ]:UBC 0 : elog(ERROR, "cannot free an active snapshot");
272 : :
4395 rhaas@postgresql.org 273 :CBC 1594 : pfree(snap);
274 : 1594 : }
275 : :
276 : : /*
277 : : * In which state of snapshot building are we?
278 : : */
279 : : SnapBuildState
280 : 2167792 : SnapBuildCurrentState(SnapBuild *builder)
281 : : {
282 : 2167792 : return builder->state;
283 : : }
284 : :
285 : : /*
286 : : * Return the LSN at which the two-phase decoding was first enabled.
287 : : */
288 : : XLogRecPtr
1705 akapila@postgresql.o 289 : 34 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
290 : : {
291 : 34 : return builder->two_phase_at;
292 : : }
293 : :
294 : : /*
295 : : * Set the LSN at which two-phase decoding is enabled.
296 : : */
297 : : void
298 : 8 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
299 : : {
300 : 8 : builder->two_phase_at = ptr;
1840 301 : 8 : }
302 : :
303 : : /*
304 : : * Should the contents of transaction ending at 'ptr' be decoded?
305 : : */
306 : : bool
4395 rhaas@postgresql.org 307 : 393444 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
308 : : {
4301 andres@anarazel.de 309 : 393444 : return ptr < builder->start_decoding_at;
310 : : }
311 : :
312 : : /*
313 : : * Increase refcount of a snapshot.
314 : : *
315 : : * This is used when handing out a snapshot to some external resource or when
316 : : * adding a Snapshot as builder->snapshot.
317 : : */
318 : : static void
4395 rhaas@postgresql.org 319 : 6895 : SnapBuildSnapIncRefcount(Snapshot snap)
320 : : {
321 : 6895 : snap->active_count++;
322 : 6895 : }
323 : :
324 : : /*
325 : : * Decrease refcount of a snapshot and free if the refcount reaches zero.
326 : : *
327 : : * Externally visible, so that external resources that have been handed an
328 : : * IncRef'ed Snapshot can adjust its refcount easily.
329 : : */
330 : : void
331 : 6630 : SnapBuildSnapDecRefcount(Snapshot snap)
332 : : {
333 : : /* make sure we don't get passed an external snapshot */
2610 andres@anarazel.de 334 [ - + ]: 6630 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
335 : :
336 : : /* make sure nobody modified our snapshot */
4395 rhaas@postgresql.org 337 [ - + ]: 6630 : Assert(snap->curcid == FirstCommandId);
338 [ - + ]: 6630 : Assert(!snap->suboverflowed);
339 [ - + ]: 6630 : Assert(!snap->takenDuringRecovery);
340 : :
3986 heikki.linnakangas@i 341 [ - + ]: 6630 : Assert(snap->regd_count == 0);
342 : :
343 [ - + ]: 6630 : Assert(snap->active_count > 0);
344 : :
345 : : /* slightly more likely, so it's checked even without casserts */
4395 rhaas@postgresql.org 346 [ - + ]: 6630 : if (snap->copied)
4395 rhaas@postgresql.org 347 [ # # ]:UBC 0 : elog(ERROR, "cannot free a copied snapshot");
348 : :
4395 rhaas@postgresql.org 349 :CBC 6630 : snap->active_count--;
3986 heikki.linnakangas@i 350 [ + + ]: 6630 : if (snap->active_count == 0)
4395 rhaas@postgresql.org 351 : 1594 : SnapBuildFreeSnapshot(snap);
352 : 6630 : }
353 : :
354 : : /*
355 : : * Build a new snapshot, based on currently committed catalog-modifying
356 : : * transactions.
357 : : *
358 : : * In-progress transactions with catalog access are *not* allowed to modify
359 : : * these snapshots; they have to copy them and fill in appropriate ->curcid
360 : : * and ->subxip/subxcnt values.
361 : : */
362 : : static Snapshot
3196 andres@anarazel.de 363 : 2050 : SnapBuildBuildSnapshot(SnapBuild *builder)
364 : : {
365 : : Snapshot snapshot;
366 : : Size ssize;
367 : :
4395 rhaas@postgresql.org 368 [ - + ]: 2050 : Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
369 : :
370 : 2050 : ssize = sizeof(SnapshotData)
371 : 2050 : + sizeof(TransactionId) * builder->committed.xcnt
372 : 2050 : + sizeof(TransactionId) * 1 /* toplevel xid */ ;
373 : :
374 : 2050 : snapshot = MemoryContextAllocZero(builder->context, ssize);
375 : :
2610 andres@anarazel.de 376 : 2050 : snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
377 : :
378 : : /*
379 : : * We misuse the original meaning of SnapshotData's xip and subxip fields
380 : : * to make the more fitting for our needs.
381 : : *
382 : : * In the 'xip' array we store transactions that have to be treated as
383 : : * committed. Since we will only ever look at tuples from transactions
384 : : * that have modified the catalog it's more efficient to store those few
385 : : * that exist between xmin and xmax (frequently there are none).
386 : : *
387 : : * Snapshots that are used in transactions that have modified the catalog
388 : : * also use the 'subxip' array to store their toplevel xid and all the
389 : : * subtransaction xids so we can recognize when we need to treat rows as
390 : : * visible that are not in xip but still need to be visible. Subxip only
391 : : * gets filled when the transaction is copied into the context of a
392 : : * catalog modifying transaction since we otherwise share a snapshot
393 : : * between transactions. As long as a txn hasn't modified the catalog it
394 : : * doesn't need to treat any uncommitted rows as visible, so there is no
395 : : * need for those xids.
396 : : *
397 : : * Both arrays are qsort'ed so that we can use bsearch() on them.
398 : : */
4395 rhaas@postgresql.org 399 [ - + ]: 2050 : Assert(TransactionIdIsNormal(builder->xmin));
400 [ - + ]: 2050 : Assert(TransactionIdIsNormal(builder->xmax));
401 : :
402 : 2050 : snapshot->xmin = builder->xmin;
403 : 2050 : snapshot->xmax = builder->xmax;
404 : :
405 : : /* store all transactions to be treated as committed by this snapshot */
406 : 2050 : snapshot->xip =
407 : 2050 : (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
408 : 2050 : snapshot->xcnt = builder->committed.xcnt;
409 : 2050 : memcpy(snapshot->xip,
410 : 2050 : builder->committed.xip,
411 : 2050 : builder->committed.xcnt * sizeof(TransactionId));
412 : :
413 : : /* sort so we can bsearch() */
414 : 2050 : qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
415 : :
416 : : /*
417 : : * Initially, subxip is empty, i.e. it's a snapshot to be used by
418 : : * transactions that don't modify the catalog. Will be filled by
419 : : * ReorderBufferCopySnap() if necessary.
420 : : */
421 : 2050 : snapshot->subxcnt = 0;
422 : 2050 : snapshot->subxip = NULL;
423 : :
424 : 2050 : snapshot->suboverflowed = false;
425 : 2050 : snapshot->takenDuringRecovery = false;
426 : 2050 : snapshot->copied = false;
427 : 2050 : snapshot->curcid = FirstCommandId;
428 : 2050 : snapshot->active_count = 0;
3986 heikki.linnakangas@i 429 : 2050 : snapshot->regd_count = 0;
2036 andres@anarazel.de 430 : 2050 : snapshot->snapXactCompletionCount = 0;
431 : :
4395 rhaas@postgresql.org 432 : 2050 : return snapshot;
433 : : }
434 : :
435 : : /*
436 : : * Build the initial slot snapshot and convert it to a normal snapshot that
437 : : * is understood by HeapTupleSatisfiesMVCC.
438 : : *
439 : : * The snapshot will be usable directly in current transaction or exported
440 : : * for loading in different transaction.
441 : : */
442 : : Snapshot
3276 tgl@sss.pgh.pa.us 443 : 208 : SnapBuildInitialSnapshot(SnapBuild *builder)
444 : : {
445 : : Snapshot snap;
446 : : TransactionId xid;
447 : : TransactionId safeXid;
448 : : TransactionId *newxip;
4395 rhaas@postgresql.org 449 : 208 : int newxcnt = 0;
450 : :
3276 tgl@sss.pgh.pa.us 451 [ - + ]: 208 : Assert(XactIsoLevel == XACT_REPEATABLE_READ);
1210 akapila@postgresql.o 452 [ - + ]: 208 : Assert(builder->building_full_snapshot);
453 : :
454 : : /* don't allow older snapshots */
1031 tgl@sss.pgh.pa.us 455 : 208 : InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */
1210 akapila@postgresql.o 456 [ - + ]: 208 : if (HaveRegisteredOrActiveSnapshot())
1210 akapila@postgresql.o 457 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
1210 akapila@postgresql.o 458 [ - + ]:CBC 208 : Assert(!HistoricSnapshotActive());
459 : :
4395 rhaas@postgresql.org 460 [ - + ]: 208 : if (builder->state != SNAPBUILD_CONSISTENT)
3279 peter_e@gmx.net 461 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
462 : :
4395 rhaas@postgresql.org 463 [ - + ]:CBC 208 : if (!builder->committed.includes_all_transactions)
3279 peter_e@gmx.net 464 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
465 : :
466 : : /* so we don't overwrite the existing value */
2040 andres@anarazel.de 467 [ - + ]:CBC 208 : if (TransactionIdIsValid(MyProc->xmin))
2040 andres@anarazel.de 468 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
469 : :
3196 andres@anarazel.de 470 :CBC 208 : snap = SnapBuildBuildSnapshot(builder);
471 : :
472 : : /*
473 : : * We know that snap->xmin is alive, enforced by the logical xmin
474 : : * mechanism. Due to that we can do this without locks, we're only
475 : : * changing our own value.
476 : : *
477 : : * Building an initial snapshot is expensive and an unenforced xmin
478 : : * horizon would have bad consequences, therefore always double-check that
479 : : * the horizon is enforced.
480 : : */
1210 akapila@postgresql.o 481 : 208 : LWLockAcquire(ProcArrayLock, LW_SHARED);
482 : 208 : safeXid = GetOldestSafeDecodingTransactionId(false);
483 : 208 : LWLockRelease(ProcArrayLock);
484 : :
485 [ - + ]: 208 : if (TransactionIdFollows(safeXid, snap->xmin))
1210 akapila@postgresql.o 486 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
487 : : safeXid, snap->xmin);
488 : :
2040 andres@anarazel.de 489 :CBC 208 : MyProc->xmin = snap->xmin;
490 : :
491 : : /* allocate in transaction context */
95 michael@paquier.xyz 492 :GNC 208 : newxip = palloc_array(TransactionId, GetMaxSnapshotXidCount());
493 : :
494 : : /*
495 : : * snapbuild.c builds transactions in an "inverted" manner, which means it
496 : : * stores committed transactions in ->xip, not ones in progress. Build a
497 : : * classical snapshot by marking all non-committed transactions as
498 : : * in-progress. This can be expensive.
499 : : */
4395 rhaas@postgresql.org 500 [ + - - + :CBC 208 : for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
- + ]
501 : : {
502 : : void *test;
503 : :
504 : : /*
505 : : * Check whether transaction committed using the decoding snapshot
506 : : * meaning of ->xip.
507 : : */
4395 rhaas@postgresql.org 508 :UBC 0 : test = bsearch(&xid, snap->xip, snap->xcnt,
509 : : sizeof(TransactionId), xidComparator);
510 : :
511 [ # # ]: 0 : if (test == NULL)
512 : : {
513 [ # # ]: 0 : if (newxcnt >= GetMaxSnapshotXidCount())
3279 peter_e@gmx.net 514 [ # # ]: 0 : ereport(ERROR,
515 : : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
516 : : errmsg("initial slot snapshot too large")));
517 : :
4395 rhaas@postgresql.org 518 : 0 : newxip[newxcnt++] = xid;
519 : : }
520 : :
521 [ # # ]: 0 : TransactionIdAdvance(xid);
522 : : }
523 : :
524 : : /* adjust remaining snapshot fields as needed */
2580 michael@paquier.xyz 525 :CBC 208 : snap->snapshot_type = SNAPSHOT_MVCC;
4395 rhaas@postgresql.org 526 : 208 : snap->xcnt = newxcnt;
527 : 208 : snap->xip = newxip;
528 : :
3279 peter_e@gmx.net 529 : 208 : return snap;
530 : : }
531 : :
532 : : /*
533 : : * Export a snapshot so it can be set in another session with SET TRANSACTION
534 : : * SNAPSHOT.
535 : : *
536 : : * For that we need to start a transaction in the current backend as the
537 : : * importing side checks whether the source transaction is still open to make
538 : : * sure the xmin horizon hasn't advanced since then.
539 : : */
540 : : const char *
541 : 1 : SnapBuildExportSnapshot(SnapBuild *builder)
542 : : {
543 : : Snapshot snap;
544 : : char *snapname;
545 : :
546 [ - + ]: 1 : if (IsTransactionOrTransactionBlock())
3279 peter_e@gmx.net 547 [ # # ]:UBC 0 : elog(ERROR, "cannot export a snapshot from within a transaction");
548 : :
3279 peter_e@gmx.net 549 [ - + ]:CBC 1 : if (SavedResourceOwnerDuringExport)
3279 peter_e@gmx.net 550 [ # # ]:UBC 0 : elog(ERROR, "can only export one snapshot at a time");
551 : :
3279 peter_e@gmx.net 552 :CBC 1 : SavedResourceOwnerDuringExport = CurrentResourceOwner;
553 : 1 : ExportInProgress = true;
554 : :
555 : 1 : StartTransactionCommand();
556 : :
557 : : /* There doesn't seem to a nice API to set these */
558 : 1 : XactIsoLevel = XACT_REPEATABLE_READ;
559 : 1 : XactReadOnly = true;
560 : :
3276 tgl@sss.pgh.pa.us 561 : 1 : snap = SnapBuildInitialSnapshot(builder);
562 : :
563 : : /*
564 : : * now that we've built a plain snapshot, make it active and use the
565 : : * normal mechanisms for exporting it
566 : : */
4395 rhaas@postgresql.org 567 : 1 : snapname = ExportSnapshot(snap);
568 : :
569 [ + - ]: 1 : ereport(LOG,
570 : : (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
571 : : "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
572 : : snap->xcnt,
573 : : snapname, snap->xcnt)));
574 : 1 : return snapname;
575 : : }
576 : :
577 : : /*
578 : : * Ensure there is a snapshot and if not build one for current transaction.
579 : : */
580 : : Snapshot
1266 akapila@postgresql.o 581 : 9 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
582 : : {
3630 simon@2ndQuadrant.co 583 [ - + ]: 9 : Assert(builder->state == SNAPBUILD_CONSISTENT);
584 : :
585 : : /* only build a new snapshot if we don't have a prebuilt one */
586 [ + + ]: 9 : if (builder->snapshot == NULL)
587 : : {
3196 andres@anarazel.de 588 : 2 : builder->snapshot = SnapBuildBuildSnapshot(builder);
589 : : /* increase refcount for the snapshot builder */
3630 simon@2ndQuadrant.co 590 : 2 : SnapBuildSnapIncRefcount(builder->snapshot);
591 : : }
592 : :
593 : 9 : return builder->snapshot;
594 : : }
595 : :
596 : : /*
597 : : * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
598 : : * any. Aborts the previously started transaction and resets the resource
599 : : * owner back to its original value.
600 : : */
601 : : void
3865 andres@anarazel.de 602 : 5656 : SnapBuildClearExportedSnapshot(void)
603 : : {
604 : : ResourceOwner tmpResOwner;
605 : :
606 : : /* nothing exported, that is the usual case */
4395 rhaas@postgresql.org 607 [ + + ]: 5656 : if (!ExportInProgress)
608 : 5655 : return;
609 : :
610 [ - + ]: 1 : if (!IsTransactionState())
4395 rhaas@postgresql.org 611 [ # # ]:UBC 0 : elog(ERROR, "clearing exported snapshot in wrong transaction state");
612 : :
613 : : /*
614 : : * AbortCurrentTransaction() takes care of resetting the snapshot state,
615 : : * so remember SavedResourceOwnerDuringExport.
616 : : */
1609 michael@paquier.xyz 617 :CBC 1 : tmpResOwner = SavedResourceOwnerDuringExport;
618 : :
619 : : /* make sure nothing could have ever happened */
4395 rhaas@postgresql.org 620 : 1 : AbortCurrentTransaction();
621 : :
1609 michael@paquier.xyz 622 : 1 : CurrentResourceOwner = tmpResOwner;
623 : : }
624 : :
625 : : /*
626 : : * Clear snapshot export state during transaction abort.
627 : : */
628 : : void
629 : 26758 : SnapBuildResetExportedSnapshotState(void)
630 : : {
4395 rhaas@postgresql.org 631 : 26758 : SavedResourceOwnerDuringExport = NULL;
632 : 26758 : ExportInProgress = false;
633 : 26758 : }
634 : :
635 : : /*
636 : : * Handle the effects of a single heap change, appropriate to the current state
637 : : * of the snapshot builder and returns whether changes made at (xid, lsn) can
638 : : * be decoded.
639 : : */
640 : : bool
641 : 1576040 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
642 : : {
643 : : /*
644 : : * We can't handle data in transactions if we haven't built a snapshot
645 : : * yet, so don't store them.
646 : : */
647 [ - + ]: 1576040 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
4395 rhaas@postgresql.org 648 :UBC 0 : return false;
649 : :
650 : : /*
651 : : * No point in keeping track of changes in transactions that we don't have
652 : : * enough information about to decode. This means that they started before
653 : : * we got into the SNAPBUILD_FULL_SNAPSHOT state.
654 : : */
4395 rhaas@postgresql.org 655 [ + + - + ]:CBC 1576043 : if (builder->state < SNAPBUILD_CONSISTENT &&
1854 andres@anarazel.de 656 : 3 : TransactionIdPrecedes(xid, builder->next_phase_at))
4395 rhaas@postgresql.org 657 :UBC 0 : return false;
658 : :
659 : : /*
660 : : * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
661 : : * be needed to decode the change we're currently processing.
662 : : */
3662 andres@anarazel.de 663 [ + + ]:CBC 1576040 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
664 : : {
665 : : /* only build a new snapshot if we don't have a prebuilt one */
4395 rhaas@postgresql.org 666 [ + + ]: 3627 : if (builder->snapshot == NULL)
667 : : {
3196 andres@anarazel.de 668 : 418 : builder->snapshot = SnapBuildBuildSnapshot(builder);
669 : : /* increase refcount for the snapshot builder */
4395 rhaas@postgresql.org 670 : 418 : SnapBuildSnapIncRefcount(builder->snapshot);
671 : : }
672 : :
673 : : /*
674 : : * Increase refcount for the transaction we're handing the snapshot
675 : : * out to.
676 : : */
677 : 3627 : SnapBuildSnapIncRefcount(builder->snapshot);
678 : 3627 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
679 : : builder->snapshot);
680 : : }
681 : :
682 : 1576040 : return true;
683 : : }
684 : :
685 : : /*
686 : : * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
687 : : * This implies that a transaction has done some form of write to system
688 : : * catalogs.
689 : : */
690 : : void
691 : 25319 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
692 : : XLogRecPtr lsn, xl_heap_new_cid *xlrec)
693 : : {
694 : : CommandId cid;
695 : :
696 : : /*
697 : : * we only log new_cid's if a catalog tuple was modified, so mark the
698 : : * transaction as containing catalog modifications
699 : : */
4331 bruce@momjian.us 700 : 25319 : ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
701 : :
4395 rhaas@postgresql.org 702 : 25319 : ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
703 : : xlrec->target_locator, xlrec->target_tid,
704 : : xlrec->cmin, xlrec->cmax,
705 : : xlrec->combocid);
706 : :
707 : : /* figure out new command id */
708 [ + + ]: 25319 : if (xlrec->cmin != InvalidCommandId &&
709 [ + + ]: 21116 : xlrec->cmax != InvalidCommandId)
710 : 3214 : cid = Max(xlrec->cmin, xlrec->cmax);
711 [ + + ]: 22105 : else if (xlrec->cmax != InvalidCommandId)
712 : 4203 : cid = xlrec->cmax;
713 [ + - ]: 17902 : else if (xlrec->cmin != InvalidCommandId)
714 : 17902 : cid = xlrec->cmin;
715 : : else
716 : : {
4331 bruce@momjian.us 717 :UBC 0 : cid = InvalidCommandId; /* silence compiler */
4395 rhaas@postgresql.org 718 [ # # ]: 0 : elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
719 : : }
720 : :
4395 rhaas@postgresql.org 721 :CBC 25319 : ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
722 : 25319 : }
723 : :
724 : : /*
725 : : * Add a new Snapshot and invalidation messages to all transactions we're
726 : : * decoding that currently are in-progress so they can see new catalog contents
727 : : * made by the transaction that just committed. This is necessary because those
728 : : * in-progress transactions will use the new catalog's contents from here on
729 : : * (at the very least everything they do needs to be compatible with newer
730 : : * catalog contents).
731 : : */
732 : : static void
339 akapila@postgresql.o 733 : 1413 : SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
734 : : {
735 : : dlist_iter txn_i;
736 : : ReorderBufferTXN *txn;
737 : :
738 : : /*
739 : : * Iterate through all toplevel transactions. This can include
740 : : * subtransactions which we just don't yet know to be that, but that's
741 : : * fine, they will just get an unnecessary snapshot and invalidations
742 : : * queued.
743 : : */
4395 rhaas@postgresql.org 744 [ + - + + ]: 2861 : dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
745 : : {
746 : 1448 : txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
747 : :
748 [ - + ]: 1448 : Assert(TransactionIdIsValid(txn->xid));
749 : :
750 : : /*
751 : : * If we don't have a base snapshot yet, there are no changes in this
752 : : * transaction which in turn implies we don't yet need a snapshot at
753 : : * all. We'll add a snapshot when the first change gets queued.
754 : : *
755 : : * Similarly, we don't need to add invalidations to a transaction
756 : : * whose base snapshot is not yet set. Once a base snapshot is built,
757 : : * it will include the xids of committed transactions that have
758 : : * modified the catalog, thus reflecting the new catalog contents. The
759 : : * existing catalog cache will have already been invalidated after
760 : : * processing the invalidations in the transaction that modified
761 : : * catalogs, ensuring that a fresh cache is constructed during
762 : : * decoding.
763 : : *
764 : : * NB: This works correctly even for subtransactions because
765 : : * ReorderBufferAssignChild() takes care to transfer the base snapshot
766 : : * to the top-level transaction, and while iterating the changequeue
767 : : * we'll get the change from the subtxn.
768 : : */
769 [ + + ]: 1448 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
770 : 2 : continue;
771 : :
772 : : /*
773 : : * We don't need to add snapshot or invalidations to prepared
774 : : * transactions as they should not see the new catalog contents.
775 : : */
396 msawada@postgresql.o 776 [ + + ]: 1446 : if (rbtxn_is_prepared(txn))
1896 akapila@postgresql.o 777 : 28 : continue;
778 : :
251 alvherre@kurilemu.de 779 [ + + ]:GNC 1418 : elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
780 : : txn->xid, LSN_FORMAT_ARGS(lsn));
781 : :
782 : : /*
783 : : * increase the snapshot's refcount for the transaction we are handing
784 : : * it out to
785 : : */
4395 rhaas@postgresql.org 786 :CBC 1418 : SnapBuildSnapIncRefcount(builder->snapshot);
787 : 1418 : ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
788 : : builder->snapshot);
789 : :
790 : : /*
791 : : * Add invalidation messages to the reorder buffer of in-progress
792 : : * transactions except the current committed transaction, for which we
793 : : * will execute invalidations at the end.
794 : : *
795 : : * It is required, otherwise, we will end up using the stale catcache
796 : : * contents built by the current transaction even after its decoding,
797 : : * which should have been invalidated due to concurrent catalog
798 : : * changing transaction.
799 : : *
800 : : * Distribute only the invalidation messages generated by the current
801 : : * committed transaction. Invalidation messages received from other
802 : : * transactions would have already been propagated to the relevant
803 : : * in-progress transactions. This transaction would have processed
804 : : * those invalidations, ensuring that subsequent transactions observe
805 : : * a consistent cache state.
806 : : */
339 akapila@postgresql.o 807 [ + + ]: 1418 : if (txn->xid != xid)
808 : : {
809 : : uint32 ninvalidations;
810 : 33 : SharedInvalidationMessage *msgs = NULL;
811 : :
812 : 33 : ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
813 : : xid, &msgs);
814 : :
815 [ + + ]: 33 : if (ninvalidations > 0)
816 : : {
817 [ - + ]: 29 : Assert(msgs != NULL);
818 : :
272 msawada@postgresql.o 819 : 29 : ReorderBufferAddDistributedInvalidations(builder->reorder,
820 : : txn->xid, lsn,
821 : : ninvalidations, msgs);
822 : : }
823 : : }
824 : : }
4395 rhaas@postgresql.org 825 : 1413 : }
826 : :
827 : : /*
828 : : * Keep track of a new catalog changing transaction that has committed.
829 : : */
830 : : static void
831 : 1422 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
832 : : {
833 [ - + ]: 1422 : Assert(TransactionIdIsValid(xid));
834 : :
835 [ - + ]: 1422 : if (builder->committed.xcnt == builder->committed.xcnt_space)
836 : : {
4395 rhaas@postgresql.org 837 :UBC 0 : builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
838 : :
839 [ # # ]: 0 : elog(DEBUG1, "increasing space for committed transactions to %u",
840 : : (uint32) builder->committed.xcnt_space);
841 : :
9 msawada@postgresql.o 842 :UNC 0 : builder->committed.xip = repalloc_array(builder->committed.xip,
843 : : TransactionId,
844 : : builder->committed.xcnt_space);
845 : : }
846 : :
847 : : /*
848 : : * TODO: It might make sense to keep the array sorted here instead of
849 : : * doing it every time we build a new snapshot. On the other hand this
850 : : * gets called repeatedly when a transaction with subtransactions commits.
851 : : */
4395 rhaas@postgresql.org 852 :CBC 1422 : builder->committed.xip[builder->committed.xcnt++] = xid;
853 : 1422 : }
854 : :
855 : : /*
856 : : * Remove knowledge about transactions we treat as committed or containing catalog
857 : : * changes that are smaller than ->xmin. Those won't ever get checked via
858 : : * the ->committed or ->catchange array, respectively. The committed xids will
859 : : * get checked via the clog machinery.
860 : : *
861 : : * We can ideally remove the transaction from catchange array once it is
862 : : * finished (committed/aborted) but that could be costly as we need to maintain
863 : : * the xids order in the array.
864 : : */
865 : : static void
1312 akapila@postgresql.o 866 : 489 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
867 : : {
868 : : int off;
869 : : TransactionId *workspace;
4395 rhaas@postgresql.org 870 : 489 : int surviving_xids = 0;
871 : :
872 : : /* not ready yet */
873 [ - + ]: 489 : if (!TransactionIdIsNormal(builder->xmin))
4395 rhaas@postgresql.org 874 :UBC 0 : return;
875 : :
876 : : /* TODO: Neater algorithm than just copying and iterating? */
877 : : workspace =
4395 rhaas@postgresql.org 878 :CBC 489 : MemoryContextAlloc(builder->context,
879 : 489 : builder->committed.xcnt * sizeof(TransactionId));
880 : :
881 : : /* copy xids that still are interesting to workspace */
882 [ + + ]: 746 : for (off = 0; off < builder->committed.xcnt; off++)
883 : : {
884 [ + - - + : 257 : if (NormalTransactionIdPrecedes(builder->committed.xip[off],
+ + ]
885 : : builder->xmin))
886 : : ; /* remove */
887 : : else
888 : 1 : workspace[surviving_xids++] = builder->committed.xip[off];
889 : : }
890 : :
891 : : /* copy workspace back to persistent state */
892 : 489 : memcpy(builder->committed.xip, workspace,
893 : : surviving_xids * sizeof(TransactionId));
894 : :
895 [ - + ]: 489 : elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
896 : : (uint32) builder->committed.xcnt, (uint32) surviving_xids,
897 : : builder->xmin, builder->xmax);
898 : 489 : builder->committed.xcnt = surviving_xids;
899 : :
900 : 489 : pfree(workspace);
901 : :
902 : : /*
903 : : * Purge xids in ->catchange as well. The purged array must also be sorted
904 : : * in xidComparator order.
905 : : */
1294 akapila@postgresql.o 906 [ + + ]: 489 : if (builder->catchange.xcnt > 0)
907 : : {
908 : : /*
909 : : * Since catchange.xip is sorted, we find the lower bound of xids that
910 : : * are still interesting.
911 : : */
912 [ + + ]: 9 : for (off = 0; off < builder->catchange.xcnt; off++)
913 : : {
914 [ + + ]: 6 : if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
915 : : builder->xmin))
916 : 1 : break;
917 : : }
918 : :
919 : 4 : surviving_xids = builder->catchange.xcnt - off;
920 : :
921 [ + + ]: 4 : if (surviving_xids > 0)
922 : : {
923 : 1 : memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
924 : : surviving_xids * sizeof(TransactionId));
925 : : }
926 : : else
927 : : {
928 : 3 : pfree(builder->catchange.xip);
929 : 3 : builder->catchange.xip = NULL;
930 : : }
931 : :
932 [ - + ]: 4 : elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
933 : : (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
934 : : builder->xmin, builder->xmax);
935 : 4 : builder->catchange.xcnt = surviving_xids;
936 : : }
937 : : }
938 : :
939 : : /*
940 : : * Handle everything that needs to be done when a transaction commits
941 : : */
942 : : void
4395 rhaas@postgresql.org 943 : 3505 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
944 : : int nsubxacts, TransactionId *subxacts, uint32 xinfo)
945 : : {
946 : : int nxact;
947 : :
3228 andres@anarazel.de 948 : 3505 : bool needs_snapshot = false;
949 : 3505 : bool needs_timetravel = false;
4395 rhaas@postgresql.org 950 : 3505 : bool sub_needs_timetravel = false;
951 : :
952 : 3505 : TransactionId xmax = xid;
953 : :
954 : : /*
955 : : * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
956 : : * will they be part of a snapshot. So we don't need to record anything.
957 : : */
3228 andres@anarazel.de 958 [ + - ]: 3505 : if (builder->state == SNAPBUILD_START ||
959 [ - + - - ]: 3505 : (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1854 andres@anarazel.de 960 :UBC 0 : TransactionIdPrecedes(xid, builder->next_phase_at)))
961 : : {
962 : : /* ensure that only commits after this are getting replayed */
3228 963 [ # # ]: 0 : if (builder->start_decoding_at <= lsn)
964 : 0 : builder->start_decoding_at = lsn + 1;
965 : 0 : return;
966 : : }
967 : :
4395 rhaas@postgresql.org 968 [ + + ]:CBC 3505 : if (builder->state < SNAPBUILD_CONSISTENT)
969 : : {
970 : : /* ensure that only commits after this are getting replayed */
4301 andres@anarazel.de 971 [ + + ]: 5 : if (builder->start_decoding_at <= lsn)
972 : 2 : builder->start_decoding_at = lsn + 1;
973 : :
974 : : /*
975 : : * If building an exportable snapshot, force xid to be tracked, even
976 : : * if the transaction didn't modify the catalog.
977 : : */
3228 978 [ - + ]: 5 : if (builder->building_full_snapshot)
979 : : {
3228 andres@anarazel.de 980 :UBC 0 : needs_timetravel = true;
981 : : }
982 : : }
983 : :
4395 rhaas@postgresql.org 984 [ + + ]:CBC 4754 : for (nxact = 0; nxact < nsubxacts; nxact++)
985 : : {
986 : 1249 : TransactionId subxid = subxacts[nxact];
987 : :
988 : : /*
989 : : * Add subtransaction to base snapshot if catalog modifying, we don't
990 : : * distinguish to toplevel transactions there.
991 : : */
1312 akapila@postgresql.o 992 [ + + ]: 1249 : if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
993 : : {
3228 andres@anarazel.de 994 : 9 : sub_needs_timetravel = true;
995 : 9 : needs_snapshot = true;
996 : :
997 [ - + ]: 9 : elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
998 : : xid, subxid);
999 : :
4395 rhaas@postgresql.org 1000 : 9 : SnapBuildAddCommittedTxn(builder, subxid);
1001 : :
1002 [ + - - + : 9 : if (NormalTransactionIdFollows(subxid, xmax))
+ - ]
1003 : 9 : xmax = subxid;
1004 : : }
1005 : :
1006 : : /*
1007 : : * If we're forcing timetravel we also need visibility information
1008 : : * about subtransaction, so keep track of subtransaction's state, even
1009 : : * if not catalog modifying. Don't need to distribute a snapshot in
1010 : : * that case.
1011 : : */
3228 andres@anarazel.de 1012 [ - + ]: 1240 : else if (needs_timetravel)
1013 : : {
4395 rhaas@postgresql.org 1014 :UBC 0 : SnapBuildAddCommittedTxn(builder, subxid);
1015 [ # # # # : 0 : if (NormalTransactionIdFollows(subxid, xmax))
# # ]
1016 : 0 : xmax = subxid;
1017 : : }
1018 : : }
1019 : :
1020 : : /* if top-level modified catalog, it'll need a snapshot */
1312 akapila@postgresql.o 1021 [ + + ]:CBC 3505 : if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
1022 : : {
3228 andres@anarazel.de 1023 [ + + ]: 1412 : elog(DEBUG2, "found top level transaction %u, with catalog changes",
1024 : : xid);
1025 : 1412 : needs_snapshot = true;
1026 : 1412 : needs_timetravel = true;
4395 rhaas@postgresql.org 1027 : 1412 : SnapBuildAddCommittedTxn(builder, xid);
1028 : : }
3228 andres@anarazel.de 1029 [ + + ]: 2093 : else if (sub_needs_timetravel)
1030 : : {
1031 : : /* track toplevel txn as well, subxact alone isn't meaningful */
1242 akapila@postgresql.o 1032 [ - + ]: 1 : elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
1033 : : xid);
1034 : 1 : needs_timetravel = true;
4395 rhaas@postgresql.org 1035 : 1 : SnapBuildAddCommittedTxn(builder, xid);
1036 : : }
3228 andres@anarazel.de 1037 [ - + ]: 2092 : else if (needs_timetravel)
1038 : : {
3228 andres@anarazel.de 1039 [ # # ]:UBC 0 : elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1040 : :
4395 rhaas@postgresql.org 1041 : 0 : SnapBuildAddCommittedTxn(builder, xid);
1042 : : }
1043 : :
3228 andres@anarazel.de 1044 [ + + ]:CBC 3505 : if (!needs_timetravel)
1045 : : {
1046 : : /* record that we cannot export a general snapshot anymore */
1047 : 2092 : builder->committed.includes_all_transactions = false;
1048 : : }
1049 : :
1050 [ + + - + ]: 3505 : Assert(!needs_snapshot || needs_timetravel);
1051 : :
1052 : : /*
1053 : : * Adjust xmax of the snapshot builder, we only do that for committed,
1054 : : * catalog modifying, transactions, everything else isn't interesting for
1055 : : * us since we'll never look at the respective rows.
1056 : : */
1057 [ + + ]: 3505 : if (needs_timetravel &&
1058 [ + - + - ]: 2826 : (!TransactionIdIsValid(builder->xmax) ||
1059 : 1413 : TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1060 : : {
1061 : 1413 : builder->xmax = xmax;
1062 [ - + ]: 1413 : TransactionIdAdvance(builder->xmax);
1063 : : }
1064 : :
1065 : : /* if there's any reason to build a historic snapshot, do so now */
1066 [ + + ]: 3505 : if (needs_snapshot)
1067 : : {
1068 : : /*
1069 : : * If we haven't built a complete snapshot yet there's no need to hand
1070 : : * it out, it wouldn't (and couldn't) be used anyway.
1071 : : */
4395 rhaas@postgresql.org 1072 [ - + ]: 1413 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
4395 rhaas@postgresql.org 1073 :UBC 0 : return;
1074 : :
1075 : : /*
1076 : : * Decrease the snapshot builder's refcount of the old snapshot, note
1077 : : * that it still will be used if it has been handed out to the
1078 : : * reorderbuffer earlier.
1079 : : */
4395 rhaas@postgresql.org 1080 [ + - ]:CBC 1413 : if (builder->snapshot)
1081 : 1413 : SnapBuildSnapDecRefcount(builder->snapshot);
1082 : :
3196 andres@anarazel.de 1083 : 1413 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1084 : :
1085 : : /* we might need to execute invalidations, add snapshot */
4395 rhaas@postgresql.org 1086 [ + + ]: 1413 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1087 : : {
1088 : 8 : SnapBuildSnapIncRefcount(builder->snapshot);
1089 : 8 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1090 : : builder->snapshot);
1091 : : }
1092 : :
1093 : : /* refcount of the snapshot builder for the new snapshot */
1094 : 1413 : SnapBuildSnapIncRefcount(builder->snapshot);
1095 : :
1096 : : /*
1097 : : * Add a new catalog snapshot and invalidations messages to all
1098 : : * currently running transactions.
1099 : : */
339 akapila@postgresql.o 1100 : 1413 : SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
1101 : : }
1102 : : }
1103 : :
1104 : : /*
1105 : : * Check the reorder buffer and the snapshot to see if the given transaction has
1106 : : * modified catalogs.
1107 : : */
1108 : : static inline bool
1312 1109 : 4754 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
1110 : : uint32 xinfo)
1111 : : {
1112 [ + + ]: 4754 : if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1113 : 1417 : return true;
1114 : :
1115 : : /*
1116 : : * The transactions that have changed catalogs must have invalidation
1117 : : * info.
1118 : : */
1119 [ + + ]: 3337 : if (!(xinfo & XACT_XINFO_HAS_INVALS))
1120 : 3329 : return false;
1121 : :
1122 : : /* Check the catchange XID array */
1123 [ + + + - ]: 12 : return ((builder->catchange.xcnt > 0) &&
1124 : 4 : (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
1125 : : sizeof(TransactionId), xidComparator) != NULL));
1126 : : }
1127 : :
1128 : : /* -----------------------------------
1129 : : * Snapshot building functions dealing with xlog records
1130 : : * -----------------------------------
1131 : : */
1132 : :
1133 : : /*
1134 : : * Process a running xacts record, and use its information to first build a
1135 : : * historic snapshot and later to release resources that aren't needed
1136 : : * anymore.
1137 : : */
1138 : : void
4395 rhaas@postgresql.org 1139 : 1601 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1140 : : {
1141 : : ReorderBufferTXN *txn;
1142 : : TransactionId xmin;
1143 : :
1144 : : /*
1145 : : * If we're not consistent yet, inspect the record to see whether it
1146 : : * allows to get closer to being consistent. If we are consistent, dump
1147 : : * our snapshot so others or we, after a restart, can use it.
1148 : : */
1149 [ + + ]: 1601 : if (builder->state < SNAPBUILD_CONSISTENT)
1150 : : {
1151 : : /* returns false if there's no point in performing cleanup just yet */
1152 [ + + ]: 1135 : if (!SnapBuildFindSnapshot(builder, lsn, running))
1153 : 1110 : return;
1154 : : }
1155 : : else
1156 : 466 : SnapBuildSerialize(builder, lsn);
1157 : :
1158 : : /*
1159 : : * Update range of interesting xids based on the running xacts
1160 : : * information. We don't increase ->xmax using it, because once we are in
1161 : : * a consistent state we can do that ourselves and much more efficiently
1162 : : * so, because we only need to do it for catalog transactions since we
1163 : : * only ever look at those.
1164 : : *
1165 : : * NB: We only increase xmax when a catalog modifying transaction commits
1166 : : * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1167 : : * xmin, which looks odd but is correct and actually more efficient, since
1168 : : * we hit fast paths in heapam_visibility.c.
1169 : : */
1170 : 489 : builder->xmin = running->oldestRunningXid;
1171 : :
1172 : : /* Remove transactions we don't need to keep track off anymore */
1312 akapila@postgresql.o 1173 : 489 : SnapBuildPurgeOlderTxn(builder);
1174 : :
1175 : : /*
1176 : : * Advance the xmin limit for the current replication slot, to allow
1177 : : * vacuum to clean up the tuples this slot has been protecting.
1178 : : *
1179 : : * The reorderbuffer might have an xmin among the currently running
1180 : : * snapshots; use it if so. If not, we need only consider the snapshots
1181 : : * we'll produce later, which can't be less than the oldest running xid in
1182 : : * the record we're reading now.
1183 : : */
2819 alvherre@alvh.no-ip. 1184 : 489 : xmin = ReorderBufferGetOldestXmin(builder->reorder);
1185 [ + + ]: 489 : if (xmin == InvalidTransactionId)
1186 : 437 : xmin = running->oldestRunningXid;
1187 [ - + ]: 489 : elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1188 : : builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1189 : 489 : LogicalIncreaseXminForSlot(lsn, xmin);
1190 : :
1191 : : /*
1192 : : * Also tell the slot where we can restart decoding from. We don't want to
1193 : : * do that after every commit because changing that implies an fsync of
1194 : : * the logical slot's state file, so we only do it every time we see a
1195 : : * running xacts record.
1196 : : *
1197 : : * Do so by looking for the oldest in progress transaction (determined by
1198 : : * the first LSN of any of its relevant records). Every transaction
1199 : : * remembers the last location we stored the snapshot to disk before its
1200 : : * beginning. That point is where we can restart from.
1201 : : */
1202 : :
1203 : : /*
1204 : : * Can't know about a serialized snapshot's location if we're not
1205 : : * consistent.
1206 : : */
4395 rhaas@postgresql.org 1207 [ + + ]: 489 : if (builder->state < SNAPBUILD_CONSISTENT)
1208 : 18 : return;
1209 : :
1210 : 471 : txn = ReorderBufferGetOldestTXN(builder->reorder);
1211 : :
1212 : : /*
1213 : : * oldest ongoing txn might have started when we didn't yet serialize
1214 : : * anything because we hadn't reached a consistent state yet.
1215 : : */
129 alvherre@kurilemu.de 1216 [ + + + + ]:GNC 471 : if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn))
4395 rhaas@postgresql.org 1217 :CBC 23 : LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1218 : :
1219 : : /*
1220 : : * No in-progress transaction, can reuse the last serialized snapshot if
1221 : : * we have one.
1222 : : */
1223 [ + + ]: 448 : else if (txn == NULL &&
129 alvherre@kurilemu.de 1224 [ + + ]:GNC 410 : XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) &&
1225 [ + - ]: 408 : XLogRecPtrIsValid(builder->last_serialized_snapshot))
4395 rhaas@postgresql.org 1226 :CBC 408 : LogicalIncreaseRestartDecodingForSlot(lsn,
1227 : : builder->last_serialized_snapshot);
1228 : : }
1229 : :
1230 : :
1231 : : /*
1232 : : * Build the start of a snapshot that's capable of decoding the catalog.
1233 : : *
1234 : : * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1235 : : * consistent.
1236 : : *
1237 : : * Returns true if there is a point in performing internal maintenance/cleanup
1238 : : * using the xl_running_xacts record.
1239 : : */
1240 : : static bool
1241 : 1135 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1242 : : {
1243 : : /* ---
1244 : : * Build catalog decoding snapshot incrementally using information about
1245 : : * the currently running transactions. There are several ways to do that:
1246 : : *
1247 : : * a) There were no running transactions when the xl_running_xacts record
1248 : : * was inserted, jump to CONSISTENT immediately. We might find such a
1249 : : * state while waiting on c)'s sub-states.
1250 : : *
1251 : : * b) This (in a previous run) or another decoding slot serialized a
1252 : : * snapshot to disk that we can use. Can't use this method while finding
1253 : : * the start point for decoding changes as the restart LSN would be an
1254 : : * arbitrary LSN but we need to find the start point to extract changes
1255 : : * where we won't see the data for partial transactions. Also, we cannot
1256 : : * use this method when a slot needs a full snapshot for export or direct
1257 : : * use, as that snapshot will only contain catalog modifying transactions.
1258 : : *
1259 : : * c) First incrementally build a snapshot for catalog tuples
1260 : : * (BUILDING_SNAPSHOT), that requires all, already in-progress,
1261 : : * transactions to finish. Every transaction starting after that
1262 : : * (FULL_SNAPSHOT state), has enough information to be decoded. But
1263 : : * for older running transactions no viable snapshot exists yet, so
1264 : : * CONSISTENT will only be reached once all of those have finished.
1265 : : * ---
1266 : : */
1267 : :
1268 : : /*
1269 : : * xl_running_xacts record is older than what we can use, we might not
1270 : : * have all necessary catalog rows anymore.
1271 : : */
1272 [ + + ]: 1135 : if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1273 [ + - - + : 492 : NormalTransactionIdPrecedes(running->oldestRunningXid,
- + ]
1274 : : builder->initial_xmin_horizon))
1275 : : {
4395 rhaas@postgresql.org 1276 [ # # ]:UBC 0 : ereport(DEBUG1,
1277 : : errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
1278 : : LSN_FORMAT_ARGS(lsn)),
1279 : : errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1280 : : builder->initial_xmin_horizon, running->oldestRunningXid));
1281 : :
1282 : :
3228 andres@anarazel.de 1283 : 0 : SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1284 : :
4395 rhaas@postgresql.org 1285 : 0 : return true;
1286 : : }
1287 : :
1288 : : /*
1289 : : * a) No transaction were running, we can jump to consistent.
1290 : : *
1291 : : * This is not affected by races around xl_running_xacts, because we can
1292 : : * miss transaction commits, but currently not transactions starting.
1293 : : *
1294 : : * NB: We might have already started to incrementally assemble a snapshot,
1295 : : * so we need to be careful to deal with that.
1296 : : */
3228 andres@anarazel.de 1297 [ + + ]:CBC 1135 : if (running->oldestRunningXid == running->nextXid)
1298 : : {
129 alvherre@kurilemu.de 1299 [ + + ]:GNC 1101 : if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
4301 andres@anarazel.de 1300 [ + + ]:CBC 617 : builder->start_decoding_at <= lsn)
1301 : : /* can decode everything after this */
1302 : 485 : builder->start_decoding_at = lsn + 1;
1303 : :
1304 : : /* As no transactions were running xmin/xmax can be trivially set. */
3189 tgl@sss.pgh.pa.us 1305 : 1101 : builder->xmin = running->nextXid; /* < are finished */
1306 : 1101 : builder->xmax = running->nextXid; /* >= are running */
1307 : :
1308 : : /* so we can safely use the faster comparisons */
4395 rhaas@postgresql.org 1309 [ - + ]: 1101 : Assert(TransactionIdIsNormal(builder->xmin));
1310 [ - + ]: 1101 : Assert(TransactionIdIsNormal(builder->xmax));
1311 : :
1312 : 1101 : builder->state = SNAPBUILD_CONSISTENT;
1854 andres@anarazel.de 1313 : 1101 : builder->next_phase_at = InvalidTransactionId;
1314 : :
4395 rhaas@postgresql.org 1315 [ + - ]: 1101 : ereport(LOG,
1316 : : errmsg("logical decoding found consistent point at %X/%08X",
1317 : : LSN_FORMAT_ARGS(lsn)),
1318 : : errdetail("There are no running transactions."));
1319 : :
1320 : 1101 : return false;
1321 : : }
1322 : :
1323 : : /*
1324 : : * b) valid on disk state and while neither building full snapshot nor
1325 : : * creating a slot.
1326 : : */
3244 andres@anarazel.de 1327 [ + + ]: 34 : else if (!builder->building_full_snapshot &&
612 msawada@postgresql.o 1328 [ + + + + ]: 52 : !builder->in_slot_creation &&
3244 andres@anarazel.de 1329 : 20 : SnapBuildRestore(builder, lsn))
1330 : : {
1331 : : /* there won't be any state to cleanup */
4395 rhaas@postgresql.org 1332 : 9 : return false;
1333 : : }
1334 : :
1335 : : /*
1336 : : * c) transition from START to BUILDING_SNAPSHOT.
1337 : : *
1338 : : * In START state, and a xl_running_xacts record with running xacts is
1339 : : * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
1340 : : * record xl_running_xacts->nextXid. Once all running xacts have finished
1341 : : * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
1342 : : * might look that we could use xl_running_xacts's ->xids information to
1343 : : * get there quicker, but that is problematic because transactions marked
1344 : : * as running, might already have inserted their commit record - it's
1345 : : * infeasible to change that with locking.
1346 : : */
3228 andres@anarazel.de 1347 [ + + ]: 25 : else if (builder->state == SNAPBUILD_START)
1348 : : {
1349 : 14 : builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
1854 1350 : 14 : builder->next_phase_at = running->nextXid;
1351 : :
1352 : : /*
1353 : : * Start with an xmin/xmax that's correct for future, when all the
1354 : : * currently running transactions have finished. We'll update both
1355 : : * while waiting for the pending transactions to finish.
1356 : : */
3189 tgl@sss.pgh.pa.us 1357 : 14 : builder->xmin = running->nextXid; /* < are finished */
1358 : 14 : builder->xmax = running->nextXid; /* >= are running */
1359 : :
1360 : : /* so we can safely use the faster comparisons */
4395 rhaas@postgresql.org 1361 [ - + ]: 14 : Assert(TransactionIdIsNormal(builder->xmin));
1362 [ - + ]: 14 : Assert(TransactionIdIsNormal(builder->xmax));
1363 : :
1364 [ + - ]: 14 : ereport(LOG,
1365 : : errmsg("logical decoding found initial starting point at %X/%08X",
1366 : : LSN_FORMAT_ARGS(lsn)),
1367 : : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1368 : : running->xcnt, running->nextXid));
1369 : :
3228 andres@anarazel.de 1370 : 14 : SnapBuildWaitSnapshot(running, running->nextXid);
1371 : : }
1372 : :
1373 : : /*
1374 : : * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1375 : : *
1376 : : * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1377 : : * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
1378 : : * means all transactions starting afterwards have enough information to
1379 : : * be decoded. Switch to FULL_SNAPSHOT.
1380 : : */
1381 [ + + + + ]: 17 : else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1854 1382 : 6 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1383 : : running->oldestRunningXid))
1384 : : {
3228 1385 : 5 : builder->state = SNAPBUILD_FULL_SNAPSHOT;
1854 1386 : 5 : builder->next_phase_at = running->nextXid;
1387 : :
3228 1388 [ + - ]: 5 : ereport(LOG,
1389 : : errmsg("logical decoding found initial consistent point at %X/%08X",
1390 : : LSN_FORMAT_ARGS(lsn)),
1391 : : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1392 : : running->xcnt, running->nextXid));
1393 : :
1394 : 5 : SnapBuildWaitSnapshot(running, running->nextXid);
1395 : : }
1396 : :
1397 : : /*
1398 : : * c) transition from FULL_SNAPSHOT to CONSISTENT.
1399 : : *
1400 : : * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
1401 : : * >= than nextXid from when we switched to FULL_SNAPSHOT. This means all
1402 : : * transactions that are currently in progress have a catalog snapshot,
1403 : : * and all their changes have been collected. Switch to CONSISTENT.
1404 : : */
1405 [ + + + - ]: 11 : else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1854 1406 : 5 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1407 : : running->oldestRunningXid))
1408 : : {
3228 1409 : 5 : builder->state = SNAPBUILD_CONSISTENT;
1854 1410 : 5 : builder->next_phase_at = InvalidTransactionId;
1411 : :
3228 1412 [ + - ]: 5 : ereport(LOG,
1413 : : errmsg("logical decoding found consistent point at %X/%08X",
1414 : : LSN_FORMAT_ARGS(lsn)),
1415 : : errdetail("There are no old transactions anymore."));
1416 : : }
1417 : :
1418 : : /*
1419 : : * We already started to track running xacts and need to wait for all
1420 : : * in-progress ones to finish. We fall through to the normal processing of
1421 : : * records so incremental cleanup can be performed.
1422 : : */
4395 rhaas@postgresql.org 1423 : 23 : return true;
1424 : : }
1425 : :
1426 : : /* ---
1427 : : * Iterate through xids in record, wait for all older than the cutoff to
1428 : : * finish. Then, if possible, log a new xl_running_xacts record.
1429 : : *
1430 : : * This isn't required for the correctness of decoding, but to:
1431 : : * a) allow isolationtester to notice that we're currently waiting for
1432 : : * something.
1433 : : * b) log a new xl_running_xacts record where it'd be helpful, without having
1434 : : * to wait for bgwriter or checkpointer.
1435 : : * ---
1436 : : */
1437 : : static void
3228 andres@anarazel.de 1438 : 19 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1439 : : {
1440 : : int off;
1441 : :
1442 [ + + ]: 36 : for (off = 0; off < running->xcnt; off++)
1443 : : {
1444 : 19 : TransactionId xid = running->xids[off];
1445 : :
1446 : : /*
1447 : : * Upper layers should prevent that we ever need to wait on ourselves.
1448 : : * Check anyway, since failing to do so would either result in an
1449 : : * endless wait or an Assert() failure.
1450 : : */
1451 [ - + ]: 19 : if (TransactionIdIsCurrentTransactionId(xid))
3228 andres@anarazel.de 1452 [ # # ]:UBC 0 : elog(ERROR, "waiting for ourselves");
1453 : :
3228 andres@anarazel.de 1454 [ - + ]:CBC 19 : if (TransactionIdFollows(xid, cutoff))
3228 andres@anarazel.de 1455 :UBC 0 : continue;
1456 : :
3228 andres@anarazel.de 1457 :CBC 19 : XactLockTableWait(xid, NULL, NULL, XLTW_None);
1458 : : }
1459 : :
1460 : : /*
1461 : : * All transactions we needed to finish finished - try to ensure there is
1462 : : * another xl_running_xacts record in a timely manner, without having to
1463 : : * wait for bgwriter or checkpointer to log one. During recovery we can't
1464 : : * enforce that, so we'll have to wait.
1465 : : */
1466 [ + - ]: 17 : if (!RecoveryInProgress())
1467 : : {
1468 : 17 : LogStandbySnapshot();
1469 : : }
1470 : 17 : }
1471 : :
1472 : : #define SnapBuildOnDiskConstantSize \
1473 : : offsetof(SnapBuildOnDisk, builder)
1474 : : #define SnapBuildOnDiskNotChecksummedSize \
1475 : : offsetof(SnapBuildOnDisk, version)
1476 : :
1477 : : #define SNAPBUILD_MAGIC 0x51A1E001
1478 : : #define SNAPBUILD_VERSION 6
1479 : :
1480 : : /*
1481 : : * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1482 : : *
1483 : : * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1484 : : * a record that's a potential location for a serialized snapshot.
1485 : : */
1486 : : void
4395 rhaas@postgresql.org 1487 : 73 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1488 : : {
1489 [ - + ]: 73 : if (builder->state < SNAPBUILD_CONSISTENT)
4395 rhaas@postgresql.org 1490 :UBC 0 : SnapBuildRestore(builder, lsn);
1491 : : else
4395 rhaas@postgresql.org 1492 :CBC 73 : SnapBuildSerialize(builder, lsn);
1493 : 73 : }
1494 : :
1495 : : /*
1496 : : * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1497 : : * been done by another decoding process.
1498 : : */
1499 : : static void
1500 : 539 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1501 : : {
1502 : : Size needed_length;
1887 akapila@postgresql.o 1503 : 539 : SnapBuildOnDisk *ondisk = NULL;
1312 1504 : 539 : TransactionId *catchange_xip = NULL;
1505 : : MemoryContext old_ctx;
1506 : : size_t catchange_xcnt;
1507 : : char *ondisk_c;
1508 : : int fd;
1509 : : char tmppath[MAXPGPATH];
1510 : : char path[MAXPGPATH];
1511 : : int ret;
1512 : : struct stat stat_buf;
1513 : : Size sz;
1514 : :
129 alvherre@kurilemu.de 1515 [ - + ]:GNC 539 : Assert(XLogRecPtrIsValid(lsn));
1516 [ + + - + ]: 539 : Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) ||
1517 : : builder->last_serialized_snapshot <= lsn);
1518 : :
1519 : : /*
1520 : : * no point in serializing if we cannot continue to work immediately after
1521 : : * restoring the snapshot
1522 : : */
4395 rhaas@postgresql.org 1523 [ - + ]:CBC 539 : if (builder->state < SNAPBUILD_CONSISTENT)
4395 rhaas@postgresql.org 1524 :UBC 0 : return;
1525 : :
1526 : : /* consistent snapshots have no next phase */
1854 andres@anarazel.de 1527 [ - + ]:CBC 539 : Assert(builder->next_phase_at == InvalidTransactionId);
1528 : :
1529 : : /*
1530 : : * We identify snapshots by the LSN they are valid for. We don't need to
1531 : : * include timelines in the name as each LSN maps to exactly one timeline
1532 : : * unless the user used pg_resetwal or similar. If a user did so, there's
1533 : : * no hope continuing to decode anyway.
1534 : : */
562 michael@paquier.xyz 1535 : 539 : sprintf(path, "%s/%X-%X.snap",
1536 : : PG_LOGICAL_SNAPSHOTS_DIR,
1846 peter@eisentraut.org 1537 : 539 : LSN_FORMAT_ARGS(lsn));
1538 : :
1539 : : /*
1540 : : * first check whether some other backend already has written the snapshot
1541 : : * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1542 : : * as a valid state. Everything else is an unexpected error.
1543 : : */
4395 rhaas@postgresql.org 1544 : 539 : ret = stat(path, &stat_buf);
1545 : :
1546 [ + + - + ]: 539 : if (ret != 0 && errno != ENOENT)
4395 rhaas@postgresql.org 1547 [ # # ]:UBC 0 : ereport(ERROR,
1548 : : (errcode_for_file_access(),
1549 : : errmsg("could not stat file \"%s\": %m", path)));
1550 : :
4395 rhaas@postgresql.org 1551 [ + + ]:CBC 539 : else if (ret == 0)
1552 : : {
1553 : : /*
1554 : : * somebody else has already serialized to this point, don't overwrite
1555 : : * but remember location, so we don't need to read old data again.
1556 : : *
1557 : : * To be sure it has been synced to disk after the rename() from the
1558 : : * tempfile filename to the real filename, we just repeat the fsync.
1559 : : * That ought to be cheap because in most scenarios it should already
1560 : : * be safely on disk.
1561 : : */
1562 : 223 : fsync_fname(path, false);
562 michael@paquier.xyz 1563 : 223 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1564 : :
4395 rhaas@postgresql.org 1565 : 223 : builder->last_serialized_snapshot = lsn;
1566 : 223 : goto out;
1567 : : }
1568 : :
1569 : : /*
1570 : : * there is an obvious race condition here between the time we stat(2) the
1571 : : * file and us writing the file. But we rename the file into place
1572 : : * atomically and all files created need to contain the same data anyway,
1573 : : * so this is perfectly fine, although a bit of a resource waste. Locking
1574 : : * seems like pointless complication.
1575 : : */
1576 [ + + ]: 316 : elog(DEBUG1, "serializing snapshot to %s", path);
1577 : :
1578 : : /* to make sure only we will write to this tempfile, include pid */
562 michael@paquier.xyz 1579 : 316 : sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
1580 : : PG_LOGICAL_SNAPSHOTS_DIR,
1846 peter@eisentraut.org 1581 : 316 : LSN_FORMAT_ARGS(lsn), MyProcPid);
1582 : :
1583 : : /*
1584 : : * Unlink temporary file if it already exists, needs to have been before a
1585 : : * crash/error since we won't enter this function twice from within a
1586 : : * single decoding slot/backend and the temporary file contains the pid of
1587 : : * the current process.
1588 : : */
4395 rhaas@postgresql.org 1589 [ + - - + ]: 316 : if (unlink(tmppath) != 0 && errno != ENOENT)
4395 rhaas@postgresql.org 1590 [ # # ]:UBC 0 : ereport(ERROR,
1591 : : (errcode_for_file_access(),
1592 : : errmsg("could not remove file \"%s\": %m", tmppath)));
1593 : :
1312 akapila@postgresql.o 1594 :CBC 316 : old_ctx = MemoryContextSwitchTo(builder->context);
1595 : :
1596 : : /* Get the catalog modifying transactions that are yet not committed */
1597 : 316 : catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
1229 drowley@postgresql.o 1598 : 316 : catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
1599 : :
4395 rhaas@postgresql.org 1600 : 316 : needed_length = sizeof(SnapBuildOnDisk) +
1312 akapila@postgresql.o 1601 : 316 : sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
1602 : :
1603 : 316 : ondisk_c = palloc0(needed_length);
4395 rhaas@postgresql.org 1604 : 316 : ondisk = (SnapBuildOnDisk *) ondisk_c;
1605 : 316 : ondisk->magic = SNAPBUILD_MAGIC;
1606 : 316 : ondisk->version = SNAPBUILD_VERSION;
1607 : 316 : ondisk->length = needed_length;
4149 heikki.linnakangas@i 1608 : 316 : INIT_CRC32C(ondisk->checksum);
1609 : 316 : COMP_CRC32C(ondisk->checksum,
1610 : : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1611 : : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
4395 rhaas@postgresql.org 1612 : 316 : ondisk_c += sizeof(SnapBuildOnDisk);
1613 : :
1614 : 316 : memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1615 : : /* NULL-ify memory-only data */
1616 : 316 : ondisk->builder.context = NULL;
1617 : 316 : ondisk->builder.snapshot = NULL;
1618 : 316 : ondisk->builder.reorder = NULL;
1619 : 316 : ondisk->builder.committed.xip = NULL;
1312 akapila@postgresql.o 1620 : 316 : ondisk->builder.catchange.xip = NULL;
1621 : : /* update catchange only on disk data */
1622 : 316 : ondisk->builder.catchange.xcnt = catchange_xcnt;
1623 : :
4149 heikki.linnakangas@i 1624 : 316 : COMP_CRC32C(ondisk->checksum,
1625 : : &ondisk->builder,
1626 : : sizeof(SnapBuild));
1627 : :
1628 : : /* copy committed xacts */
1312 akapila@postgresql.o 1629 [ + + ]: 316 : if (builder->committed.xcnt > 0)
1630 : : {
1631 : 55 : sz = sizeof(TransactionId) * builder->committed.xcnt;
1632 : 55 : memcpy(ondisk_c, builder->committed.xip, sz);
1633 : 55 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1634 : 55 : ondisk_c += sz;
1635 : : }
1636 : :
1637 : : /* copy catalog modifying xacts */
1638 [ + + ]: 316 : if (catchange_xcnt > 0)
1639 : : {
1640 : 10 : sz = sizeof(TransactionId) * catchange_xcnt;
1641 : 10 : memcpy(ondisk_c, catchange_xip, sz);
1642 : 10 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1643 : 10 : ondisk_c += sz;
1644 : : }
1645 : :
4141 andres@anarazel.de 1646 : 316 : FIN_CRC32C(ondisk->checksum);
1647 : :
1648 : : /* we have valid data now, open tempfile and write it there */
4395 rhaas@postgresql.org 1649 : 316 : fd = OpenTransientFile(tmppath,
1650 : : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1651 [ - + ]: 316 : if (fd < 0)
4395 rhaas@postgresql.org 1652 [ # # ]:UBC 0 : ereport(ERROR,
1653 : : (errcode_for_file_access(),
1654 : : errmsg("could not open file \"%s\": %m", tmppath)));
1655 : :
2779 michael@paquier.xyz 1656 :CBC 316 : errno = 0;
3284 rhaas@postgresql.org 1657 : 316 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
4395 1658 [ - + ]: 316 : if ((write(fd, ondisk, needed_length)) != needed_length)
1659 : : {
2820 michael@paquier.xyz 1660 :UBC 0 : int save_errno = errno;
1661 : :
4395 rhaas@postgresql.org 1662 : 0 : CloseTransientFile(fd);
1663 : :
1664 : : /* if write didn't set errno, assume problem is no disk space */
2820 michael@paquier.xyz 1665 [ # # ]: 0 : errno = save_errno ? save_errno : ENOSPC;
4395 rhaas@postgresql.org 1666 [ # # ]: 0 : ereport(ERROR,
1667 : : (errcode_for_file_access(),
1668 : : errmsg("could not write to file \"%s\": %m", tmppath)));
1669 : : }
3284 rhaas@postgresql.org 1670 :CBC 316 : pgstat_report_wait_end();
1671 : :
1672 : : /*
1673 : : * fsync the file before renaming so that even if we crash after this we
1674 : : * have either a fully valid file or nothing.
1675 : : *
1676 : : * It's safe to just ERROR on fsync() here because we'll retry the whole
1677 : : * operation including the writes.
1678 : : *
1679 : : * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1680 : : * some noticeable overhead since it's performed synchronously during
1681 : : * decoding?
1682 : : */
1683 : 316 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
4395 1684 [ - + ]: 316 : if (pg_fsync(fd) != 0)
1685 : : {
2820 michael@paquier.xyz 1686 :UBC 0 : int save_errno = errno;
1687 : :
4395 rhaas@postgresql.org 1688 : 0 : CloseTransientFile(fd);
2820 michael@paquier.xyz 1689 : 0 : errno = save_errno;
4395 rhaas@postgresql.org 1690 [ # # ]: 0 : ereport(ERROR,
1691 : : (errcode_for_file_access(),
1692 : : errmsg("could not fsync file \"%s\": %m", tmppath)));
1693 : : }
3284 rhaas@postgresql.org 1694 :CBC 316 : pgstat_report_wait_end();
1695 : :
2444 peter@eisentraut.org 1696 [ - + ]: 316 : if (CloseTransientFile(fd) != 0)
2563 michael@paquier.xyz 1697 [ # # ]:UBC 0 : ereport(ERROR,
1698 : : (errcode_for_file_access(),
1699 : : errmsg("could not close file \"%s\": %m", tmppath)));
1700 : :
562 michael@paquier.xyz 1701 :CBC 316 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1702 : :
1703 : : /*
1704 : : * We may overwrite the work from some other backend, but that's ok, our
1705 : : * snapshot is valid as well, we'll just have done some superfluous work.
1706 : : */
4395 rhaas@postgresql.org 1707 [ - + ]: 316 : if (rename(tmppath, path) != 0)
1708 : : {
4395 rhaas@postgresql.org 1709 [ # # ]:UBC 0 : ereport(ERROR,
1710 : : (errcode_for_file_access(),
1711 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
1712 : : tmppath, path)));
1713 : : }
1714 : :
1715 : : /* make sure we persist */
4395 rhaas@postgresql.org 1716 :CBC 316 : fsync_fname(path, false);
562 michael@paquier.xyz 1717 : 316 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1718 : :
1719 : : /*
1720 : : * Now there's no way we can lose the dumped state anymore, remember this
1721 : : * as a serialization point.
1722 : : */
4395 rhaas@postgresql.org 1723 : 316 : builder->last_serialized_snapshot = lsn;
1724 : :
1312 akapila@postgresql.o 1725 : 316 : MemoryContextSwitchTo(old_ctx);
1726 : :
4395 rhaas@postgresql.org 1727 : 539 : out:
1728 : 539 : ReorderBufferSetRestartPoint(builder->reorder,
1729 : : builder->last_serialized_snapshot);
1730 : : /* be tidy */
1887 akapila@postgresql.o 1731 [ + + ]: 539 : if (ondisk)
1732 : 316 : pfree(ondisk);
1312 1733 [ + + ]: 539 : if (catchange_xip)
1734 : 10 : pfree(catchange_xip);
1735 : : }
1736 : :
1737 : : /*
1738 : : * Restore the logical snapshot file contents to 'ondisk'.
1739 : : *
1740 : : * 'context' is the memory context where the catalog modifying/committed xid
1741 : : * will live.
1742 : : * If 'missing_ok' is true, will not throw an error if the file is not found.
1743 : : */
1744 : : bool
369 msawada@postgresql.o 1745 : 22 : SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
1746 : : MemoryContext context, bool missing_ok)
1747 : : {
1748 : : int fd;
1749 : : pg_crc32c checksum;
1750 : : Size sz;
1751 : : char path[MAXPGPATH];
1752 : :
1753 : 22 : sprintf(path, "%s/%X-%X.snap",
1754 : : PG_LOGICAL_SNAPSHOTS_DIR,
1755 : 22 : LSN_FORMAT_ARGS(lsn));
1756 : :
3095 peter_e@gmx.net 1757 : 22 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1758 : :
517 msawada@postgresql.o 1759 [ + + ]: 22 : if (fd < 0)
1760 : : {
1761 [ + - + - ]: 11 : if (missing_ok && errno == ENOENT)
1762 : 11 : return false;
1763 : :
4395 rhaas@postgresql.org 1764 [ # # ]:UBC 0 : ereport(ERROR,
1765 : : (errcode_for_file_access(),
1766 : : errmsg("could not open file \"%s\": %m", path)));
1767 : : }
1768 : :
1769 : : /* ----
1770 : : * Make sure the snapshot had been stored safely to disk, that's normally
1771 : : * cheap.
1772 : : * Note that we do not need PANIC here, nobody will be able to use the
1773 : : * slot without fsyncing, and saving it won't succeed without an fsync()
1774 : : * either...
1775 : : * ----
1776 : : */
4395 rhaas@postgresql.org 1777 :CBC 11 : fsync_fname(path, false);
562 michael@paquier.xyz 1778 : 11 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1779 : :
1780 : : /* read statically sized portion of snapshot */
537 peter@eisentraut.org 1781 : 11 : SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
1782 : :
517 msawada@postgresql.o 1783 [ - + ]: 11 : if (ondisk->magic != SNAPBUILD_MAGIC)
4395 rhaas@postgresql.org 1784 [ # # ]:UBC 0 : ereport(ERROR,
1785 : : (errcode(ERRCODE_DATA_CORRUPTED),
1786 : : errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1787 : : path, ondisk->magic, SNAPBUILD_MAGIC)));
1788 : :
517 msawada@postgresql.o 1789 [ - + ]:CBC 11 : if (ondisk->version != SNAPBUILD_VERSION)
4395 rhaas@postgresql.org 1790 [ # # ]:UBC 0 : ereport(ERROR,
1791 : : (errcode(ERRCODE_DATA_CORRUPTED),
1792 : : errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1793 : : path, ondisk->version, SNAPBUILD_VERSION)));
1794 : :
4149 heikki.linnakangas@i 1795 :CBC 11 : INIT_CRC32C(checksum);
1796 : 11 : COMP_CRC32C(checksum,
1797 : : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1798 : : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1799 : :
1800 : : /* read SnapBuild */
537 peter@eisentraut.org 1801 : 11 : SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
517 msawada@postgresql.o 1802 : 11 : COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
1803 : :
1804 : : /* restore committed xacts information */
1805 [ + + ]: 11 : if (ondisk->builder.committed.xcnt > 0)
1806 : : {
1807 : 5 : sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
1808 : 5 : ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
537 peter@eisentraut.org 1809 : 5 : SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
517 msawada@postgresql.o 1810 : 5 : COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
1811 : : }
1812 : :
1813 : : /* restore catalog modifying xacts information */
1814 [ + + ]: 11 : if (ondisk->builder.catchange.xcnt > 0)
1815 : : {
1816 : 5 : sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
1817 : 5 : ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
537 peter@eisentraut.org 1818 : 5 : SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
517 msawada@postgresql.o 1819 : 5 : COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
1820 : : }
1821 : :
2444 peter@eisentraut.org 1822 [ - + ]: 11 : if (CloseTransientFile(fd) != 0)
2563 michael@paquier.xyz 1823 [ # # ]:UBC 0 : ereport(ERROR,
1824 : : (errcode_for_file_access(),
1825 : : errmsg("could not close file \"%s\": %m", path)));
1826 : :
4141 andres@anarazel.de 1827 :CBC 11 : FIN_CRC32C(checksum);
1828 : :
1829 : : /* verify checksum of what we've read */
517 msawada@postgresql.o 1830 [ - + ]: 11 : if (!EQ_CRC32C(checksum, ondisk->checksum))
4395 rhaas@postgresql.org 1831 [ # # ]:UBC 0 : ereport(ERROR,
1832 : : (errcode(ERRCODE_DATA_CORRUPTED),
1833 : : errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1834 : : path, checksum, ondisk->checksum)));
1835 : :
517 msawada@postgresql.o 1836 :CBC 11 : return true;
1837 : : }
1838 : :
1839 : : /*
1840 : : * Restore a snapshot into 'builder' if previously one has been stored at the
1841 : : * location indicated by 'lsn'. Returns true if successful, false otherwise.
1842 : : */
1843 : : static bool
1844 : 20 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1845 : : {
1846 : : SnapBuildOnDisk ondisk;
1847 : :
1848 : : /* no point in loading a snapshot if we're already there */
1849 [ - + ]: 20 : if (builder->state == SNAPBUILD_CONSISTENT)
517 msawada@postgresql.o 1850 :UBC 0 : return false;
1851 : :
1852 : : /* validate and restore the snapshot to 'ondisk' */
369 msawada@postgresql.o 1853 [ + + ]:CBC 20 : if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
517 1854 : 11 : return false;
1855 : :
1856 : : /*
1857 : : * ok, we now have a sensible snapshot here, figure out if it has more
1858 : : * information than we have.
1859 : : */
1860 : :
1861 : : /*
1862 : : * We are only interested in consistent snapshots for now, comparing
1863 : : * whether one incomplete snapshot is more "advanced" seems to be
1864 : : * unnecessarily complex.
1865 : : */
4395 rhaas@postgresql.org 1866 [ - + ]: 9 : if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
4395 rhaas@postgresql.org 1867 :UBC 0 : goto snapshot_not_interesting;
1868 : :
1869 : : /*
1870 : : * Don't use a snapshot that requires an xmin that we cannot guarantee to
1871 : : * be available.
1872 : : */
4395 rhaas@postgresql.org 1873 [ - + ]:CBC 9 : if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
4395 rhaas@postgresql.org 1874 :UBC 0 : goto snapshot_not_interesting;
1875 : :
1876 : : /*
1877 : : * Consistent snapshots have no next phase. Reset next_phase_at as it is
1878 : : * possible that an old value may remain.
1879 : : */
1854 andres@anarazel.de 1880 [ - + ]:CBC 9 : Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
985 dgustafsson@postgres 1881 : 9 : builder->next_phase_at = InvalidTransactionId;
1882 : :
1883 : : /* ok, we think the snapshot is sensible, copy over everything important */
4395 rhaas@postgresql.org 1884 : 9 : builder->xmin = ondisk.builder.xmin;
1885 : 9 : builder->xmax = ondisk.builder.xmax;
1886 : 9 : builder->state = ondisk.builder.state;
1887 : :
1888 : 9 : builder->committed.xcnt = ondisk.builder.committed.xcnt;
1889 : : /* We only allocated/stored xcnt, not xcnt_space xids ! */
1890 : : /* don't overwrite preallocated xip, if we don't have anything here */
1891 [ + + ]: 9 : if (builder->committed.xcnt > 0)
1892 : : {
1893 : 3 : pfree(builder->committed.xip);
1894 : 3 : builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1895 : 3 : builder->committed.xip = ondisk.builder.committed.xip;
1896 : : }
1897 : 9 : ondisk.builder.committed.xip = NULL;
1898 : :
1899 : : /* set catalog modifying transactions */
1312 akapila@postgresql.o 1900 [ - + ]: 9 : if (builder->catchange.xip)
1312 akapila@postgresql.o 1901 :UBC 0 : pfree(builder->catchange.xip);
1312 akapila@postgresql.o 1902 :CBC 9 : builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
1903 : 9 : builder->catchange.xip = ondisk.builder.catchange.xip;
1904 : 9 : ondisk.builder.catchange.xip = NULL;
1905 : :
1906 : : /* our snapshot is not interesting anymore, build a new one */
4395 rhaas@postgresql.org 1907 [ - + ]: 9 : if (builder->snapshot != NULL)
1908 : : {
4395 rhaas@postgresql.org 1909 :UBC 0 : SnapBuildSnapDecRefcount(builder->snapshot);
1910 : : }
3196 andres@anarazel.de 1911 :CBC 9 : builder->snapshot = SnapBuildBuildSnapshot(builder);
4395 rhaas@postgresql.org 1912 : 9 : SnapBuildSnapIncRefcount(builder->snapshot);
1913 : :
1914 : 9 : ReorderBufferSetRestartPoint(builder->reorder, lsn);
1915 : :
1916 [ - + ]: 9 : Assert(builder->state == SNAPBUILD_CONSISTENT);
1917 : :
1918 [ + - ]: 9 : ereport(LOG,
1919 : : errmsg("logical decoding found consistent point at %X/%08X",
1920 : : LSN_FORMAT_ARGS(lsn)),
1921 : : errdetail("Logical decoding will begin using saved snapshot."));
1922 : 9 : return true;
1923 : :
4395 rhaas@postgresql.org 1924 :UBC 0 : snapshot_not_interesting:
1925 [ # # ]: 0 : if (ondisk.builder.committed.xip != NULL)
1926 : 0 : pfree(ondisk.builder.committed.xip);
1312 akapila@postgresql.o 1927 [ # # ]: 0 : if (ondisk.builder.catchange.xip != NULL)
1928 : 0 : pfree(ondisk.builder.catchange.xip);
4395 rhaas@postgresql.org 1929 : 0 : return false;
1930 : : }
1931 : :
1932 : : /*
1933 : : * Read the contents of the serialized snapshot to 'dest'.
1934 : : */
1935 : : static void
537 peter@eisentraut.org 1936 :CBC 32 : SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
1937 : : {
1938 : : int readBytes;
1939 : :
1312 akapila@postgresql.o 1940 : 32 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1941 : 32 : readBytes = read(fd, dest, size);
1942 : 32 : pgstat_report_wait_end();
1943 [ - + ]: 32 : if (readBytes != size)
1944 : : {
1312 akapila@postgresql.o 1945 :UBC 0 : int save_errno = errno;
1946 : :
1947 : 0 : CloseTransientFile(fd);
1948 : :
1949 [ # # ]: 0 : if (readBytes < 0)
1950 : : {
1951 : 0 : errno = save_errno;
1952 [ # # ]: 0 : ereport(ERROR,
1953 : : (errcode_for_file_access(),
1954 : : errmsg("could not read file \"%s\": %m", path)));
1955 : : }
1956 : : else
1957 [ # # ]: 0 : ereport(ERROR,
1958 : : (errcode(ERRCODE_DATA_CORRUPTED),
1959 : : errmsg("could not read file \"%s\": read %d of %zu",
1960 : : path, readBytes, size)));
1961 : : }
1312 akapila@postgresql.o 1962 :CBC 32 : }
1963 : :
1964 : : /*
1965 : : * Remove all serialized snapshots that are not required anymore because no
1966 : : * slot can need them. This doesn't actually have to run during a checkpoint,
1967 : : * but it's a convenient point to schedule this.
1968 : : *
1969 : : * NB: We run this during checkpoints even if logical decoding is disabled so
1970 : : * we cleanup old slots at some point after it got disabled.
1971 : : */
1972 : : void
4395 rhaas@postgresql.org 1973 : 1802 : CheckPointSnapBuild(void)
1974 : : {
1975 : : XLogRecPtr cutoff;
1976 : : XLogRecPtr redo;
1977 : : DIR *snap_dir;
1978 : : struct dirent *snap_de;
1979 : : char path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
1980 : :
1981 : : /*
1982 : : * We start off with a minimum of the last redo pointer. No new
1983 : : * replication slot will start before that, so that's a safe upper bound
1984 : : * for removal.
1985 : : */
1986 : 1802 : redo = GetRedoRecPtr();
1987 : :
1988 : : /* now check for the restart ptrs from existing slots */
1989 : 1802 : cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1990 : :
1991 : : /* don't start earlier than the restart lsn */
1992 [ + + ]: 1802 : if (redo < cutoff)
1993 : 1 : cutoff = redo;
1994 : :
562 michael@paquier.xyz 1995 : 1802 : snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
1996 [ + + ]: 5691 : while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
1997 : : {
1998 : : uint32 hi;
1999 : : uint32 lo;
2000 : : XLogRecPtr lsn;
2001 : : PGFileType de_type;
2002 : :
4395 rhaas@postgresql.org 2003 [ + + ]: 3889 : if (strcmp(snap_de->d_name, ".") == 0 ||
2004 [ + + ]: 2087 : strcmp(snap_de->d_name, "..") == 0)
2005 : 3604 : continue;
2006 : :
562 michael@paquier.xyz 2007 : 285 : snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
1290 2008 : 285 : de_type = get_dirent_type(path, snap_de, false, DEBUG1);
2009 : :
2010 [ + - - + ]: 285 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
2011 : : {
4395 rhaas@postgresql.org 2012 [ # # ]:UBC 0 : elog(DEBUG1, "only regular files expected: %s", path);
2013 : 0 : continue;
2014 : : }
2015 : :
2016 : : /*
2017 : : * temporary filenames from SnapBuildSerialize() include the LSN and
2018 : : * everything but are postfixed by .$pid.tmp. We can just remove them
2019 : : * the same as other files because there can be none that are
2020 : : * currently being written that are older than cutoff.
2021 : : *
2022 : : * We just log a message if a file doesn't fit the pattern, it's
2023 : : * probably some editors lock/state file or similar...
2024 : : */
4395 rhaas@postgresql.org 2025 [ - + ]:CBC 285 : if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2026 : : {
4395 rhaas@postgresql.org 2027 [ # # ]:UBC 0 : ereport(LOG,
2028 : : (errmsg("could not parse file name \"%s\"", path)));
2029 : 0 : continue;
2030 : : }
2031 : :
4395 rhaas@postgresql.org 2032 :CBC 285 : lsn = ((uint64) hi) << 32 | lo;
2033 : :
2034 : : /* check whether we still need it */
129 alvherre@kurilemu.de 2035 [ + + + + ]:GNC 285 : if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
2036 : : {
4395 rhaas@postgresql.org 2037 [ + + ]:CBC 188 : elog(DEBUG1, "removing snapbuild snapshot %s", path);
2038 : :
2039 : : /*
2040 : : * It's not particularly harmful, though strange, if we can't
2041 : : * remove the file here. Don't prevent the checkpoint from
2042 : : * completing, that'd be a cure worse than the disease.
2043 : : */
2044 [ - + ]: 188 : if (unlink(path) < 0)
2045 : : {
4395 rhaas@postgresql.org 2046 [ # # ]:UBC 0 : ereport(LOG,
2047 : : (errcode_for_file_access(),
2048 : : errmsg("could not remove file \"%s\": %m",
2049 : : path)));
2050 : 0 : continue;
2051 : : }
2052 : : }
2053 : : }
4395 rhaas@postgresql.org 2054 :CBC 1802 : FreeDir(snap_dir);
2055 : 1802 : }
2056 : :
2057 : : /*
2058 : : * Check if a logical snapshot at the specified point has been serialized.
2059 : : */
2060 : : bool
711 akapila@postgresql.o 2061 : 15 : SnapBuildSnapshotExists(XLogRecPtr lsn)
2062 : : {
2063 : : char path[MAXPGPATH];
2064 : : int ret;
2065 : : struct stat stat_buf;
2066 : :
213 fujii@postgresql.org 2067 : 15 : sprintf(path, "%s/%X-%X.snap",
2068 : : PG_LOGICAL_SNAPSHOTS_DIR,
711 akapila@postgresql.o 2069 : 15 : LSN_FORMAT_ARGS(lsn));
2070 : :
2071 : 15 : ret = stat(path, &stat_buf);
2072 : :
2073 [ + + - + ]: 15 : if (ret != 0 && errno != ENOENT)
711 akapila@postgresql.o 2074 [ # # ]:UBC 0 : ereport(ERROR,
2075 : : (errcode_for_file_access(),
2076 : : errmsg("could not stat file \"%s\": %m", path)));
2077 : :
711 akapila@postgresql.o 2078 :CBC 15 : return ret == 0;
2079 : : }
|