Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * sharedtuplestore.c
4 : : * Simple mechanism for sharing tuples between backends.
5 : : *
6 : : * This module contains a shared temporary tuple storage mechanism providing
7 : : * a parallel-aware subset of the features of tuplestore.c. Multiple backends
8 : : * can write to a SharedTuplestore, and then multiple backends can later scan
9 : : * the stored tuples. Currently, the only scan type supported is a parallel
10 : : * scan where each backend reads an arbitrary subset of the tuples that were
11 : : * written.
12 : : *
13 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
14 : : * Portions Copyright (c) 1994, Regents of the University of California
15 : : *
16 : : * IDENTIFICATION
17 : : * src/backend/utils/sort/sharedtuplestore.c
18 : : *
19 : : *-------------------------------------------------------------------------
20 : : */
21 : :
22 : : #include "postgres.h"
23 : :
24 : : #include "access/htup.h"
25 : : #include "access/htup_details.h"
26 : : #include "storage/buffile.h"
27 : : #include "storage/lwlock.h"
28 : : #include "storage/sharedfileset.h"
29 : : #include "utils/sharedtuplestore.h"
30 : :
/*
 * The size of chunks, in pages.  This is somewhat arbitrarily set to match
 * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
 * at approximately the same rate as it allocates new chunks of memory to
 * insert them into.
 */
#define STS_CHUNK_PAGES 4
/* Size of the fixed header at the start of each on-disk chunk. */
#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
/* Usable payload bytes per chunk: total chunk size minus the header. */
#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)

/* Chunk written to disk. */
typedef struct SharedTuplestoreChunk
{
	int			ntuples;		/* Number of tuples in this chunk. */
	int			overflow;		/* If overflow, how many including this one? */
	char		data[FLEXIBLE_ARRAY_MEMBER];	/* meta-data + tuple bytes */
} SharedTuplestoreChunk;

/* Per-participant shared state. */
typedef struct SharedTuplestoreParticipant
{
	LWLock		lock;			/* Serializes chunk claiming during scans. */
	BlockNumber read_page;		/* Page number for next read. */
	BlockNumber npages;			/* Number of pages written. */
	bool		writing;		/* Used only for assertions. */
} SharedTuplestoreParticipant;

/* The control object that lives in shared memory. */
struct SharedTuplestore
{
	int			nparticipants;	/* Number of participants that can write. */
	int			flags;			/* Flag bits from SHARED_TUPLESTORE_XXX */
	size_t		meta_data_size; /* Size of per-tuple header. */
	char		name[NAMEDATALEN];	/* A name for this tuplestore. */

	/* Followed by per-participant shared state. */
	SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
};

/* Per-participant state that lives in backend-local memory. */
struct SharedTuplestoreAccessor
{
	int			participant;	/* My participant number. */
	SharedTuplestore *sts;		/* The shared state. */
	SharedFileSet *fileset;		/* The SharedFileSet holding files. */
	MemoryContext context;		/* Memory context for buffers. */

	/* State for reading. */
	int			read_participant;	/* The current participant to read from. */
	BufFile    *read_file;		/* The current file to read from. */
	int			read_ntuples_available; /* The number of tuples in chunk. */
	int			read_ntuples;	/* How many tuples have we read from chunk? */
	size_t		read_bytes;		/* How many bytes have we read from chunk? */
	char	   *read_buffer;	/* A buffer for loading tuples. */
	size_t		read_buffer_size;	/* Current allocated size of read_buffer. */
	BlockNumber read_next_page; /* Lowest block we'll consider reading. */

