Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * bulk_write.c
4 : : * Efficiently and reliably populate a new relation
5 : : *
6 : : * The assumption is that no other backends access the relation while we are
7 : : * loading it, so we can take some shortcuts. Pages already present in the
8 : : * indicated fork when the bulk write operation is started are not modified
9 : : * unless explicitly written to. Do not mix operations through the regular
10 : : * buffer manager and the bulk loading interface!
11 : : *
12 : : * We bypass the buffer manager to avoid the locking overhead, and call
13 : : * smgrextend() directly. A downside is that the pages will need to be
14 : : * re-read into shared buffers on first use after the build finishes. That's
15 : : * usually a good tradeoff for large relations, and for small relations, the
16 : : * overhead isn't very significant compared to creating the relation in the
17 : : * first place.
18 : : *
19 : : * The pages are WAL-logged if needed. To save on WAL header overhead, we
20 : : * WAL-log several pages in one record.
21 : : *
22 : : * One tricky point is that because we bypass the buffer manager, we need to
23 : : * register the relation for fsyncing at the next checkpoint ourselves, and
24 : : * make sure that the relation is correctly fsync'd by us or the checkpointer
25 : : * even if a checkpoint happens concurrently.
26 : : *
27 : : *
28 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
29 : : * Portions Copyright (c) 1994, Regents of the University of California
30 : : *
31 : : *
32 : : * IDENTIFICATION
33 : : * src/backend/storage/smgr/bulk_write.c
34 : : *
35 : : *-------------------------------------------------------------------------
36 : : */
37 : : #include "postgres.h"
38 : :
39 : : #include "access/xloginsert.h"
40 : : #include "access/xlogrecord.h"
41 : : #include "storage/bufpage.h"
42 : : #include "storage/bulk_write.h"
43 : : #include "storage/proc.h"
44 : : #include "storage/smgr.h"
45 : : #include "utils/rel.h"
46 : :
47 : : #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
48 : :
49 : : static const PGIOAlignedBlock zero_buffer = {0}; /* worth BLCKSZ */
50 : :
/*
 * One queued page write.  The buffer is owned by the bulk write machinery
 * once queued (smgr_bulk_write takes ownership), and is pfree'd after the
 * page has been written out in smgr_bulk_flush().
 */
typedef struct PendingWrite
{
	BulkWriteBuffer buf;		/* page contents to write; owned by us */
	BlockNumber blkno;			/* target block number in the fork */
	bool		page_std;		/* page uses the standard page layout?
								 * (affects WAL image compression) */
} PendingWrite;
57 : :
/*
 * Bulk writer state for one relation fork.
 */
struct BulkWriteState
{
	/* Information about the target relation we're writing */
	SMgrRelation smgr;			/* target relation's storage manager handle */
	ForkNumber	forknum;		/* which fork we are writing */
	bool		use_wal;		/* WAL-log the pages as we write them? */

	/* We keep several writes queued, and WAL-log them in batches */
	int			npending;		/* number of valid entries in pending_writes */
	PendingWrite pending_writes[MAX_PENDING_WRITES];

	/*
	 * Current size of the relation, in blocks.  Used to decide between
	 * smgrextend() and smgrwrite(), and to zero-fill any gaps when pages
	 * are written out of order.
	 */
	BlockNumber relsize;

	/*
	 * The RedoRecPtr at the time that the bulk operation started.  If it has
	 * moved by the time we finish, a checkpoint started concurrently and we
	 * must fsync the relation ourselves (see smgr_bulk_finish).
	 */
	XLogRecPtr	start_RedoRecPtr;

	/* context that smgr_bulk_get_buf() allocates page buffers in */
	MemoryContext memcxt;
};
80 : :
81 : : static void smgr_bulk_flush(BulkWriteState *bulkstate);
82 : :
83 : : /*
84 : : * Start a bulk write operation on a relation fork.
85 : : */
86 : : BulkWriteState *
561 heikki.linnakangas@i 87 :CBC 25798 : smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
88 : : {
89 : 25798 : return smgr_bulk_start_smgr(RelationGetSmgr(rel),
90 : : forknum,
91 [ + + + + : 25798 : RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
+ + + + +
+ ]
92 : : }
93 : :
94 : : /*
95 : : * Start a bulk write operation on a relation fork.
96 : : *
97 : : * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
98 : : */
99 : : BulkWriteState *
100 : 25887 : smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
101 : : {
102 : : BulkWriteState *state;
103 : :
104 : 25887 : state = palloc(sizeof(BulkWriteState));
105 : 25887 : state->smgr = smgr;
106 : 25887 : state->forknum = forknum;
107 : 25887 : state->use_wal = use_wal;
108 : :
109 : 25887 : state->npending = 0;
288 110 : 25887 : state->relsize = smgrnblocks(smgr, forknum);
111 : :
561 112 : 25887 : state->start_RedoRecPtr = GetRedoRecPtr();
113 : :
114 : : /*
115 : : * Remember the memory context. We will use it to allocate all the
116 : : * buffers later.
117 : : */
118 : 25887 : state->memcxt = CurrentMemoryContext;
119 : :
120 : 25887 : return state;
121 : : }
122 : :
/*
 * Finish bulk write operation.
 *
 * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
 * the relation if needed.
 */
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
	/* WAL-log and flush any remaining pages */
	smgr_bulk_flush(bulkstate);

	/*
	 * Fsync the relation, or register it for the next checkpoint, if
	 * necessary.
	 */
	if (SmgrIsTemp(bulkstate->smgr))
	{
		/* Temporary relations don't need to be fsync'd, ever */
	}
	else if (!bulkstate->use_wal)
	{
		/*----------
		 * This is either an unlogged relation, or a permanent relation but we
		 * skipped WAL-logging because wal_level=minimal:
		 *
		 * A) Unlogged relation
		 *
		 *    Unlogged relations will go away on crash, but they need to be
		 *    fsync'd on a clean shutdown. It's sufficient to call
		 *    smgrregistersync(), that ensures that the checkpointer will
		 *    flush it at the shutdown checkpoint. (It will flush it on the
		 *    next online checkpoint too, which is not strictly necessary.)
		 *
		 *    Note that the init-fork of an unlogged relation is not
		 *    considered unlogged for our purposes. It's treated like a
		 *    regular permanent relation. The callers will pass use_wal=true
		 *    for the init fork.
		 *
		 * B) Permanent relation, WAL-logging skipped because wal_level=minimal
		 *
		 *    This is a new relation, and we didn't WAL-log the pages as we
		 *    wrote, but they need to be fsync'd before commit.
		 *
		 *    We don't need to do that here, however. The fsync() is done at
		 *    commit, by smgrDoPendingSyncs() (*).
		 *
		 *    (*) smgrDoPendingSyncs() might decide to WAL-log the whole
		 *    relation at commit instead of fsyncing it, if the relation was
		 *    very small, but it's smgrDoPendingSyncs() responsibility in any
		 *    case.
		 *
		 * We cannot distinguish the two here, so conservatively assume it's
		 * an unlogged relation. A permanent relation with wal_level=minimal
		 * would require no actions, see above.
		 */
		smgrregistersync(bulkstate->smgr, bulkstate->forknum);
	}
	else
	{
		/*
		 * Permanent relation, WAL-logged normally.
		 *
		 * We already WAL-logged all the pages, so they will be replayed from
		 * WAL on crash. However, when we wrote out the pages, we passed
		 * skipFsync=true to avoid the overhead of registering all the writes
		 * with the checkpointer. Register the whole relation now.
		 *
		 * There is one hole in that idea: If a checkpoint occurred while we
		 * were writing the pages, it already missed fsyncing the pages we had
		 * written before the checkpoint started. A crash later on would
		 * replay the WAL starting from the checkpoint, therefore it wouldn't
		 * replay our earlier WAL records. So if a checkpoint started after
		 * the bulk write, fsync the files now.
		 */

		/*
		 * Prevent a checkpoint from starting between the GetRedoRecPtr() and
		 * smgrregistersync() calls.  Otherwise a checkpoint could begin after
		 * we compare redo pointers but before the sync request is registered,
		 * and miss our writes.
		 */
		Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
		MyProc->delayChkptFlags |= DELAY_CHKPT_START;

		if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
		{
			/*
			 * A checkpoint occurred and it didn't know about our writes, so
			 * fsync() the relation ourselves.  (Safe to release the delay
			 * flag first: we no longer rely on the checkpointer.)
			 */
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
			smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
			elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
		}
		else
		{
			/*
			 * No concurrent checkpoint; hand the fsync work to the
			 * checkpointer.  The registration must happen before the delay
			 * flag is cleared.
			 */
			smgrregistersync(bulkstate->smgr, bulkstate->forknum);
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
		}
	}
}
223 : :
224 : : static int
225 : 118319 : buffer_cmp(const void *a, const void *b)
226 : : {
227 : 118319 : const PendingWrite *bufa = (const PendingWrite *) a;
228 : 118319 : const PendingWrite *bufb = (const PendingWrite *) b;
229 : :
230 : : /* We should not see duplicated writes for the same block */
231 [ - + ]: 118319 : Assert(bufa->blkno != bufb->blkno);
232 [ + + ]: 118319 : if (bufa->blkno > bufb->blkno)
233 : 55975 : return 1;
234 : : else
235 : 62344 : return -1;
236 : : }
237 : :
/*
 * Finish all the pending writes: WAL-log the queued pages (in one record,
 * if possible), write them to disk in block-number order, and reset the
 * queue.  Called when the queue fills up and once more at finish time.
 */
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
	int			npending = bulkstate->npending;
	PendingWrite *pending_writes = bulkstate->pending_writes;

	if (npending == 0)
		return;

	/* Sort by block number, so the writes below are sequential on disk */
	if (npending > 1)
		qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);

	if (bulkstate->use_wal)
	{
		BlockNumber blknos[MAX_PENDING_WRITES];
		Page		pages[MAX_PENDING_WRITES];
		bool		page_std = true;

		for (int i = 0; i < npending; i++)
		{
			blknos[i] = pending_writes[i].blkno;
			pages[i] = pending_writes[i].buf->data;

			/*
			 * If any of the pages use !page_std, we log them all as such.
			 * That's a bit wasteful, but in practice, a mix of standard and
			 * non-standard page layout is rare. None of the built-in AMs do
			 * that.
			 */
			if (!pending_writes[i].page_std)
				page_std = false;
		}
		/* One WAL record covering all the queued pages */
		log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
					 npending, blknos, pages, page_std);
	}

	/* Now write the pages out, WAL first (done above), data after */
	for (int i = 0; i < npending; i++)
	{
		BlockNumber blkno = pending_writes[i].blkno;
		Page		page = pending_writes[i].buf->data;

		PageSetChecksumInplace(page, blkno);

		if (blkno >= bulkstate->relsize)
		{
			/*
			 * If we have to write pages nonsequentially, fill in the space
			 * with zeroes until we come back and overwrite. This is not
			 * logically necessary on standard Unix filesystems (unwritten
			 * space will read as zeroes anyway), but it should help to avoid
			 * fragmentation. The dummy pages aren't WAL-logged though.
			 */
			while (blkno > bulkstate->relsize)
			{
				/* don't set checksum for all-zero page */
				smgrextend(bulkstate->smgr, bulkstate->forknum,
						   bulkstate->relsize,
						   &zero_buffer,
						   true);
				bulkstate->relsize++;
			}

			/* skipFsync=true: smgr_bulk_finish() handles the fsync */
			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
			bulkstate->relsize++;
		}
		else
			/* block already exists (e.g. zero-filled above): overwrite it */
			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
		/* we own the buffer (see smgr_bulk_write); release it now */
		pfree(page);
	}

	bulkstate->npending = 0;
}
313 : :
314 : : /*
315 : : * Queue write of 'buf'.
316 : : *
317 : : * NB: this takes ownership of 'buf'!
318 : : *
319 : : * You are only allowed to write a given block once as part of one bulk write
320 : : * operation.
321 : : */
322 : : void
323 : 58673 : smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
324 : : {
325 : : PendingWrite *w;
326 : :
327 : 58673 : w = &bulkstate->pending_writes[bulkstate->npending++];
328 : 58673 : w->buf = buf;
329 : 58673 : w->blkno = blocknum;
330 : 58673 : w->page_std = page_std;
331 : :
332 [ + + ]: 58673 : if (bulkstate->npending == MAX_PENDING_WRITES)
333 : 573 : smgr_bulk_flush(bulkstate);
334 : 58673 : }
335 : :
336 : : /*
337 : : * Allocate a new buffer which can later be written with smgr_bulk_write().
338 : : *
339 : : * There is no function to free the buffer. When you pass it to
340 : : * smgr_bulk_write(), it takes ownership and frees it when it's no longer
341 : : * needed.
342 : : *
343 : : * This is currently implemented as a simple palloc, but could be implemented
344 : : * using a ring buffer or larger chunks in the future, so don't rely on it.
345 : : */
346 : : BulkWriteBuffer
347 : 58673 : smgr_bulk_get_buf(BulkWriteState *bulkstate)
348 : : {
349 : 58673 : return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
350 : : }
|