	/* State for writing. */
	SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
	BufFile    *write_file;		/* The current file to write to. */
	char	   *write_pointer;	/* Current write pointer within chunk. */
	char	   *write_end;		/* One past the end of the current chunk. */
};
94 : :
95 : : static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
96 : : int participant);
97 : :
98 : : /*
99 : : * Return the amount of shared memory required to hold SharedTuplestore for a
100 : : * given number of participants.
101 : : */
102 : : size_t
3009 andres@anarazel.de 103 :CBC 2219 : sts_estimate(int participants)
104 : : {
105 : 4438 : return offsetof(SharedTuplestore, participants) +
106 : 2219 : sizeof(SharedTuplestoreParticipant) * participants;
107 : : }
108 : :
109 : : /*
110 : : * Initialize a SharedTuplestore in existing shared memory. There must be
111 : : * space for sts_estimate(participants) bytes. If flags includes the value
112 : : * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
113 : : * eagerly (but this isn't yet implemented).
114 : : *
115 : : * Tuples that are stored may optionally carry a piece of fixed sized
116 : : * meta-data which will be retrieved along with the tuple. This is useful for
117 : : * the hash values used in multi-batch hash joins, but could have other
118 : : * applications.
119 : : *
120 : : * The caller must supply a SharedFileSet, which is essentially a directory
121 : : * that will be cleaned up automatically, and a name which must be unique
122 : : * across all SharedTuplestores created in the same SharedFileSet.
123 : : */
124 : : SharedTuplestoreAccessor *
125 : 840 : sts_initialize(SharedTuplestore *sts, int participants,
126 : : int my_participant_number,
127 : : size_t meta_data_size,
128 : : int flags,
129 : : SharedFileSet *fileset,
130 : : const char *name)
131 : : {
132 : : SharedTuplestoreAccessor *accessor;
133 : : int i;
134 : :
135 [ - + ]: 840 : Assert(my_participant_number < participants);
136 : :
137 : 840 : sts->nparticipants = participants;
138 : 840 : sts->meta_data_size = meta_data_size;
139 : 840 : sts->flags = flags;
140 : :
141 [ - + ]: 840 : if (strlen(name) > sizeof(sts->name) - 1)
3009 andres@anarazel.de 142 [ # # ]:UBC 0 : elog(ERROR, "SharedTuplestore name too long");
3009 andres@anarazel.de 143 :CBC 840 : strcpy(sts->name, name);
144 : :
145 : : /*
146 : : * Limit meta-data so it + tuple size always fits into a single chunk.
147 : : * sts_puttuple() and sts_read_tuple() could be made to support scenarios
148 : : * where that's not the case, but it's not currently required. If so,
149 : : * meta-data size probably should be made variable, too.
150 : : */
151 [ - + ]: 840 : if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
3009 andres@anarazel.de 152 [ # # ]:UBC 0 : elog(ERROR, "meta-data too long");
153 : :
3009 andres@anarazel.de 154 [ + + ]:CBC 3138 : for (i = 0; i < participants; ++i)
155 : : {
156 : 2298 : LWLockInitialize(&sts->participants[i].lock,
157 : : LWTRANCHE_SHARED_TUPLESTORE);
158 : 2298 : sts->participants[i].read_page = 0;
1144 tmunro@postgresql.or 159 : 2298 : sts->participants[i].npages = 0;
3009 andres@anarazel.de 160 : 2298 : sts->participants[i].writing = false;
161 : : }
162 : :
95 michael@paquier.xyz 163 :GNC 840 : accessor = palloc0_object(SharedTuplestoreAccessor);
3009 andres@anarazel.de 164 :CBC 840 : accessor->participant = my_participant_number;
165 : 840 : accessor->sts = sts;
166 : 840 : accessor->fileset = fileset;
167 : 840 : accessor->context = CurrentMemoryContext;
168 : :
169 : 840 : return accessor;
170 : : }
171 : :
172 : : /*
173 : : * Attach to a SharedTuplestore that has been initialized by another backend,
174 : : * so that this backend can read and write tuples.
175 : : */
176 : : SharedTuplestoreAccessor *
177 : 1124 : sts_attach(SharedTuplestore *sts,
178 : : int my_participant_number,
179 : : SharedFileSet *fileset)
180 : : {
181 : : SharedTuplestoreAccessor *accessor;
182 : :
183 [ - + ]: 1124 : Assert(my_participant_number < sts->nparticipants);
184 : :
95 michael@paquier.xyz 185 :GNC 1124 : accessor = palloc0_object(SharedTuplestoreAccessor);
3009 andres@anarazel.de 186 :CBC 1124 : accessor->participant = my_participant_number;
187 : 1124 : accessor->sts = sts;
188 : 1124 : accessor->fileset = fileset;
189 : 1124 : accessor->context = CurrentMemoryContext;
190 : :
191 : 1124 : return accessor;
192 : : }
193 : :
194 : : static void
195 : 1947 : sts_flush_chunk(SharedTuplestoreAccessor *accessor)
196 : : {
197 : : size_t size;
198 : :
199 : 1947 : size = STS_CHUNK_PAGES * BLCKSZ;
2098 tmunro@postgresql.or 200 : 1947 : BufFileWrite(accessor->write_file, accessor->write_chunk, size);
3009 andres@anarazel.de 201 : 1947 : memset(accessor->write_chunk, 0, size);
202 : 1947 : accessor->write_pointer = &accessor->write_chunk->data[0];
203 : 1947 : accessor->sts->participants[accessor->participant].npages +=
204 : : STS_CHUNK_PAGES;
205 : 1947 : }
206 : :
207 : : /*
208 : : * Finish writing tuples. This must be called by all backends that have
209 : : * written data before any backend begins reading it.
210 : : */
211 : : void
212 : 3401 : sts_end_write(SharedTuplestoreAccessor *accessor)
213 : : {
214 [ + + ]: 3401 : if (accessor->write_file != NULL)
215 : : {
216 : 1251 : sts_flush_chunk(accessor);
217 : 1251 : BufFileClose(accessor->write_file);
218 : 1251 : pfree(accessor->write_chunk);
219 : 1251 : accessor->write_chunk = NULL;
220 : 1251 : accessor->write_file = NULL;
221 : 1251 : accessor->sts->participants[accessor->participant].writing = false;
222 : : }
223 : 3401 : }
224 : :
225 : : /*
226 : : * Prepare to rescan. Only one participant must call this. After it returns,
227 : : * all participants may call sts_begin_parallel_scan() and then loop over
228 : : * sts_parallel_scan_next(). This function must not be called concurrently
229 : : * with a scan, and synchronization to avoid that is the caller's
230 : : * responsibility.
231 : : */
232 : : void
3009 andres@anarazel.de 233 :UBC 0 : sts_reinitialize(SharedTuplestoreAccessor *accessor)
234 : : {
235 : : int i;
236 : :
237 : : /*
238 : : * Reset the shared read head for all participants' files. Also set the
239 : : * initial chunk size to the minimum (any increases from that size will be
240 : : * recorded in chunk_expansion_log).
241 : : */
242 [ # # ]: 0 : for (i = 0; i < accessor->sts->nparticipants; ++i)
243 : : {
244 : 0 : accessor->sts->participants[i].read_page = 0;
245 : : }
246 : 0 : }
247 : :
248 : : /*
249 : : * Begin scanning the contents in parallel.
250 : : */
251 : : void
3009 andres@anarazel.de 252 :CBC 828 : sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
253 : : {
254 : : int i PG_USED_FOR_ASSERTS_ONLY;
255 : :
256 : : /* End any existing scan that was in progress. */
257 : 828 : sts_end_parallel_scan(accessor);
258 : :
259 : : /*
260 : : * Any backend that might have written into this shared tuplestore must
261 : : * have called sts_end_write(), so that all buffers are flushed and the
262 : : * files have stopped growing.
263 : : */
264 [ + + ]: 3122 : for (i = 0; i < accessor->sts->nparticipants; ++i)
265 [ - + ]: 2294 : Assert(!accessor->sts->participants[i].writing);
266 : :
267 : : /*
268 : : * We will start out reading the file that THIS backend wrote. There may
269 : : * be some caching locality advantage to that.
270 : : */
271 : 828 : accessor->read_participant = accessor->participant;
272 : 828 : accessor->read_file = NULL;
273 : 828 : accessor->read_next_page = 0;
274 : 828 : }
275 : :
276 : : /*
277 : : * Finish a parallel scan, freeing associated backend-local resources.
278 : : */
279 : : void
280 : 4066 : sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
281 : : {
282 : : /*
283 : : * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
284 : : * we'd probably need a reference count of current parallel scanners so we
285 : : * could safely do it only when the reference count reaches zero.
286 : : */
287 [ - + ]: 4066 : if (accessor->read_file != NULL)
288 : : {
3009 andres@anarazel.de 289 :UBC 0 : BufFileClose(accessor->read_file);
290 : 0 : accessor->read_file = NULL;
291 : : }
3009 andres@anarazel.de 292 :CBC 4066 : }
293 : :
/*
 * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
 * pointer to meta data of that size must be provided.
 *
 * Tuples are appended to this backend's private file, which is created
 * lazily on first call.  A tuple too large for one chunk spills into
 * dedicated "overflow" chunks that readers can skip in one step.
 */
void
sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
			 MinimalTuple tuple)
{
	size_t		size;

	/* Do we have our own file yet? */
	if (accessor->write_file == NULL)
	{
		SharedTuplestoreParticipant *participant;
		char		name[MAXPGPATH];
		MemoryContext oldcxt;

		/* Create one.  Only this backend will write into it. */
		sts_filename(name, accessor, accessor->participant);

		/* Allocate BufFile state in the accessor's memory context. */
		oldcxt = MemoryContextSwitchTo(accessor->context);
		accessor->write_file =
			BufFileCreateFileSet(&accessor->fileset->fs, name);
		MemoryContextSwitchTo(oldcxt);

		/* Set up the shared state for this backend's file. */
		participant = &accessor->sts->participants[accessor->participant];
		participant->writing = true;	/* for assertions only */
	}

	/*
	 * Do we have space?  t_len is the tuple's total size including its own
	 * length word, which is stored as part of the tuple data.
	 */
	size = accessor->sts->meta_data_size + tuple->t_len;
	if (accessor->write_pointer == NULL ||
		accessor->write_pointer + size > accessor->write_end)
	{
		if (accessor->write_chunk == NULL)
		{
			/* First time through.  Allocate chunk. */
			accessor->write_chunk = (SharedTuplestoreChunk *)
				MemoryContextAllocZero(accessor->context,
									   STS_CHUNK_PAGES * BLCKSZ);
			accessor->write_chunk->ntuples = 0;
			accessor->write_pointer = &accessor->write_chunk->data[0];
			accessor->write_end = (char *)
				accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
		}
		else
		{
			/* See if flushing helps. */
			sts_flush_chunk(accessor);
		}

		/* It may still not be enough in the case of a gigantic tuple. */
		if (accessor->write_pointer + size > accessor->write_end)
		{
			size_t		written;

			/*
			 * We'll write the beginning of the oversized tuple, and then
			 * write the rest in some number of 'overflow' chunks.
			 *
			 * sts_initialize() verifies that the size of the tuple +
			 * meta-data always fits into a chunk. Because the chunk has been
			 * flushed above, we can be sure to have all of a chunk's usable
			 * space available.
			 */
			Assert(accessor->write_pointer + accessor->sts->meta_data_size +
				   sizeof(uint32) < accessor->write_end);

			/* Write the meta-data as one chunk. */
			if (accessor->sts->meta_data_size > 0)
				memcpy(accessor->write_pointer, meta_data,
					   accessor->sts->meta_data_size);

			/*
			 * Write as much of the tuple as we can fit. This includes the
			 * tuple's size at the start.
			 */
			written = accessor->write_end - accessor->write_pointer -
				accessor->sts->meta_data_size;
			memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
				   tuple, written);
			++accessor->write_chunk->ntuples;
			/* 'size' now tracks only tuple bytes still waiting to be written */
			size -= accessor->sts->meta_data_size;
			size -= written;
			/* Now write as many overflow chunks as we need for the rest. */
			while (size > 0)
			{
				size_t		written_this_chunk;

				sts_flush_chunk(accessor);

				/*
				 * How many overflow chunks to go?  This will allow readers to
				 * skip all of them at once instead of reading each one.
				 * The count includes the chunk we're about to fill.
				 */
				accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
					STS_CHUNK_DATA_SIZE;
				written_this_chunk =
					Min(accessor->write_end - accessor->write_pointer, size);
				memcpy(accessor->write_pointer, (char *) tuple + written,
					   written_this_chunk);
				accessor->write_pointer += written_this_chunk;
				size -= written_this_chunk;
				written += written_this_chunk;
			}
			/* The oversized tuple is fully buffered/written; we're done. */
			return;
		}
	}

	/* Copy meta-data and tuple into buffer. */
	if (accessor->sts->meta_data_size > 0)
		memcpy(accessor->write_pointer, meta_data,
			   accessor->sts->meta_data_size);
	memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
		   tuple->t_len);
	accessor->write_pointer += size;
	++accessor->write_chunk->ntuples;
}
413 : :
/*
 * Read the next tuple from the current chunk of accessor->read_file,
 * following into overflow chunks if the tuple was oversized.  Copies the
 * per-tuple meta-data (if any) into *meta_data.  The returned MinimalTuple
 * points into accessor->read_buffer, which is reused across calls.
 */
static MinimalTuple
sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	MinimalTuple tuple;
	uint32		size;
	size_t		remaining_size;
	size_t		this_chunk_size;
	char	   *destination;

	/*
	 * We'll keep track of bytes read from this chunk so that we can detect an
	 * overflowing tuple and switch to reading overflow pages.
	 */
	if (accessor->sts->meta_data_size > 0)
	{
		BufFileReadExact(accessor->read_file, meta_data, accessor->sts->meta_data_size);
		accessor->read_bytes += accessor->sts->meta_data_size;
	}
	/* The tuple's total size (its t_len) is stored first. */
	BufFileReadExact(accessor->read_file, &size, sizeof(size));
	accessor->read_bytes += sizeof(size);
	if (size > accessor->read_buffer_size)
	{
		size_t		new_read_buffer_size;

		/* Grow the reusable buffer; at least double to amortize reallocs. */
		if (accessor->read_buffer != NULL)
			pfree(accessor->read_buffer);
		new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
		accessor->read_buffer =
			MemoryContextAlloc(accessor->context, new_read_buffer_size);
		accessor->read_buffer_size = new_read_buffer_size;
	}
	/* We already consumed the length word, so that much less remains. */
	remaining_size = size - sizeof(uint32);
	/* Read no further than the end of the current 4-page chunk. */
	this_chunk_size = Min(remaining_size,
						  BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
	/* Leave room at the front of the buffer; t_len is filled in below. */
	destination = accessor->read_buffer + sizeof(uint32);
	BufFileReadExact(accessor->read_file, destination, this_chunk_size);
	accessor->read_bytes += this_chunk_size;
	remaining_size -= this_chunk_size;
	destination += this_chunk_size;
	++accessor->read_ntuples;

	/* Check if we need to read any overflow chunks. */
	while (remaining_size > 0)
	{
		/* We are now positioned at the start of an overflow chunk. */
		SharedTuplestoreChunk chunk_header;

		BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
		/* Restart the per-chunk byte count for this new chunk. */
		accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
		if (chunk_header.overflow == 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("unexpected chunk in shared tuplestore temporary file"),
					 errdetail_internal("Expected overflow chunk.")));
		accessor->read_next_page += STS_CHUNK_PAGES;
		this_chunk_size = Min(remaining_size,
							  BLCKSZ * STS_CHUNK_PAGES -
							  STS_CHUNK_HEADER_SIZE);
		BufFileReadExact(accessor->read_file, destination, this_chunk_size);
		accessor->read_bytes += this_chunk_size;
		remaining_size -= this_chunk_size;
		destination += this_chunk_size;

		/*
		 * These will be used to count regular tuples following the oversized
		 * tuple that spilled into this overflow chunk.
		 */
		accessor->read_ntuples = 0;
		accessor->read_ntuples_available = chunk_header.ntuples;
	}

	/* Stamp the length word we skipped over earlier and hand back the tuple. */
	tuple = (MinimalTuple) accessor->read_buffer;
	tuple->t_len = size;

	return tuple;
}
490 : :
/*
 * Get the next tuple in the current parallel scan.
 *
 * Returns NULL once every participant's file has been visited and found
 * exhausted.  Chunks are claimed atomically under each participant's lock,
 * so concurrent scanners each see a disjoint subset of the tuples.
 */
MinimalTuple
sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	SharedTuplestoreParticipant *p;
	BlockNumber read_page;
	bool		eof;

	for (;;)
	{
		/* Can we read more tuples from the current chunk? */
		if (accessor->read_ntuples < accessor->read_ntuples_available)
			return sts_read_tuple(accessor, meta_data);

		/* Find the location of a new chunk to read. */
		p = &accessor->sts->participants[accessor->read_participant];

		LWLockAcquire(&p->lock, LW_EXCLUSIVE);
		/* We can skip directly past overflow pages we know about. */
		if (p->read_page < accessor->read_next_page)
			p->read_page = accessor->read_next_page;
		eof = p->read_page >= p->npages;
		if (!eof)
		{
			/* Claim the next chunk. */
			read_page = p->read_page;
			/* Advance the read head for the next reader. */
			p->read_page += STS_CHUNK_PAGES;
			accessor->read_next_page = p->read_page;
		}
		LWLockRelease(&p->lock);

		if (!eof)
		{
			SharedTuplestoreChunk chunk_header;

			/* Make sure we have the file open. */
			if (accessor->read_file == NULL)
			{
				char		name[MAXPGPATH];
				MemoryContext oldcxt;

				sts_filename(name, accessor, accessor->read_participant);

				/* Allocate BufFile state in the accessor's memory context. */
				oldcxt = MemoryContextSwitchTo(accessor->context);
				accessor->read_file =
					BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
									   false);
				MemoryContextSwitchTo(oldcxt);
			}

			/* Seek and load the chunk header. */
			if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not seek to block %u in shared tuplestore temporary file",
								read_page)));
			BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);

			/*
			 * If this is an overflow chunk, we skip it and any following
			 * overflow chunks all at once.
			 */
			if (chunk_header.overflow > 0)
			{
				accessor->read_next_page = read_page +
					chunk_header.overflow * STS_CHUNK_PAGES;
				continue;
			}

			/* Prime the per-chunk read state for sts_read_tuple(). */
			accessor->read_ntuples = 0;
			accessor->read_ntuples_available = chunk_header.ntuples;
			accessor->read_bytes = STS_CHUNK_HEADER_SIZE;

			/* Go around again, so we can get a tuple from this chunk. */
		}
		else
		{
			if (accessor->read_file != NULL)
			{
				BufFileClose(accessor->read_file);
				accessor->read_file = NULL;
			}

			/*
			 * Try the next participant's file.  If we've gone full circle,
			 * we're done.
			 */
			accessor->read_participant = (accessor->read_participant + 1) %
				accessor->sts->nparticipants;
			if (accessor->read_participant == accessor->participant)
				break;
			accessor->read_next_page = 0;

			/* Go around again, so we can get a chunk from this file. */
		}
	}

	return NULL;
}
593 : :
594 : : /*
595 : : * Create the name used for the BufFile that a given participant will write.
596 : : */
597 : : static void
598 : 2531 : sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
599 : : {
600 : 2531 : snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
601 : 2531 : }
|