Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * gininsert.c
4 : : * insert routines for the postgres inverted index access method.
5 : : *
6 : : *
7 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 : : * Portions Copyright (c) 1994, Regents of the University of California
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/access/gin/gininsert.c
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/gin_private.h"
18 : : #include "access/gin_tuple.h"
19 : : #include "access/parallel.h"
20 : : #include "access/table.h"
21 : : #include "access/tableam.h"
22 : : #include "access/xloginsert.h"
23 : : #include "catalog/index.h"
24 : : #include "catalog/pg_collation.h"
25 : : #include "commands/progress.h"
26 : : #include "miscadmin.h"
27 : : #include "nodes/execnodes.h"
28 : : #include "pgstat.h"
29 : : #include "storage/bufmgr.h"
30 : : #include "storage/proc.h"
31 : : #include "storage/predicate.h"
32 : : #include "tcop/tcopprot.h"
33 : : #include "utils/datum.h"
34 : : #include "utils/memutils.h"
35 : : #include "utils/builtins.h"
36 : : #include "utils/rel.h"
37 : : #include "utils/typcache.h"
38 : : #include "utils/wait_event.h"
39 : :
40 : :
41 : : /* Magic numbers for parallel state sharing */
42 : : #define PARALLEL_KEY_GIN_SHARED UINT64CONST(0xB000000000000001)
43 : : #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002)
44 : : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003)
45 : : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004)
46 : : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005)
47 : :
48 : : /*
49 : : * Status for index builds performed in parallel. This is allocated in a
50 : : * dynamic shared memory segment.
51 : : */
52 : : typedef struct GinBuildShared
53 : : {
54 : : /*
55 : : * These fields are not modified during the build. They primarily exist
56 : : * for the benefit of worker processes that need to create state
57 : : * corresponding to that used by the leader.
58 : : */
59 : : Oid heaprelid;
60 : : Oid indexrelid;
61 : : bool isconcurrent;
62 : : int scantuplesortstates;
63 : :
64 : : /*
65 : : * workersdonecv is used to monitor the progress of workers. All parallel
66 : : * participants must indicate that they are done before leader can use
67 : : * results built by the workers (and before leader can write the data into
68 : : * the index).
69 : : */
70 : : ConditionVariable workersdonecv;
71 : :
72 : : /*
73 : : * mutex protects all following fields
74 : : *
75 : : * These fields contain status information of interest to GIN index builds
76 : : * that must work just the same when an index is built in parallel.
77 : : */
78 : : slock_t mutex;
79 : :
80 : : /*
81 : : * Mutable state that is maintained by workers, and reported back to
82 : : * leader at end of the scans.
83 : : *
84 : : * nparticipantsdone is number of worker processes finished.
85 : : *
86 : : * reltuples is the total number of input heap tuples.
87 : : *
88 : : * indtuples is the total number of tuples that made it into the index.
89 : : */
90 : : int nparticipantsdone;
91 : : double reltuples;
92 : : double indtuples;
93 : :
94 : : /*
95 : : * ParallelTableScanDescData data follows. Can't directly embed here, as
96 : : * implementations of the parallel table scan desc interface might need
97 : : * stronger alignment.
98 : : */
99 : : } GinBuildShared;
100 : :
101 : : /*
102 : : * Return pointer to a GinBuildShared's parallel table scan.
103 : : *
104 : : * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
105 : : * MAXALIGN.
106 : : */
107 : : #define ParallelTableScanFromGinBuildShared(shared) \
108 : : (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(GinBuildShared)))
109 : :
110 : : /*
111 : : * Status for leader in parallel index build.
112 : : */
113 : : typedef struct GinLeader
114 : : {
115 : : /* parallel context itself */
116 : : ParallelContext *pcxt;
117 : :
118 : : /*
119 : : * nparticipanttuplesorts is the exact number of worker processes
120 : : * successfully launched, plus one leader process if it participates as a
121 : : * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
122 : : * participating as a worker).
123 : : */
124 : : int nparticipanttuplesorts;
125 : :
126 : : /*
127 : : * Leader process convenience pointers to shared state (leader avoids TOC
128 : : * lookups).
129 : : *
130 : : * GinBuildShared is the shared state for entire build. sharedsort is the
131 : : * shared, tuplesort-managed state passed to each process tuplesort.
132 : : * snapshot is the snapshot used by the scan iff an MVCC snapshot is
133 : : * required.
134 : : */
135 : : GinBuildShared *ginshared;
136 : : Sharedsort *sharedsort;
137 : : Snapshot snapshot;
138 : : WalUsage *walusage;
139 : : BufferUsage *bufferusage;
140 : : } GinLeader;
141 : :
142 : : typedef struct
143 : : {
144 : : GinState ginstate;
145 : : double indtuples;
146 : : GinStatsData buildStats;
147 : : MemoryContext tmpCtx;
148 : : MemoryContext funcCtx;
149 : : BuildAccumulator accum;
150 : : ItemPointerData tid;
151 : : int work_mem;
152 : :
153 : : /*
154 : : * bs_leader is only present when a parallel index build is performed, and
155 : : * only in the leader process.
156 : : */
157 : : GinLeader *bs_leader;
158 : :
159 : : /* number of participating workers (including leader) */
160 : : int bs_num_workers;
161 : :
162 : : /* used to pass information from workers to leader */
163 : : double bs_numtuples;
164 : : double bs_reltuples;
165 : :
166 : : /*
167 : : * The sortstate is used by workers (including the leader). It has to be
168 : : * part of the build state, because that's the only thing passed to the
169 : : * build callback etc.
170 : : */
171 : : Tuplesortstate *bs_sortstate;
172 : :
173 : : /*
174 : : * The sortstate used only within a single worker for the first merge pass
175 : : * happening there. In principle it doesn't need to be part of the build
176 : : * state and we could pass it around directly, but it's more convenient
177 : : * this way. And it's part of the build state, after all.
178 : : */
179 : : Tuplesortstate *bs_worker_sort;
180 : : } GinBuildState;
181 : :
182 : :
183 : : /* parallel index builds */
184 : : static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
185 : : bool isconcurrent, int request);
186 : : static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state);
187 : : static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
188 : : static double _gin_parallel_heapscan(GinBuildState *state);
189 : : static double _gin_parallel_merge(GinBuildState *state);
190 : : static void _gin_leader_participate_as_worker(GinBuildState *buildstate,
191 : : Relation heap, Relation index);
192 : : static void _gin_parallel_scan_and_build(GinBuildState *state,
193 : : GinBuildShared *ginshared,
194 : : Sharedsort *sharedsort,
195 : : Relation heap, Relation index,
196 : : int sortmem, bool progress);
197 : :
198 : : static ItemPointer _gin_parse_tuple_items(GinTuple *a);
199 : : static Datum _gin_parse_tuple_key(GinTuple *a);
200 : :
201 : : static GinTuple *_gin_build_tuple(OffsetNumber attrnum, unsigned char category,
202 : : Datum key, int16 typlen, bool typbyval,
203 : : ItemPointerData *items, uint32 nitems,
204 : : Size *len);
205 : :
206 : : /*
207 : : * Adds array of item pointers to tuple's posting list, or
208 : : * creates posting tree and tuple pointing to tree in case
209 : : * of not enough space. Max size of tuple is defined in
210 : : * GinFormTuple(). Returns a new, modified index tuple.
211 : : * items[] must be in sorted order with no duplicates.
212 : : */
213 : : static IndexTuple
5546 tgl@sss.pgh.pa.us 214 :CBC 114454 : addItemPointersToLeafTuple(GinState *ginstate,
215 : : IndexTuple old,
216 : : ItemPointerData *items, uint32 nitem,
217 : : GinStatsData *buildStats, Buffer buffer)
218 : : {
219 : : OffsetNumber attnum;
220 : : Datum key;
221 : : GinNullCategory category;
222 : : IndexTuple res;
223 : : ItemPointerData *newItems,
224 : : *oldItems;
225 : : int oldNPosting,
226 : : newNPosting,
227 : : nwritten;
228 : : GinPostingList *compressedList;
229 : :
230 [ - + ]: 114454 : Assert(!GinIsPostingTree(old));
231 : :
232 : 114454 : attnum = gintuple_get_attrnum(ginstate, old);
233 : 114454 : key = gintuple_get_key(ginstate, old, &category);
234 : :
235 : : /* merge the old and new posting lists */
4435 heikki.linnakangas@i 236 : 114454 : oldItems = ginReadTuple(ginstate, attnum, old, &oldNPosting);
237 : :
4374 238 : 114454 : newItems = ginMergeItemPointers(items, nitem,
239 : : oldItems, oldNPosting,
240 : : &newNPosting);
241 : :
242 : : /* Compress the posting list, and try to a build tuple with room for it */
4435 243 : 114454 : res = NULL;
160 msawada@postgresql.o 244 :GNC 114454 : compressedList = ginCompressPostingList(newItems, newNPosting, GinMaxItemSize, &nwritten);
245 [ + + ]: 114454 : if (nwritten == newNPosting)
246 : : {
4435 heikki.linnakangas@i 247 :CBC 114448 : res = GinFormTuple(ginstate, attnum, key, category,
248 : : (char *) compressedList,
249 : 114448 : SizeOfGinPostingList(compressedList),
250 : : newNPosting,
251 : : false);
252 : : }
253 : :
160 msawada@postgresql.o 254 :GNC 114454 : pfree(newItems);
255 : 114454 : pfree(compressedList);
256 : :
4435 heikki.linnakangas@i 257 [ + + ]:CBC 114454 : if (!res)
258 : : {
259 : : /* posting list would be too big, convert to posting tree */
260 : : BlockNumber postingRoot;
261 : :
262 : : /*
263 : : * Initialize posting tree with the old tuple's posting list. It's
264 : : * surely small enough to fit on one posting-tree page, and should
265 : : * already be in order with no duplicates.
266 : : */
5546 tgl@sss.pgh.pa.us 267 : 11 : postingRoot = createPostingTree(ginstate->index,
268 : : oldItems,
269 : : oldNPosting,
270 : : buildStats,
271 : : buffer);
272 : :
273 : : /* Now insert the TIDs-to-be-added into the posting tree */
4498 heikki.linnakangas@i 274 : 11 : ginInsertItemPointers(ginstate->index, postingRoot,
275 : : items, nitem,
276 : : buildStats);
277 : :
278 : : /* And build a new posting-tree-only result tuple */
4435 279 : 11 : res = GinFormTuple(ginstate, attnum, key, category, NULL, 0, 0, true);
5546 tgl@sss.pgh.pa.us 280 : 11 : GinSetPostingTree(res, postingRoot);
281 : : }
4435 heikki.linnakangas@i 282 : 114454 : pfree(oldItems);
283 : :
5546 tgl@sss.pgh.pa.us 284 : 114454 : return res;
285 : : }
286 : :
287 : : /*
288 : : * Build a fresh leaf tuple, either posting-list or posting-tree format
289 : : * depending on whether the given items list will fit.
290 : : * items[] must be in sorted order with no duplicates.
291 : : *
292 : : * This is basically the same logic as in addItemPointersToLeafTuple,
293 : : * but working from slightly different input.
294 : : */
295 : : static IndexTuple
296 : 346931 : buildFreshLeafTuple(GinState *ginstate,
297 : : OffsetNumber attnum, Datum key, GinNullCategory category,
298 : : ItemPointerData *items, uint32 nitem,
299 : : GinStatsData *buildStats, Buffer buffer)
300 : : {
4435 heikki.linnakangas@i 301 : 346931 : IndexTuple res = NULL;
302 : : GinPostingList *compressedList;
303 : : int nwritten;
304 : :
305 : : /* try to build a posting list tuple with all the items */
160 msawada@postgresql.o 306 :GNC 346931 : compressedList = ginCompressPostingList(items, nitem, GinMaxItemSize, &nwritten);
307 [ + + ]: 346931 : if (nwritten == nitem)
308 : : {
4435 heikki.linnakangas@i 309 :CBC 346880 : res = GinFormTuple(ginstate, attnum, key, category,
310 : : (char *) compressedList,
311 : 346880 : SizeOfGinPostingList(compressedList),
312 : : nitem, false);
313 : : }
160 msawada@postgresql.o 314 :GNC 346931 : pfree(compressedList);
315 : :
5546 tgl@sss.pgh.pa.us 316 [ + + ]:CBC 346931 : if (!res)
317 : : {
318 : : /* posting list would be too big, build posting tree */
319 : : BlockNumber postingRoot;
320 : :
321 : : /*
322 : : * Build posting-tree-only result tuple. We do this first so as to
323 : : * fail quickly if the key is too big.
324 : : */
4435 heikki.linnakangas@i 325 : 51 : res = GinFormTuple(ginstate, attnum, key, category, NULL, 0, 0, true);
326 : :
327 : : /*
328 : : * Initialize a new posting tree with the TIDs.
329 : : */
4512 330 : 51 : postingRoot = createPostingTree(ginstate->index, items, nitem,
331 : : buildStats, buffer);
332 : :
333 : : /* And save the root link in the result tuple */
5546 tgl@sss.pgh.pa.us 334 : 51 : GinSetPostingTree(res, postingRoot);
335 : : }
336 : :
7257 teodor@sigaev.ru 337 : 346931 : return res;
338 : : }
339 : :
340 : : /*
341 : : * Insert one or more heap TIDs associated with the given key value.
342 : : * This will either add a single key entry, or enlarge a pre-existing entry.
343 : : *
344 : : * During an index build, buildStats is non-null and the counters
345 : : * it contains should be incremented as needed.
346 : : */
347 : : void
5546 tgl@sss.pgh.pa.us 348 : 486087 : ginEntryInsert(GinState *ginstate,
349 : : OffsetNumber attnum, Datum key, GinNullCategory category,
350 : : ItemPointerData *items, uint32 nitem,
351 : : GinStatsData *buildStats)
352 : : {
353 : : GinBtreeData btree;
354 : : GinBtreeEntryInsertData insertdata;
355 : : GinBtreeStack *stack;
356 : : IndexTuple itup;
357 : : Page page;
358 : :
3133 peter_e@gmx.net 359 : 486087 : insertdata.isDelete = false;
360 : :
5546 tgl@sss.pgh.pa.us 361 : 486087 : ginPrepareEntryScan(&btree, attnum, key, category, ginstate);
2538 heikki.linnakangas@i 362 : 486087 : btree.isBuild = (buildStats != NULL);
363 : :
919 tmunro@postgresql.or 364 : 486087 : stack = ginFindLeafPage(&btree, false, false);
3616 kgrittn@postgresql.o 365 : 486087 : page = BufferGetPage(stack->buffer);
366 : :
7102 bruce@momjian.us 367 [ + + ]: 486087 : if (btree.findItem(&btree, stack))
368 : : {
369 : : /* found pre-existing entry */
7257 teodor@sigaev.ru 370 : 139154 : itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stack->off));
371 : :
7102 bruce@momjian.us 372 [ + + ]: 139154 : if (GinIsPostingTree(itup))
373 : : {
374 : : /* add entries to existing posting tree */
375 : 24696 : BlockNumber rootPostingTree = GinGetPostingTree(itup);
376 : :
377 : : /* release all stack */
7257 teodor@sigaev.ru 378 : 24696 : LockBuffer(stack->buffer, GIN_UNLOCK);
7102 bruce@momjian.us 379 : 24696 : freeGinBtreeStack(stack);
380 : :
381 : : /* insert into posting tree */
4498 heikki.linnakangas@i 382 : 24696 : ginInsertItemPointers(ginstate->index, rootPostingTree,
383 : : items, nitem,
384 : : buildStats);
7257 teodor@sigaev.ru 385 : 24694 : return;
386 : : }
387 : :
2238 tmunro@postgresql.or 388 : 114458 : CheckForSerializableConflictIn(ginstate->index, NULL,
389 : : BufferGetBlockNumber(stack->buffer));
390 : : /* modify an existing leaf entry */
5546 tgl@sss.pgh.pa.us 391 : 114454 : itup = addItemPointersToLeafTuple(ginstate, itup,
392 : : items, nitem, buildStats, stack->buffer);
393 : :
3133 peter_e@gmx.net 394 : 114454 : insertdata.isDelete = true;
395 : : }
396 : : else
397 : : {
2238 tmunro@postgresql.or 398 : 346933 : CheckForSerializableConflictIn(ginstate->index, NULL,
399 : : BufferGetBlockNumber(stack->buffer));
400 : : /* no match, so construct a new leaf entry */
5546 tgl@sss.pgh.pa.us 401 : 346931 : itup = buildFreshLeafTuple(ginstate, attnum, key, category,
402 : : items, nitem, buildStats, stack->buffer);
403 : :
404 : : /*
405 : : * nEntries counts leaf tuples, so increment it only when we make a
406 : : * new one.
407 : : */
2323 408 [ + + ]: 346931 : if (buildStats)
409 : 76496 : buildStats->nEntries++;
410 : : }
411 : :
412 : : /* Insert the new or modified leaf tuple */
4491 heikki.linnakangas@i 413 : 461385 : insertdata.entry = itup;
414 : 461385 : ginInsertValue(&btree, stack, &insertdata, buildStats);
7102 bruce@momjian.us 415 : 461383 : pfree(itup);
416 : : }
417 : :
418 : : /*
419 : : * Extract index entries for a single indexable item, and add them to the
420 : : * BuildAccumulator's state.
421 : : *
422 : : * This function is used only during initial index creation.
423 : : */
424 : : static void
5546 tgl@sss.pgh.pa.us 425 : 470826 : ginHeapTupleBulkInsert(GinBuildState *buildstate, OffsetNumber attnum,
426 : : Datum value, bool isNull,
427 : : ItemPointer heapptr)
428 : : {
429 : : Datum *entries;
430 : : GinNullCategory *categories;
431 : : int32 nentries;
432 : : MemoryContext oldCtx;
433 : :
7187 teodor@sigaev.ru 434 : 470826 : oldCtx = MemoryContextSwitchTo(buildstate->funcCtx);
5546 tgl@sss.pgh.pa.us 435 : 470826 : entries = ginExtractEntries(buildstate->accum.ginstate, attnum,
436 : : value, isNull,
437 : : &nentries, &categories);
7187 teodor@sigaev.ru 438 : 470826 : MemoryContextSwitchTo(oldCtx);
439 : :
5546 tgl@sss.pgh.pa.us 440 : 470826 : ginInsertBAEntries(&buildstate->accum, heapptr, attnum,
441 : : entries, categories, nentries);
442 : :
443 : 470826 : buildstate->indtuples += nentries;
444 : :
7187 teodor@sigaev.ru 445 : 470826 : MemoryContextReset(buildstate->funcCtx);
7257 446 : 470826 : }
447 : :
448 : : static void
2319 andres@anarazel.de 449 : 459883 : ginBuildCallback(Relation index, ItemPointer tid, Datum *values,
450 : : bool *isnull, bool tupleIsAlive, void *state)
451 : : {
7102 bruce@momjian.us 452 : 459883 : GinBuildState *buildstate = (GinBuildState *) state;
453 : : MemoryContext oldCtx;
454 : : int i;
455 : :
7257 teodor@sigaev.ru 456 : 459883 : oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
457 : :
6121 bruce@momjian.us 458 [ + + ]: 920077 : for (i = 0; i < buildstate->ginstate.origTupdesc->natts; i++)
5546 tgl@sss.pgh.pa.us 459 : 460194 : ginHeapTupleBulkInsert(buildstate, (OffsetNumber) (i + 1),
2319 andres@anarazel.de 460 : 460194 : values[i], isnull[i], tid);
461 : :
462 : : /* If we've maxed out our available memory, dump everything to the index */
408 tgl@sss.pgh.pa.us 463 [ - + ]: 459883 : if (buildstate->accum.allocatedMemory >= maintenance_work_mem * (Size) 1024)
464 : : {
465 : : ItemPointerData *list;
466 : : Datum key;
467 : : GinNullCategory category;
468 : : uint32 nlist;
469 : : OffsetNumber attnum;
470 : :
5705 tgl@sss.pgh.pa.us 471 :UBC 0 : ginBeginBAScan(&buildstate->accum);
5546 472 : 0 : while ((list = ginGetBAEntry(&buildstate->accum,
3189 473 [ # # ]: 0 : &attnum, &key, &category, &nlist)) != NULL)
474 : : {
475 : : /* there could be many entries, so be willing to abort here */
6512 476 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
5546 477 : 0 : ginEntryInsert(&buildstate->ginstate, attnum, key, category,
478 : : list, nlist, &buildstate->buildStats);
479 : : }
480 : :
7257 teodor@sigaev.ru 481 : 0 : MemoryContextReset(buildstate->tmpCtx);
482 : 0 : ginInitBA(&buildstate->accum);
483 : : }
484 : :
7257 teodor@sigaev.ru 485 :CBC 459883 : MemoryContextSwitchTo(oldCtx);
486 : 459883 : }
487 : :
488 : : /*
489 : : * ginFlushBuildState
490 : : * Write all data from BuildAccumulator into the tuplesort.
491 : : *
492 : : * The number of TIDs written to the tuplesort at once is limited, to reduce
493 : : * the amount of memory needed when merging the intermediate results later.
494 : : * The leader will see up to two chunks per worker, so calculate the limit to
495 : : * not need more than MaxAllocSize overall.
496 : : *
497 : : * We don't need to worry about overflowing maintenance_work_mem. We can't
498 : : * build chunks larger than work_mem, and that limit was set so that workers
499 : : * produce sufficiently small chunks.
500 : : */
501 : : static void
377 tomas.vondra@postgre 502 : 39 : ginFlushBuildState(GinBuildState *buildstate, Relation index)
503 : : {
504 : : ItemPointerData *list;
505 : : Datum key;
506 : : GinNullCategory category;
507 : : uint32 nlist;
508 : : OffsetNumber attnum;
509 : 39 : TupleDesc tdesc = RelationGetDescr(index);
510 : : uint32 maxlen;
511 : :
512 : : /* maximum number of TIDs per chunk (two chunks per worker) */
131 513 : 39 : maxlen = MaxAllocSize / sizeof(ItemPointerData);
514 : 39 : maxlen /= (2 * buildstate->bs_num_workers);
515 : :
377 516 : 39 : ginBeginBAScan(&buildstate->accum);
517 : 18222 : while ((list = ginGetBAEntry(&buildstate->accum,
518 [ + + ]: 18222 : &attnum, &key, &category, &nlist)) != NULL)
519 : : {
520 : : /* information about the key */
144 drowley@postgresql.o 521 :GNC 18183 : CompactAttribute *attr = TupleDescCompactAttr(tdesc, (attnum - 1));
522 : :
523 : : /* start of the chunk */
131 tomas.vondra@postgre 524 :CBC 18183 : uint32 offset = 0;
525 : :
526 : : /* split the entry into smaller chunk with up to maxlen items */
527 [ + + ]: 36366 : while (offset < nlist)
528 : : {
529 : : /* GIN tuple and tuple length */
530 : : GinTuple *tup;
531 : : Size tuplen;
532 : 18183 : uint32 len = Min(maxlen, nlist - offset);
533 : :
534 : : /* there could be many entries, so be willing to abort here */
535 [ + + ]: 18183 : CHECK_FOR_INTERRUPTS();
536 : :
537 : 18183 : tup = _gin_build_tuple(attnum, category,
538 : 18183 : key, attr->attlen, attr->attbyval,
539 : 18183 : &list[offset], len,
540 : : &tuplen);
541 : :
542 : 18183 : offset += len;
543 : :
544 : 18183 : tuplesort_putgintuple(buildstate->bs_worker_sort, tup, tuplen);
545 : :
546 : 18183 : pfree(tup);
547 : : }
548 : : }
549 : :
377 550 : 39 : MemoryContextReset(buildstate->tmpCtx);
551 : 39 : ginInitBA(&buildstate->accum);
552 : 39 : }
553 : :
554 : : /*
555 : : * ginBuildCallbackParallel
556 : : * Callback for the parallel index build.
557 : : *
558 : : * This is similar to the serial build callback ginBuildCallback, but
559 : : * instead of writing the accumulated entries into the index, each worker
560 : : * writes them into a (local) tuplesort.
561 : : *
562 : : * The worker then sorts and combines these entries, before writing them
563 : : * into a shared tuplesort for the leader (see _gin_parallel_scan_and_build
564 : : * for the whole process).
565 : : */
566 : : static void
567 : 10632 : ginBuildCallbackParallel(Relation index, ItemPointer tid, Datum *values,
568 : : bool *isnull, bool tupleIsAlive, void *state)
569 : : {
570 : 10632 : GinBuildState *buildstate = (GinBuildState *) state;
571 : : MemoryContext oldCtx;
572 : : int i;
573 : :
574 : 10632 : oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
575 : :
576 : : /*
577 : : * if scan wrapped around - flush accumulated entries and start anew
578 : : *
579 : : * With parallel scans, we don't have a guarantee the scan does not start
580 : : * half-way through the relation (serial builds disable sync scans and
581 : : * always start from block 0, parallel scans require allow_sync=true).
582 : : *
583 : : * Building the posting lists assumes the TIDs are monotonic and never go
584 : : * back, and the wrap around would break that. We handle that by detecting
585 : : * the wraparound, and flushing all entries. This means we'll later see
586 : : * two separate entries with non-overlapping TID lists (which can be
587 : : * combined by merge sort).
588 : : *
589 : : * To detect a wraparound, we remember the last TID seen by each worker
590 : : * (for any key). If the next TID seen by the worker is lower, the scan
591 : : * must have wrapped around.
592 : : */
593 [ - + ]: 10632 : if (ItemPointerCompare(tid, &buildstate->tid) < 0)
377 tomas.vondra@postgre 594 :UBC 0 : ginFlushBuildState(buildstate, index);
595 : :
596 : : /* remember the TID we're about to process */
377 tomas.vondra@postgre 597 :CBC 10632 : buildstate->tid = *tid;
598 : :
599 [ + + ]: 21264 : for (i = 0; i < buildstate->ginstate.origTupdesc->natts; i++)
600 : 10632 : ginHeapTupleBulkInsert(buildstate, (OffsetNumber) (i + 1),
601 : 10632 : values[i], isnull[i], tid);
602 : :
603 : : /*
604 : : * If we've maxed out our available memory, dump everything to the
605 : : * tuplesort. We use half the per-worker fraction of maintenance_work_mem,
606 : : * the other half is used for the tuplesort.
607 : : */
608 [ - + ]: 10632 : if (buildstate->accum.allocatedMemory >= buildstate->work_mem * (Size) 1024)
377 tomas.vondra@postgre 609 :UBC 0 : ginFlushBuildState(buildstate, index);
610 : :
377 tomas.vondra@postgre 611 :CBC 10632 : MemoryContextSwitchTo(oldCtx);
612 : 10632 : }
613 : :
614 : : IndexBuildResult *
3710 tgl@sss.pgh.pa.us 615 : 205 : ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
616 : : {
617 : : IndexBuildResult *result;
618 : : double reltuples;
619 : : GinBuildState buildstate;
377 tomas.vondra@postgre 620 : 205 : GinBuildState *state = &buildstate;
621 : : Buffer RootBuffer,
622 : : MetaBuffer;
623 : : ItemPointerData *list;
624 : : Datum key;
625 : : GinNullCategory category;
626 : : uint32 nlist;
627 : : MemoryContext oldCtx;
628 : : OffsetNumber attnum;
629 : :
7257 teodor@sigaev.ru 630 [ - + ]: 205 : if (RelationGetNumberOfBlocks(index) != 0)
7257 teodor@sigaev.ru 631 [ # # ]:UBC 0 : elog(ERROR, "index \"%s\" already contains data",
632 : : RelationGetRelationName(index));
633 : :
7257 teodor@sigaev.ru 634 :CBC 205 : initGinState(&buildstate.ginstate, index);
5628 tgl@sss.pgh.pa.us 635 : 205 : buildstate.indtuples = 0;
636 : 205 : memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
637 : :
638 : : /* Initialize fields for parallel build too. */
377 tomas.vondra@postgre 639 : 205 : buildstate.bs_numtuples = 0;
640 : 205 : buildstate.bs_reltuples = 0;
641 : 205 : buildstate.bs_leader = NULL;
642 : 205 : memset(&buildstate.tid, 0, sizeof(ItemPointerData));
643 : :
644 : : /* initialize the meta page */
6200 tgl@sss.pgh.pa.us 645 : 205 : MetaBuffer = GinNewBuffer(index);
646 : :
647 : : /* initialize the root page */
648 : 205 : RootBuffer = GinNewBuffer(index);
649 : :
7257 teodor@sigaev.ru 650 : 205 : START_CRIT_SECTION();
6200 tgl@sss.pgh.pa.us 651 : 205 : GinInitMetabuffer(MetaBuffer);
652 : 205 : MarkBufferDirty(MetaBuffer);
653 : 205 : GinInitBuffer(RootBuffer, GIN_LEAF);
654 : 205 : MarkBufferDirty(RootBuffer);
655 : :
656 : :
657 : 205 : UnlockReleaseBuffer(MetaBuffer);
658 : 205 : UnlockReleaseBuffer(RootBuffer);
7257 teodor@sigaev.ru 659 [ - + ]: 205 : END_CRIT_SECTION();
660 : :
661 : : /* count the root as first entry page */
5628 tgl@sss.pgh.pa.us 662 : 205 : buildstate.buildStats.nEntryPages++;
663 : :
664 : : /*
665 : : * create a temporary memory context that is used to hold data not yet
666 : : * dumped out to the index
667 : : */
7257 teodor@sigaev.ru 668 : 205 : buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
669 : : "Gin build temporary context",
670 : : ALLOCSET_DEFAULT_SIZES);
671 : :
672 : : /*
673 : : * create a temporary memory context that is used for calling
674 : : * ginExtractEntries(), and can be reset after each tuple
675 : : */
4004 tgl@sss.pgh.pa.us 676 : 205 : buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext,
677 : : "Gin build temporary context for user-defined function",
678 : : ALLOCSET_DEFAULT_SIZES);
679 : :
7257 teodor@sigaev.ru 680 : 205 : buildstate.accum.ginstate = &buildstate.ginstate;
7102 bruce@momjian.us 681 : 205 : ginInitBA(&buildstate.accum);
682 : :
683 : : /* Report table scan phase started */
377 tomas.vondra@postgre 684 : 205 : pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
685 : : PROGRESS_GIN_PHASE_INDEXBUILD_TABLESCAN);
686 : :
687 : : /*
688 : : * Attempt to launch parallel worker scan when required
689 : : *
690 : : * XXX plan_create_index_workers makes the number of workers dependent on
691 : : * maintenance_work_mem, requiring 32MB for each worker. For GIN that's
692 : : * reasonable too, because we sort the data just like btree. It does
693 : : * ignore the memory used to accumulate data in memory (set by work_mem),
694 : : * but there is no way to communicate that to plan_create_index_workers.
695 : : */
696 [ + + ]: 205 : if (indexInfo->ii_ParallelWorkers > 0)
697 : 13 : _gin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
698 : : indexInfo->ii_ParallelWorkers);
699 : :
700 : : /*
701 : : * If parallel build requested and at least one worker process was
702 : : * successfully launched, set up coordination state, wait for workers to
703 : : * complete. Then read all tuples from the shared tuplesort and insert
704 : : * them into the index.
705 : : *
706 : : * In serial mode, simply scan the table and build the index one index
707 : : * tuple at a time.
708 : : */
709 [ + + ]: 205 : if (state->bs_leader)
710 : : {
711 : : SortCoordinate coordinate;
712 : :
95 michael@paquier.xyz 713 :GNC 13 : coordinate = palloc0_object(SortCoordinateData);
377 tomas.vondra@postgre 714 :CBC 13 : coordinate->isWorker = false;
715 : 13 : coordinate->nParticipants =
716 : 13 : state->bs_leader->nparticipanttuplesorts;
717 : 13 : coordinate->sharedsort = state->bs_leader->sharedsort;
718 : :
719 : : /*
720 : : * Begin leader tuplesort.
721 : : *
722 : : * In cases where parallelism is involved, the leader receives the
723 : : * same share of maintenance_work_mem as a serial sort (it is
724 : : * generally treated in the same way as a serial sort once we return).
725 : : * Parallel worker Tuplesortstates will have received only a fraction
726 : : * of maintenance_work_mem, though.
727 : : *
728 : : * We rely on the lifetime of the Leader Tuplesortstate almost not
729 : : * overlapping with any worker Tuplesortstate's lifetime. There may
730 : : * be some small overlap, but that's okay because we rely on leader
731 : : * Tuplesortstate only allocating a small, fixed amount of memory
732 : : * here. When its tuplesort_performsort() is called (by our caller),
733 : : * and significant amounts of memory are likely to be used, all
734 : : * workers must have already freed almost all memory held by their
735 : : * Tuplesortstates (they are about to go away completely, too). The
736 : : * overall effect is that maintenance_work_mem always represents an
737 : : * absolute high watermark on the amount of memory used by a CREATE
738 : : * INDEX operation, regardless of the use of parallelism or any other
739 : : * factor.
740 : : */
741 : 13 : state->bs_sortstate =
742 : 13 : tuplesort_begin_index_gin(heap, index,
743 : : maintenance_work_mem, coordinate,
744 : : TUPLESORT_NONE);
745 : :
746 : : /* scan the relation in parallel and merge per-worker results */
747 : 13 : reltuples = _gin_parallel_merge(state);
748 : :
749 : 13 : _gin_end_parallel(state->bs_leader, state);
750 : : }
751 : : else /* no parallel index build */
752 : : {
753 : : /*
754 : : * Do the heap scan. We disallow sync scan here because
755 : : * dataPlaceToPage prefers to receive tuples in TID order.
756 : : */
757 : 192 : reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
758 : : ginBuildCallback, &buildstate, NULL);
759 : :
760 : : /* dump remaining entries to the index */
761 : 192 : oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
762 : 192 : ginBeginBAScan(&buildstate.accum);
763 : 65363 : while ((list = ginGetBAEntry(&buildstate.accum,
764 [ + + ]: 65363 : &attnum, &key, &category, &nlist)) != NULL)
765 : : {
766 : : /* there could be many entries, so be willing to abort here */
767 [ - + ]: 65171 : CHECK_FOR_INTERRUPTS();
768 : 65171 : ginEntryInsert(&buildstate.ginstate, attnum, key, category,
769 : : list, nlist, &buildstate.buildStats);
770 : : }
771 : 192 : MemoryContextSwitchTo(oldCtx);
772 : : }
773 : :
4004 tgl@sss.pgh.pa.us 774 : 205 : MemoryContextDelete(buildstate.funcCtx);
7257 teodor@sigaev.ru 775 : 205 : MemoryContextDelete(buildstate.tmpCtx);
776 : :
777 : : /*
778 : : * Update metapage stats
779 : : */
5628 tgl@sss.pgh.pa.us 780 : 205 : buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
2538 heikki.linnakangas@i 781 : 205 : ginUpdateStats(index, &buildstate.buildStats, true);
782 : :
783 : : /*
784 : : * We didn't write WAL records as we built the index, so if WAL-logging is
785 : : * required, write all pages to the WAL now.
786 : : */
787 [ + + + + : 205 : if (RelationNeedsWAL(index))
+ + + + ]
788 : : {
789 : 146 : log_newpage_range(index, MAIN_FORKNUM,
790 : : 0, RelationGetNumberOfBlocks(index),
791 : : true);
792 : : }
793 : :
794 : : /*
795 : : * Return statistics
796 : : */
95 michael@paquier.xyz 797 :GNC 205 : result = palloc_object(IndexBuildResult);
798 : :
7249 tgl@sss.pgh.pa.us 799 :CBC 205 : result->heap_tuples = reltuples;
800 : 205 : result->index_tuples = buildstate.indtuples;
801 : :
3710 802 : 205 : return result;
803 : : }
804 : :
805 : : /*
806 : : * ginbuildempty() -- build an empty gin index in the initialization fork
807 : : */
808 : : void
809 : 3 : ginbuildempty(Relation index)
810 : : {
811 : : Buffer RootBuffer,
812 : : MetaBuffer;
813 : :
814 : : /* An empty GIN index has two pages. */
935 tmunro@postgresql.or 815 : 3 : MetaBuffer = ExtendBufferedRel(BMR_REL(index), INIT_FORKNUM, NULL,
816 : : EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
817 : 3 : RootBuffer = ExtendBufferedRel(BMR_REL(index), INIT_FORKNUM, NULL,
818 : : EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
819 : :
820 : : /* Initialize and xlog metabuffer and root buffer. */
5555 rhaas@postgresql.org 821 : 3 : START_CRIT_SECTION();
822 : 3 : GinInitMetabuffer(MetaBuffer);
823 : 3 : MarkBufferDirty(MetaBuffer);
3055 tgl@sss.pgh.pa.us 824 : 3 : log_newpage_buffer(MetaBuffer, true);
5555 rhaas@postgresql.org 825 : 3 : GinInitBuffer(RootBuffer, GIN_LEAF);
826 : 3 : MarkBufferDirty(RootBuffer);
4484 heikki.linnakangas@i 827 : 3 : log_newpage_buffer(RootBuffer, false);
5555 rhaas@postgresql.org 828 [ - + ]: 3 : END_CRIT_SECTION();
829 : :
830 : : /* Unlock and release the buffers. */
831 : 3 : UnlockReleaseBuffer(MetaBuffer);
832 : 3 : UnlockReleaseBuffer(RootBuffer);
833 : 3 : }
834 : :
835 : : /*
836 : : * Insert index entries for a single indexable item during "normal"
837 : : * (non-fast-update) insertion
838 : : */
839 : : static void
5546 tgl@sss.pgh.pa.us 840 : 27048 : ginHeapTupleInsert(GinState *ginstate, OffsetNumber attnum,
841 : : Datum value, bool isNull,
842 : : ItemPointer item)
843 : : {
844 : : Datum *entries;
845 : : GinNullCategory *categories;
846 : : int32 i,
847 : : nentries;
848 : :
849 : 27048 : entries = ginExtractEntries(ginstate, attnum, value, isNull,
850 : : &nentries, &categories);
851 : :
7102 bruce@momjian.us 852 [ + + ]: 253515 : for (i = 0; i < nentries; i++)
853 : : {
854 : : /* there could be many entries, so be willing to abort here */
2 heikki.linnakangas@i 855 [ - + ]:GNC 226477 : CHECK_FOR_INTERRUPTS();
5546 tgl@sss.pgh.pa.us 856 :CBC 226477 : ginEntryInsert(ginstate, attnum, entries[i], categories[i],
857 : : item, 1, NULL);
858 : : }
7257 teodor@sigaev.ru 859 : 27038 : }
860 : :
861 : : bool
3710 tgl@sss.pgh.pa.us 862 : 159945 : gininsert(Relation index, Datum *values, bool *isnull,
863 : : ItemPointer ht_ctid, Relation heapRel,
864 : : IndexUniqueCheck checkUnique,
865 : : bool indexUnchanged,
866 : : IndexInfo *indexInfo)
867 : : {
3321 868 : 159945 : GinState *ginstate = (GinState *) indexInfo->ii_AmCache;
869 : : MemoryContext oldCtx;
870 : : MemoryContext insertCtx;
871 : : int i;
872 : :
873 : : /* Initialize GinState cache if first call in this statement */
874 [ + + ]: 159945 : if (ginstate == NULL)
875 : : {
876 : 1414 : oldCtx = MemoryContextSwitchTo(indexInfo->ii_Context);
95 michael@paquier.xyz 877 :GNC 1414 : ginstate = palloc_object(GinState);
3321 tgl@sss.pgh.pa.us 878 :CBC 1414 : initGinState(ginstate, index);
472 peter@eisentraut.org 879 : 1414 : indexInfo->ii_AmCache = ginstate;
3321 tgl@sss.pgh.pa.us 880 : 1414 : MemoryContextSwitchTo(oldCtx);
881 : : }
882 : :
7257 teodor@sigaev.ru 883 : 159945 : insertCtx = AllocSetContextCreate(CurrentMemoryContext,
884 : : "Gin insert temporary context",
885 : : ALLOCSET_DEFAULT_SIZES);
886 : :
887 : 159945 : oldCtx = MemoryContextSwitchTo(insertCtx);
888 : :
6121 bruce@momjian.us 889 [ + - - + : 159945 : if (GinGetUseFastUpdate(index))
+ + + + ]
6200 tgl@sss.pgh.pa.us 890 : 132894 : {
891 : : GinTupleCollector collector;
892 : :
893 : 132897 : memset(&collector, 0, sizeof(GinTupleCollector));
894 : :
3321 895 [ + + ]: 325833 : for (i = 0; i < ginstate->origTupdesc->natts; i++)
896 : 192936 : ginHeapTupleFastCollect(ginstate, &collector,
5546 897 : 192936 : (OffsetNumber) (i + 1),
898 : 192936 : values[i], isnull[i],
899 : : ht_ctid);
900 : :
3321 901 : 132897 : ginHeapTupleFastInsert(ginstate, &collector);
902 : : }
903 : : else
904 : : {
905 [ + + ]: 54086 : for (i = 0; i < ginstate->origTupdesc->natts; i++)
906 : 27048 : ginHeapTupleInsert(ginstate, (OffsetNumber) (i + 1),
5546 907 : 27048 : values[i], isnull[i],
908 : : ht_ctid);
909 : : }
910 : :
7257 teodor@sigaev.ru 911 : 159932 : MemoryContextSwitchTo(oldCtx);
912 : 159932 : MemoryContextDelete(insertCtx);
913 : :
3710 tgl@sss.pgh.pa.us 914 : 159932 : return false;
915 : : }
916 : :
917 : : /*
918 : : * Create parallel context, and launch workers for leader.
919 : : *
920 : : * buildstate argument should be initialized (with the exception of the
921 : : * tuplesort states, which may later be created based on shared
922 : : * state initially set up here).
923 : : *
924 : : * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
925 : : *
926 : : * request is the target number of parallel worker processes to launch.
927 : : *
928 : : * Sets buildstate's GinLeader, which caller must use to shut down parallel
929 : : * mode by passing it to _gin_end_parallel() at the very end of its index
930 : : * build. If not even a single worker process can be launched, this is
931 : : * never set, and caller should proceed with a serial index build.
932 : : */
933 : : static void
377 tomas.vondra@postgre 934 : 13 : _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
935 : : bool isconcurrent, int request)
936 : : {
937 : : ParallelContext *pcxt;
938 : : int scantuplesortstates;
939 : : Snapshot snapshot;
940 : : Size estginshared;
941 : : Size estsort;
942 : : GinBuildShared *ginshared;
943 : : Sharedsort *sharedsort;
95 michael@paquier.xyz 944 :GNC 13 : GinLeader *ginleader = palloc0_object(GinLeader);
945 : : WalUsage *walusage;
946 : : BufferUsage *bufferusage;
377 tomas.vondra@postgre 947 :CBC 13 : bool leaderparticipates = true;
948 : : int querylen;
949 : :
950 : : #ifdef DISABLE_LEADER_PARTICIPATION
951 : : leaderparticipates = false;
952 : : #endif
953 : :
954 : : /*
955 : : * Enter parallel mode, and create context for parallel build of gin index
956 : : */
957 : 13 : EnterParallelMode();
958 [ - + ]: 13 : Assert(request > 0);
959 : 13 : pcxt = CreateParallelContext("postgres", "_gin_parallel_build_main",
960 : : request);
961 : :
962 [ + - ]: 13 : scantuplesortstates = leaderparticipates ? request + 1 : request;
963 : :
964 : : /*
965 : : * Prepare for scan of the base relation. In a normal index build, we use
966 : : * SnapshotAny because we must retrieve all tuples and do our own time
967 : : * qual checks (because we have to index RECENTLY_DEAD tuples). In a
968 : : * concurrent build, we take a regular MVCC snapshot and index whatever's
969 : : * live according to that.
970 : : */
971 [ + + ]: 13 : if (!isconcurrent)
972 : 10 : snapshot = SnapshotAny;
973 : : else
974 : 3 : snapshot = RegisterSnapshot(GetTransactionSnapshot());
975 : :
976 : : /*
977 : : * Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace.
978 : : */
979 : 13 : estginshared = _gin_parallel_estimate_shared(heap, snapshot);
980 : 13 : shm_toc_estimate_chunk(&pcxt->estimator, estginshared);
981 : 13 : estsort = tuplesort_estimate_shared(scantuplesortstates);
982 : 13 : shm_toc_estimate_chunk(&pcxt->estimator, estsort);
983 : :
984 : 13 : shm_toc_estimate_keys(&pcxt->estimator, 2);
985 : :
986 : : /*
987 : : * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
988 : : * and PARALLEL_KEY_BUFFER_USAGE.
989 : : *
990 : : * If there are no extensions loaded that care, we could skip this. We
991 : : * have no way of knowing whether anyone's looking at pgWalUsage or
992 : : * pgBufferUsage, so do it unconditionally.
993 : : */
994 : 13 : shm_toc_estimate_chunk(&pcxt->estimator,
995 : : mul_size(sizeof(WalUsage), pcxt->nworkers));
996 : 13 : shm_toc_estimate_keys(&pcxt->estimator, 1);
997 : 13 : shm_toc_estimate_chunk(&pcxt->estimator,
998 : : mul_size(sizeof(BufferUsage), pcxt->nworkers));
999 : 13 : shm_toc_estimate_keys(&pcxt->estimator, 1);
1000 : :
1001 : : /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
1002 [ + - ]: 13 : if (debug_query_string)
1003 : : {
1004 : 13 : querylen = strlen(debug_query_string);
1005 : 13 : shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
1006 : 13 : shm_toc_estimate_keys(&pcxt->estimator, 1);
1007 : : }
1008 : : else
377 tomas.vondra@postgre 1009 :UBC 0 : querylen = 0; /* keep compiler quiet */
1010 : :
1011 : : /* Everyone's had a chance to ask for space, so now create the DSM */
377 tomas.vondra@postgre 1012 :CBC 13 : InitializeParallelDSM(pcxt);
1013 : :
1014 : : /* If no DSM segment was available, back out (do serial build) */
1015 [ - + ]: 13 : if (pcxt->seg == NULL)
1016 : : {
377 tomas.vondra@postgre 1017 [ # # # # ]:UBC 0 : if (IsMVCCSnapshot(snapshot))
1018 : 0 : UnregisterSnapshot(snapshot);
1019 : 0 : DestroyParallelContext(pcxt);
1020 : 0 : ExitParallelMode();
1021 : 0 : return;
1022 : : }
1023 : :
1024 : : /* Store shared build state, for which we reserved space */
377 tomas.vondra@postgre 1025 :CBC 13 : ginshared = (GinBuildShared *) shm_toc_allocate(pcxt->toc, estginshared);
1026 : : /* Initialize immutable state */
1027 : 13 : ginshared->heaprelid = RelationGetRelid(heap);
1028 : 13 : ginshared->indexrelid = RelationGetRelid(index);
1029 : 13 : ginshared->isconcurrent = isconcurrent;
1030 : 13 : ginshared->scantuplesortstates = scantuplesortstates;
1031 : :
1032 : 13 : ConditionVariableInit(&ginshared->workersdonecv);
1033 : 13 : SpinLockInit(&ginshared->mutex);
1034 : :
1035 : : /* Initialize mutable state */
1036 : 13 : ginshared->nparticipantsdone = 0;
1037 : 13 : ginshared->reltuples = 0.0;
1038 : 13 : ginshared->indtuples = 0.0;
1039 : :
1040 : 13 : table_parallelscan_initialize(heap,
1041 : : ParallelTableScanFromGinBuildShared(ginshared),
1042 : : snapshot);
1043 : :
1044 : : /*
1045 : : * Store shared tuplesort-private state, for which we reserved space.
1046 : : * Then, initialize opaque state using tuplesort routine.
1047 : : */
1048 : 13 : sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
1049 : 13 : tuplesort_initialize_shared(sharedsort, scantuplesortstates,
1050 : : pcxt->seg);
1051 : :
1052 : 13 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_GIN_SHARED, ginshared);
1053 : 13 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
1054 : :
1055 : : /* Store query string for workers */
1056 [ + - ]: 13 : if (debug_query_string)
1057 : : {
1058 : : char *sharedquery;
1059 : :
1060 : 13 : sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
1061 : 13 : memcpy(sharedquery, debug_query_string, querylen + 1);
1062 : 13 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
1063 : : }
1064 : :
1065 : : /*
1066 : : * Allocate space for each worker's WalUsage and BufferUsage; no need to
1067 : : * initialize.
1068 : : */
1069 : 13 : walusage = shm_toc_allocate(pcxt->toc,
1070 : 13 : mul_size(sizeof(WalUsage), pcxt->nworkers));
1071 : 13 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
1072 : 13 : bufferusage = shm_toc_allocate(pcxt->toc,
1073 : 13 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
1074 : 13 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
1075 : :
1076 : : /* Launch workers, saving status for leader/caller */
1077 : 13 : LaunchParallelWorkers(pcxt);
1078 : 13 : ginleader->pcxt = pcxt;
1079 : 13 : ginleader->nparticipanttuplesorts = pcxt->nworkers_launched;
1080 [ + - ]: 13 : if (leaderparticipates)
1081 : 13 : ginleader->nparticipanttuplesorts++;
1082 : 13 : ginleader->ginshared = ginshared;
1083 : 13 : ginleader->sharedsort = sharedsort;
1084 : 13 : ginleader->snapshot = snapshot;
1085 : 13 : ginleader->walusage = walusage;
1086 : 13 : ginleader->bufferusage = bufferusage;
1087 : :
1088 : : /* If no workers were successfully launched, back out (do serial build) */
1089 [ - + ]: 13 : if (pcxt->nworkers_launched == 0)
1090 : : {
377 tomas.vondra@postgre 1091 :UBC 0 : _gin_end_parallel(ginleader, NULL);
1092 : 0 : return;
1093 : : }
1094 : :
1095 : : /* Save leader state now that it's clear build will be parallel */
377 tomas.vondra@postgre 1096 :CBC 13 : buildstate->bs_leader = ginleader;
1097 : :
1098 : : /* Join heap scan ourselves */
1099 [ + - ]: 13 : if (leaderparticipates)
1100 : 13 : _gin_leader_participate_as_worker(buildstate, heap, index);
1101 : :
1102 : : /*
1103 : : * Caller needs to wait for all launched workers when we return. Make
1104 : : * sure that the failure-to-start case will not hang forever.
1105 : : */
1106 : 13 : WaitForParallelWorkersToAttach(pcxt);
1107 : : }
1108 : :
1109 : : /*
1110 : : * Shut down workers, destroy parallel context, and end parallel mode.
1111 : : */
1112 : : static void
1113 : 13 : _gin_end_parallel(GinLeader *ginleader, GinBuildState *state)
1114 : : {
1115 : : int i;
1116 : :
1117 : : /* Shutdown worker processes */
1118 : 13 : WaitForParallelWorkersToFinish(ginleader->pcxt);
1119 : :
1120 : : /*
1121 : : * Next, accumulate WAL usage. (This must wait for the workers to finish,
1122 : : * or we might get incomplete data.)
1123 : : */
1124 [ + + ]: 39 : for (i = 0; i < ginleader->pcxt->nworkers_launched; i++)
1125 : 26 : InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]);
1126 : :
1127 : : /* Free last reference to MVCC snapshot, if one was used */
1128 [ + + - + ]: 13 : if (IsMVCCSnapshot(ginleader->snapshot))
1129 : 3 : UnregisterSnapshot(ginleader->snapshot);
1130 : 13 : DestroyParallelContext(ginleader->pcxt);
1131 : 13 : ExitParallelMode();
1132 : 13 : }
1133 : :
1134 : : /*
1135 : : * Within leader, wait for end of heap scan.
1136 : : *
1137 : : * When called, parallel heap scan started by _gin_begin_parallel() will
1138 : : * already be underway within worker processes (when leader participates
1139 : : * as a worker, we should end up here just as workers are finishing).
1140 : : *
1141 : : * Returns the total number of heap tuples scanned.
1142 : : */
1143 : : static double
1144 : 13 : _gin_parallel_heapscan(GinBuildState *state)
1145 : : {
1146 : 13 : GinBuildShared *ginshared = state->bs_leader->ginshared;
1147 : : int nparticipanttuplesorts;
1148 : :
1149 : 13 : nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts;
1150 : : for (;;)
1151 : : {
1152 [ - + ]: 26 : SpinLockAcquire(&ginshared->mutex);
1153 [ + + ]: 26 : if (ginshared->nparticipantsdone == nparticipanttuplesorts)
1154 : : {
1155 : : /* copy the data into leader state */
1156 : 13 : state->bs_reltuples = ginshared->reltuples;
1157 : 13 : state->bs_numtuples = ginshared->indtuples;
1158 : :
1159 : 13 : SpinLockRelease(&ginshared->mutex);
1160 : 13 : break;
1161 : : }
1162 : 13 : SpinLockRelease(&ginshared->mutex);
1163 : :
1164 : 13 : ConditionVariableSleep(&ginshared->workersdonecv,
1165 : : WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
1166 : : }
1167 : :
1168 : 13 : ConditionVariableCancelSleep();
1169 : :
1170 : 13 : return state->bs_reltuples;
1171 : : }
1172 : :
1173 : : /*
1174 : : * Buffer used to accumulate TIDs from multiple GinTuples for the same key
1175 : : * (we read these from the tuplesort, sorted by the key).
1176 : : *
1177 : : * This is similar to BuildAccumulator in that it's used to collect TIDs
1178 : : * in memory before inserting them into the index, but it's much simpler
1179 : : * as it only deals with a single index key at a time.
1180 : : *
1181 : : * When adding TIDs to the buffer, we make sure to keep them sorted, both
1182 : : * during the initial table scan (and detecting when the scan wraps around),
1183 : : * and during merging (where we do mergesort).
1184 : : */
1185 : : typedef struct GinBuffer
1186 : : {
1187 : : OffsetNumber attnum;
1188 : : GinNullCategory category;
1189 : : Datum key; /* 0 if no key (and keylen == 0) */
1190 : : Size keylen; /* number of bytes (not typlen) */
1191 : :
1192 : : /* type info */
1193 : : int16 typlen;
1194 : : bool typbyval;
1195 : :
1196 : : /* Number of TIDs to collect before attempt to write some out. */
1197 : : int maxitems;
1198 : :
1199 : : /* array of TID values */
1200 : : int nitems;
1201 : : int nfrozen;
1202 : : SortSupport ssup; /* for sorting/comparing keys */
1203 : : ItemPointerData *items;
1204 : : } GinBuffer;
1205 : :
1206 : : /*
1207 : : * Check that TID array contains valid values, and that it's sorted (if we
1208 : : * expect it to be).
1209 : : */
1210 : : static void
1211 : 109062 : AssertCheckItemPointers(GinBuffer *buffer)
1212 : : {
1213 : : #ifdef USE_ASSERT_CHECKING
1214 : : /* we should not have a buffer with no TIDs to sort */
1215 [ - + ]: 109062 : Assert(buffer->items != NULL);
1216 [ - + ]: 109062 : Assert(buffer->nitems > 0);
1217 : :
1218 [ + + ]: 1333260 : for (int i = 0; i < buffer->nitems; i++)
1219 : : {
1220 [ - + ]: 1224198 : Assert(ItemPointerIsValid(&buffer->items[i]));
1221 : :
1222 : : /* don't check ordering for the first TID item */
1223 [ + + ]: 1224198 : if (i == 0)
1224 : 109062 : continue;
1225 : :
1226 [ - + ]: 1115136 : Assert(ItemPointerCompare(&buffer->items[i - 1], &buffer->items[i]) < 0);
1227 : : }
1228 : : #endif
1229 : 109062 : }
1230 : :
1231 : : /*
1232 : : * GinBuffer checks
1233 : : *
1234 : : * Make sure the nitems/items fields are consistent (either the array is empty
1235 : : * or not empty, the fields need to agree). If there are items, check ordering.
1236 : : */
1237 : : static void
1238 : 72696 : AssertCheckGinBuffer(GinBuffer *buffer)
1239 : : {
1240 : : #ifdef USE_ASSERT_CHECKING
1241 : : /* if we have any items, the array must exist */
1242 [ + + - + ]: 72696 : Assert(!((buffer->nitems > 0) && (buffer->items == NULL)));
1243 : :
1244 : : /*
1245 : : * The buffer may be empty, in which case we must not call the check of
1246 : : * item pointers, because that assumes non-emptiness.
1247 : : */
1248 [ + + ]: 72696 : if (buffer->nitems == 0)
1249 : 29508 : return;
1250 : :
1251 : : /* Make sure the item pointers are valid and sorted. */
1252 : 43188 : AssertCheckItemPointers(buffer);
1253 : : #endif
1254 : : }
1255 : :
1256 : : /*
1257 : : * GinBufferInit
1258 : : * Initialize buffer to store tuples for a GIN index.
1259 : : *
1260 : : * Initialize the buffer used to accumulate TID for a single key at a time
1261 : : * (we process the data sorted), so we know when we received all data for
1262 : : * a given key.
1263 : : *
1264 : : * Initializes sort support procedures for all index attributes.
1265 : : */
1266 : : static GinBuffer *
1267 : 52 : GinBufferInit(Relation index)
1268 : : {
95 michael@paquier.xyz 1269 :GNC 52 : GinBuffer *buffer = palloc0_object(GinBuffer);
1270 : : int i,
1271 : : nKeys;
377 tomas.vondra@postgre 1272 :CBC 52 : TupleDesc desc = RelationGetDescr(index);
1273 : :
1274 : : /*
1275 : : * How many items can we fit into the memory limit? We don't want to end
1276 : : * with too many TIDs. and 64kB seems more than enough. But maybe this
1277 : : * should be tied to maintenance_work_mem or something like that?
1278 : : */
376 1279 : 52 : buffer->maxitems = (64 * 1024L) / sizeof(ItemPointerData);
1280 : :
377 1281 : 52 : nKeys = IndexRelationGetNumberOfKeyAttributes(index);
1282 : :
95 michael@paquier.xyz 1283 :GNC 52 : buffer->ssup = palloc0_array(SortSupportData, nKeys);
1284 : :
1285 : : /*
1286 : : * Lookup ordering operator for the index key data type, and initialize
1287 : : * the sort support function.
1288 : : */
377 tomas.vondra@postgre 1289 [ + + ]:CBC 104 : for (i = 0; i < nKeys; i++)
1290 : : {
1291 : : Oid cmpFunc;
1292 : 52 : SortSupport sortKey = &buffer->ssup[i];
1293 : 52 : Form_pg_attribute att = TupleDescAttr(desc, i);
1294 : :
1295 : 52 : sortKey->ssup_cxt = CurrentMemoryContext;
1296 : 52 : sortKey->ssup_collation = index->rd_indcollation[i];
1297 : :
1298 [ + - ]: 52 : if (!OidIsValid(sortKey->ssup_collation))
1299 : 52 : sortKey->ssup_collation = DEFAULT_COLLATION_OID;
1300 : :
1301 : 52 : sortKey->ssup_nulls_first = false;
1302 : 52 : sortKey->ssup_attno = i + 1;
1303 : 52 : sortKey->abbreviate = false;
1304 : :
1305 [ - + ]: 52 : Assert(sortKey->ssup_attno != 0);
1306 : :
1307 : : /*
1308 : : * If the compare proc isn't specified in the opclass definition, look
1309 : : * up the index key type's default btree comparator.
1310 : : */
1311 : 52 : cmpFunc = index_getprocid(index, i + 1, GIN_COMPARE_PROC);
1312 [ - + ]: 52 : if (cmpFunc == InvalidOid)
1313 : : {
1314 : : TypeCacheEntry *typentry;
1315 : :
377 tomas.vondra@postgre 1316 :UBC 0 : typentry = lookup_type_cache(att->atttypid,
1317 : : TYPECACHE_CMP_PROC_FINFO);
1318 [ # # ]: 0 : if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
1319 [ # # ]: 0 : ereport(ERROR,
1320 : : (errcode(ERRCODE_UNDEFINED_FUNCTION),
1321 : : errmsg("could not identify a comparison function for type %s",
1322 : : format_type_be(att->atttypid))));
1323 : :
1324 : 0 : cmpFunc = typentry->cmp_proc_finfo.fn_oid;
1325 : : }
1326 : :
377 tomas.vondra@postgre 1327 :CBC 52 : PrepareSortSupportComparisonShim(cmpFunc, sortKey);
1328 : : }
1329 : :
1330 : 52 : return buffer;
1331 : : }
1332 : :
1333 : : /* Is the buffer empty, i.e. has no TID values in the array? */
1334 : : static bool
1335 : 102344 : GinBufferIsEmpty(GinBuffer *buffer)
1336 : : {
1337 : 102344 : return (buffer->nitems == 0);
1338 : : }
1339 : :
1340 : : /*
1341 : : * GinBufferKeyEquals
1342 : : * Can the buffer store TIDs for the provided GIN tuple (same key)?
1343 : : *
1344 : : * Compare if the tuple matches the already accumulated data in the GIN
1345 : : * buffer. Compare scalar fields first, before the actual key.
1346 : : *
1347 : : * Returns true if the key matches, and the TID belongs to the buffer, or
1348 : : * false if the key does not match.
1349 : : */
1350 : : static bool
1351 : 36330 : GinBufferKeyEquals(GinBuffer *buffer, GinTuple *tup)
1352 : : {
1353 : : int r;
1354 : : Datum tupkey;
1355 : :
1356 : 36330 : AssertCheckGinBuffer(buffer);
1357 : :
1358 [ - + ]: 36330 : if (tup->attrnum != buffer->attnum)
377 tomas.vondra@postgre 1359 :UBC 0 : return false;
1360 : :
1361 : : /* same attribute should have the same type info */
377 tomas.vondra@postgre 1362 [ - + ]:CBC 36330 : Assert(tup->typbyval == buffer->typbyval);
1363 [ - + ]: 36330 : Assert(tup->typlen == buffer->typlen);
1364 : :
1365 [ + + ]: 36330 : if (tup->category != buffer->category)
1366 : 30 : return false;
1367 : :
1368 : : /*
1369 : : * For NULL/empty keys, this means equality, for normal keys we need to
1370 : : * compare the actual key value.
1371 : : */
1372 [ + + ]: 36300 : if (buffer->category != GIN_CAT_NORM_KEY)
1373 : 6 : return true;
1374 : :
1375 : : /*
1376 : : * For the tuple, get either the first sizeof(Datum) bytes for byval
1377 : : * types, or a pointer to the beginning of the data array.
1378 : : */
1379 [ + + ]: 36294 : tupkey = (buffer->typbyval) ? *(Datum *) tup->data : PointerGetDatum(tup->data);
1380 : :
1381 : 36294 : r = ApplySortComparator(buffer->key, false,
1382 : : tupkey, false,
1383 : 36294 : &buffer->ssup[buffer->attnum - 1]);
1384 : :
1385 : 36294 : return (r == 0);
1386 : : }
1387 : :
1388 : : /*
1389 : : * GinBufferShouldTrim
1390 : : * Should we trim the list of item pointers?
1391 : : *
1392 : : * By trimming we understand writing out and removing the tuple IDs that
1393 : : * we know can't change by future merges. We can deduce the TID up to which
1394 : : * this is guaranteed from the "first" TID in each GIN tuple, which provides
1395 : : * a "horizon" (for a given key) thanks to the sort.
1396 : : *
1397 : : * We don't want to do this too often - compressing longer TID lists is more
1398 : : * efficient. But we also don't want to accumulate too many TIDs, for two
1399 : : * reasons. First, it consumes memory and we might exceed maintenance_work_mem
1400 : : * (or whatever limit applies), even if that's unlikely because TIDs are very
1401 : : * small so we can fit a lot of them. Second, and more importantly, long TID
1402 : : * lists are an issue if the scan wraps around, because a key may get a very
1403 : : * wide list (with min/max TID for that key), forcing "full" mergesorts for
1404 : : * every list merged into it (instead of the efficient append).
1405 : : *
1406 : : * So we look at two things when deciding if to trim - if the resulting list
1407 : : * (after adding TIDs from the new tuple) would be too long, and if there is
1408 : : * enough TIDs to trim (with values less than "first" TID from the new tuple),
1409 : : * we do the trim. By enough we mean at least 128 TIDs (mostly an arbitrary
1410 : : * number).
1411 : : *
1412 : : * We try freezing TIDs at the beginning of the list first, before attempting
1413 : : * to trim the buffer. This may allow trimming the data earlier, reducing the
1414 : : * memory usage and excluding it from the mergesort.
1415 : : */
1416 : : static bool
376 1417 : 36366 : GinBufferShouldTrim(GinBuffer *buffer, GinTuple *tup)
1418 : : {
1419 : : /*
1420 : : * Check if the last TID in the current list is frozen. This is the case
1421 : : * when merging non-overlapping lists, e.g. in each parallel worker.
1422 : : */
131 tomas.vondra@postgre 1423 [ + + - + ]:GNC 43224 : if ((buffer->nitems > 0) &&
1424 : 6858 : (ItemPointerCompare(&buffer->items[buffer->nitems - 1],
1425 : 6858 : GinTupleGetFirst(tup)) == 0))
131 tomas.vondra@postgre 1426 :UNC 0 : buffer->nfrozen = buffer->nitems;
1427 : :
1428 : : /*
1429 : : * Now find the last TID we know to be frozen, i.e. the last TID right
1430 : : * before the new GIN tuple.
1431 : : *
1432 : : * Start with the first not-yet-frozen tuple, and walk until we find the
1433 : : * first TID that's higher. If we already know the whole list is frozen
1434 : : * (i.e. nfrozen == nitems), this does nothing.
1435 : : *
1436 : : * XXX This might do a binary search for sufficiently long lists, but it
1437 : : * does not seem worth the complexity. Overlapping lists should be rare
1438 : : * common, TID comparisons are cheap, and we should quickly freeze most of
1439 : : * the list.
1440 : : */
131 tomas.vondra@postgre 1441 [ + + ]:GNC 99630 : for (int i = buffer->nfrozen; i < buffer->nitems; i++)
1442 : : {
1443 : : /* Is the TID after the first TID of the new tuple? Can't freeze. */
1444 [ + + ]: 69140 : if (ItemPointerCompare(&buffer->items[i],
1445 : 69140 : GinTupleGetFirst(tup)) > 0)
1446 : 5876 : break;
1447 : :
1448 : 63264 : buffer->nfrozen++;
1449 : : }
1450 : :
1451 : : /* not enough TIDs to trim (1024 is somewhat arbitrary number) */
376 tomas.vondra@postgre 1452 [ + - ]:CBC 36366 : if (buffer->nfrozen < 1024)
1453 : 36366 : return false;
1454 : :
1455 : : /* no need to trim if we have not hit the memory limit yet */
376 tomas.vondra@postgre 1456 [ # # ]:UBC 0 : if ((buffer->nitems + tup->nitems) < buffer->maxitems)
1457 : 0 : return false;
1458 : :
1459 : : /*
1460 : : * OK, we have enough frozen TIDs to flush, and we have hit the memory
1461 : : * limit, so it's time to write it out.
1462 : : */
1463 : 0 : return true;
1464 : : }
1465 : :
1466 : : /*
1467 : : * GinBufferStoreTuple
1468 : : * Add data (especially TID list) from a GIN tuple to the buffer.
1469 : : *
1470 : : * The buffer is expected to be empty (in which case it's initialized), or
1471 : : * having the same key. The TID values from the tuple are combined with the
1472 : : * stored values using a merge sort.
1473 : : *
1474 : : * The tuples (for the same key) are expected to be sorted by first TID. But
1475 : : * this does not guarantee the lists do not overlap, especially in the leader,
1476 : : * because the workers process interleaving data. There should be no overlaps
1477 : : * in a single worker - it could happen when the parallel scan wraps around,
1478 : : * but we detect that and flush the data (see ginBuildCallbackParallel).
1479 : : *
1480 : : * By sorting the GinTuple not only by key, but also by the first TID, we make
1481 : : * it more less likely the lists will overlap during merge. We merge them using
1482 : : * mergesort, but it's cheaper to just append one list to the other.
1483 : : *
1484 : : * How often can the lists overlap? There should be no overlaps in workers,
1485 : : * and in the leader we can see overlaps between lists built by different
1486 : : * workers. But the workers merge the items as much as possible, so there
1487 : : * should not be too many.
1488 : : */
1489 : : static void
377 tomas.vondra@postgre 1490 :CBC 36366 : GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup)
1491 : : {
1492 : : ItemPointerData *items;
1493 : : Datum key;
1494 : :
1495 : 36366 : AssertCheckGinBuffer(buffer);
1496 : :
376 1497 : 36366 : key = _gin_parse_tuple_key(tup);
1498 : 36366 : items = _gin_parse_tuple_items(tup);
1499 : :
1500 : : /* if the buffer is empty, set the fields (and copy the key) */
377 1501 [ + + ]: 36366 : if (GinBufferIsEmpty(buffer))
1502 : : {
1503 : 29508 : buffer->category = tup->category;
1504 : 29508 : buffer->keylen = tup->keylen;
1505 : 29508 : buffer->attnum = tup->attrnum;
1506 : :
1507 : 29508 : buffer->typlen = tup->typlen;
1508 : 29508 : buffer->typbyval = tup->typbyval;
1509 : :
1510 [ + + ]: 29508 : if (tup->category == GIN_CAT_NORM_KEY)
1511 : 29478 : buffer->key = datumCopy(key, buffer->typbyval, buffer->typlen);
1512 : : else
1513 : 30 : buffer->key = (Datum) 0;
1514 : : }
1515 : :
1516 : : /* add the new TIDs into the buffer, combine using merge-sort */
1517 : : {
1518 : : int nnew;
1519 : : ItemPointer new;
1520 : :
1521 : : /*
1522 : : * Resize the array - we do this first, because we'll dereference the
1523 : : * first unfrozen TID, which would fail if the array is NULL. We'll
1524 : : * still pass 0 as number of elements in that array though.
1525 : : */
376 1526 [ + + ]: 36366 : if (buffer->items == NULL)
1527 : 36 : buffer->items = palloc((buffer->nitems + tup->nitems) * sizeof(ItemPointerData));
1528 : : else
1529 : 36330 : buffer->items = repalloc(buffer->items,
1530 : 36330 : (buffer->nitems + tup->nitems) * sizeof(ItemPointerData));
1531 : :
330 michael@paquier.xyz 1532 : 36366 : new = ginMergeItemPointers(&buffer->items[buffer->nfrozen], /* first unfrozen */
376 tomas.vondra@postgre 1533 : 36366 : (buffer->nitems - buffer->nfrozen), /* num of unfrozen */
377 1534 : 36366 : items, tup->nitems, &nnew);
1535 : :
376 1536 [ - + ]: 36366 : Assert(nnew == (tup->nitems + (buffer->nitems - buffer->nfrozen)));
1537 : :
1538 : 36366 : memcpy(&buffer->items[buffer->nfrozen], new,
1539 : : nnew * sizeof(ItemPointerData));
1540 : :
1541 : 36366 : pfree(new);
1542 : :
1543 : 36366 : buffer->nitems += tup->nitems;
1544 : :
377 1545 : 36366 : AssertCheckItemPointers(buffer);
1546 : : }
1547 : :
1548 : : /* free the decompressed TID list */
376 1549 : 36366 : pfree(items);
377 1550 : 36366 : }
1551 : :
1552 : : /*
1553 : : * GinBufferReset
1554 : : * Reset the buffer into a state as if it contains no data.
1555 : : */
1556 : : static void
1557 : 29508 : GinBufferReset(GinBuffer *buffer)
1558 : : {
1559 [ - + ]: 29508 : Assert(!GinBufferIsEmpty(buffer));
1560 : :
1561 : : /* release byref values, do nothing for by-val ones */
1562 [ + + + + ]: 29508 : if ((buffer->category == GIN_CAT_NORM_KEY) && !buffer->typbyval)
1563 : 19162 : pfree(DatumGetPointer(buffer->key));
1564 : :
1565 : : /*
1566 : : * Not required, but makes it more likely to trigger NULL dereference if
1567 : : * using the value incorrectly, etc.
1568 : : */
1569 : 29508 : buffer->key = (Datum) 0;
1570 : :
1571 : 29508 : buffer->attnum = 0;
1572 : 29508 : buffer->category = 0;
1573 : 29508 : buffer->keylen = 0;
1574 : 29508 : buffer->nitems = 0;
376 1575 : 29508 : buffer->nfrozen = 0;
1576 : :
377 1577 : 29508 : buffer->typlen = 0;
1578 : 29508 : buffer->typbyval = 0;
1579 : 29508 : }
1580 : :
1581 : : /*
1582 : : * GinBufferTrim
1583 : : * Discard the "frozen" part of the TID list (which should have been
1584 : : * written to disk/index before this call).
1585 : : */
1586 : : static void
376 tomas.vondra@postgre 1587 :UBC 0 : GinBufferTrim(GinBuffer *buffer)
1588 : : {
1589 [ # # # # ]: 0 : Assert((buffer->nfrozen > 0) && (buffer->nfrozen <= buffer->nitems));
1590 : :
1591 : 0 : memmove(&buffer->items[0], &buffer->items[buffer->nfrozen],
1592 : 0 : sizeof(ItemPointerData) * (buffer->nitems - buffer->nfrozen));
1593 : :
1594 : 0 : buffer->nitems -= buffer->nfrozen;
1595 : 0 : buffer->nfrozen = 0;
1596 : 0 : }
1597 : :
1598 : : /*
1599 : : * GinBufferFree
1600 : : * Release memory associated with the GinBuffer (including TID array).
1601 : : */
1602 : : static void
377 tomas.vondra@postgre 1603 :CBC 52 : GinBufferFree(GinBuffer *buffer)
1604 : : {
1605 [ + + ]: 52 : if (buffer->items)
1606 : 36 : pfree(buffer->items);
1607 : :
1608 : : /* release byref values, do nothing for by-val ones */
1609 [ - + ]: 52 : if (!GinBufferIsEmpty(buffer) &&
377 tomas.vondra@postgre 1610 [ # # # # ]:UBC 0 : (buffer->category == GIN_CAT_NORM_KEY) && !buffer->typbyval)
1611 : 0 : pfree(DatumGetPointer(buffer->key));
1612 : :
377 tomas.vondra@postgre 1613 :CBC 52 : pfree(buffer);
1614 : 52 : }
1615 : :
1616 : : /*
1617 : : * GinBufferCanAddKey
1618 : : * Check if a given GIN tuple can be added to the current buffer.
1619 : : *
1620 : : * Returns true if the buffer is either empty or for the same index key.
1621 : : */
1622 : : static bool
1623 : 36366 : GinBufferCanAddKey(GinBuffer *buffer, GinTuple *tup)
1624 : : {
1625 : : /* empty buffer can accept data for any key */
1626 [ + + ]: 36366 : if (GinBufferIsEmpty(buffer))
1627 : 36 : return true;
1628 : :
1629 : : /* otherwise just data for the same key */
1630 : 36330 : return GinBufferKeyEquals(buffer, tup);
1631 : : }
1632 : :
/*
 * Within leader, wait for end of heap scan and merge per-worker results.
 *
 * After waiting for all workers to finish, the leader sorts the shared
 * tuplesort (holding the partial GIN tuples produced by the workers, sorted
 * by category and key), accumulates TID lists for the same key in a single
 * GinBuffer, and inserts each completed entry into the index.
 *
 * Returns the total number of heap tuples scanned.
 */
static double
_gin_parallel_merge(GinBuildState *state)
{
	GinTuple   *tup;
	Size		tuplen;
	double		reltuples = 0;
	GinBuffer  *buffer;

	/* GIN tuples from workers, merged by leader */
	double		numtuples = 0;

	/* wait for workers to scan table and produce partial results */
	reltuples = _gin_parallel_heapscan(state);

	/* Execute the sort */
	pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
								 PROGRESS_GIN_PHASE_PERFORMSORT_2);

	/* do the actual sort in the leader */
	tuplesort_performsort(state->bs_sortstate);

	/*
	 * Initialize buffer to combine entries for the same key.
	 *
	 * The leader is allowed to use the whole maintenance_work_mem buffer to
	 * combine data. The parallel workers already completed.
	 */
	buffer = GinBufferInit(state->ginstate.index);

	/*
	 * Set the progress target for the next phase. Reset the block number
	 * values set by table_index_build_scan
	 */
	{
		const int	progress_index[] = {
			PROGRESS_CREATEIDX_SUBPHASE,
			PROGRESS_CREATEIDX_TUPLES_TOTAL,
			PROGRESS_SCAN_BLOCKS_TOTAL,
			PROGRESS_SCAN_BLOCKS_DONE
		};
		const int64 progress_vals[] = {
			PROGRESS_GIN_PHASE_MERGE_2,
			state->bs_numtuples,
			0, 0
		};

		pgstat_progress_update_multi_param(4, progress_index, progress_vals);
	}

	/*
	 * Read the GIN tuples from the shared tuplesort, sorted by category and
	 * key. That probably gives us order matching how data is organized in the
	 * index.
	 *
	 * We don't insert the GIN tuples right away, but instead accumulate as
	 * many TIDs for the same key as possible, and then insert that at once.
	 * This way we don't need to decompress/recompress the posting lists, etc.
	 */
	while ((tup = tuplesort_getgintuple(state->bs_sortstate, &tuplen, true)) != NULL)
	{
		MemoryContext oldCtx;

		CHECK_FOR_INTERRUPTS();

		/*
		 * If the buffer can accept the new GIN tuple, just store it there and
		 * we're done. If it's a different key (or maybe too much data) flush
		 * the current contents into the index first.
		 */
		if (!GinBufferCanAddKey(buffer, tup))
		{
			/*
			 * Buffer is not empty and it's storing a different key - flush
			 * the data into the index, and start a new entry for the current
			 * GinTuple.
			 */
			AssertCheckItemPointers(buffer);

			/* insert in short-lived context, reset it after each entry */
			oldCtx = MemoryContextSwitchTo(state->tmpCtx);

			ginEntryInsert(&state->ginstate,
						   buffer->attnum, buffer->key, buffer->category,
						   buffer->items, buffer->nitems, &state->buildStats);

			MemoryContextSwitchTo(oldCtx);
			MemoryContextReset(state->tmpCtx);

			/* discard the existing data */
			GinBufferReset(buffer);
		}

		/*
		 * We're about to add a GIN tuple to the buffer - check the memory
		 * limit first, and maybe write out some of the data into the index
		 * first, if needed (and possible). We only flush the part of the TID
		 * list that we know won't change, and only if there's enough data for
		 * compression to work well.
		 */
		if (GinBufferShouldTrim(buffer, tup))
		{
			Assert(buffer->nfrozen > 0);

			/*
			 * The buffer stores the same key, but exceeds the memory limit.
			 * Insert the "frozen" prefix of the TID list (which can no longer
			 * change by merging more data) and discard it from the buffer.
			 */
			AssertCheckItemPointers(buffer);

			oldCtx = MemoryContextSwitchTo(state->tmpCtx);

			/* note: only nfrozen items are inserted, not the whole list */
			ginEntryInsert(&state->ginstate,
						   buffer->attnum, buffer->key, buffer->category,
						   buffer->items, buffer->nfrozen, &state->buildStats);

			MemoryContextSwitchTo(oldCtx);
			MemoryContextReset(state->tmpCtx);

			/* truncate the data we've just discarded */
			GinBufferTrim(buffer);
		}

		/*
		 * Remember data for the current tuple (either remember the new key,
		 * or append it to the existing data).
		 */
		GinBufferStoreTuple(buffer, tup);

		/* Report progress */
		pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE,
									 ++numtuples);
	}

	/* flush data remaining in the buffer (for the last key) */
	if (!GinBufferIsEmpty(buffer))
	{
		AssertCheckItemPointers(buffer);

		ginEntryInsert(&state->ginstate,
					   buffer->attnum, buffer->key, buffer->category,
					   buffer->items, buffer->nitems, &state->buildStats);

		/* discard the existing data */
		GinBufferReset(buffer);

		/* Report progress */
		pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE,
									 ++numtuples);
	}

	/* release all the memory */
	GinBufferFree(buffer);

	tuplesort_end(state->bs_sortstate);

	return reltuples;
}
1801 : :
1802 : : /*
1803 : : * Returns size of shared memory required to store state for a parallel
1804 : : * gin index build based on the snapshot its parallel scan will use.
1805 : : */
1806 : : static Size
1807 : 13 : _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
1808 : : {
1809 : : /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
1810 : 13 : return add_size(BUFFERALIGN(sizeof(GinBuildShared)),
1811 : : table_parallelscan_estimate(heap, snapshot));
1812 : : }
1813 : :
1814 : : /*
1815 : : * Within leader, participate as a parallel worker.
1816 : : */
1817 : : static void
1818 : 13 : _gin_leader_participate_as_worker(GinBuildState *buildstate, Relation heap, Relation index)
1819 : : {
1820 : 13 : GinLeader *ginleader = buildstate->bs_leader;
1821 : : int sortmem;
1822 : :
1823 : : /*
1824 : : * Might as well use reliable figure when doling out maintenance_work_mem
1825 : : * (when requested number of workers were not launched, this will be
1826 : : * somewhat higher than it is for other workers).
1827 : : */
1828 : 13 : sortmem = maintenance_work_mem / ginleader->nparticipanttuplesorts;
1829 : :
1830 : : /* Perform work common to all participants */
1831 : 13 : _gin_parallel_scan_and_build(buildstate, ginleader->ginshared,
1832 : : ginleader->sharedsort, heap, index,
1833 : : sortmem, true);
1834 : 13 : }
1835 : :
/*
 * _gin_process_worker_data
 *		First phase of the key merging, happening in the worker.
 *
 * Depending on the number of distinct keys, the TID lists produced by the
 * callback may be very short (due to frequent evictions in the callback).
 * But combining many tiny lists is expensive, so we try to do as much as
 * possible in the workers and only then pass the results to the leader.
 *
 * We read the tuples sorted by the key, and merge them into larger lists.
 * At the moment there's no memory limit, so this will just produce one
 * huge (sorted) list per key in each worker. Which means the leader will
 * do a very limited number of mergesorts, which is good.
 */
static void
_gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort,
						 bool progress)
{
	GinTuple   *tup;
	Size		tuplen;

	GinBuffer  *buffer;

	/*
	 * Initialize buffer to combine entries for the same key.
	 *
	 * The workers are limited to the same amount of memory as during the sort
	 * in ginBuildCallbackParallel. But this probably should be the 32MB used
	 * during planning, just like there.
	 */
	buffer = GinBufferInit(state->ginstate.index);

	/* sort the raw per-worker data */
	if (progress)
		pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
									 PROGRESS_GIN_PHASE_PERFORMSORT_1);

	tuplesort_performsort(state->bs_worker_sort);

	/* reset the number of GIN tuples produced by this worker */
	state->bs_numtuples = 0;

	if (progress)
		pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
									 PROGRESS_GIN_PHASE_MERGE_1);

	/*
	 * Read the GIN tuples from the private tuplesort, sorted by the key, and
	 * merge them into larger chunks for the leader to combine.
	 */
	while ((tup = tuplesort_getgintuple(worker_sort, &tuplen, true)) != NULL)
	{

		CHECK_FOR_INTERRUPTS();

		/*
		 * If the buffer can accept the new GIN tuple, just store it there and
		 * we're done. If it's a different key (or maybe too much data) flush
		 * the current contents into the index first.
		 */
		if (!GinBufferCanAddKey(buffer, tup))
		{
			GinTuple   *ntup;
			Size		ntuplen;

			/*
			 * Buffer is not empty and it's storing a different key - write
			 * the accumulated data into the shared tuplesort, and start a
			 * new entry for the current GinTuple.
			 */
			AssertCheckItemPointers(buffer);

			ntup = _gin_build_tuple(buffer->attnum, buffer->category,
									buffer->key, buffer->typlen, buffer->typbyval,
									buffer->items, buffer->nitems, &ntuplen);

			tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
			state->bs_numtuples++;

			pfree(ntup);

			/* discard the existing data */
			GinBufferReset(buffer);
		}

		/*
		 * We're about to add a GIN tuple to the buffer - check the memory
		 * limit first, and maybe write out some of the data into the index
		 * first, if needed (and possible). We only flush the part of the TID
		 * list that we know won't change, and only if there's enough data for
		 * compression to work well.
		 */
		if (GinBufferShouldTrim(buffer, tup))
		{
			GinTuple   *ntup;
			Size		ntuplen;

			Assert(buffer->nfrozen > 0);

			/*
			 * The buffer stores the same key, but exceeds the memory limit.
			 * Write out the "frozen" prefix of the TID list (which can no
			 * longer change by merging more data) and discard it.
			 */
			AssertCheckItemPointers(buffer);

			ntup = _gin_build_tuple(buffer->attnum, buffer->category,
									buffer->key, buffer->typlen, buffer->typbyval,
									buffer->items, buffer->nfrozen, &ntuplen);

			/*
			 * NOTE(review): unlike the flush above, bs_numtuples is not
			 * incremented here even though a tuple is written - confirm
			 * whether that is intentional (it feeds the leader's progress
			 * total and indtuples).
			 */
			tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);

			pfree(ntup);

			/* truncate the data we've just discarded */
			GinBufferTrim(buffer);
		}

		/*
		 * Remember data for the current tuple (either remember the new key,
		 * or append it to the existing data).
		 */
		GinBufferStoreTuple(buffer, tup);
	}

	/* flush data remaining in the buffer (for the last key) */
	if (!GinBufferIsEmpty(buffer))
	{
		GinTuple   *ntup;
		Size		ntuplen;

		AssertCheckItemPointers(buffer);

		ntup = _gin_build_tuple(buffer->attnum, buffer->category,
								buffer->key, buffer->typlen, buffer->typbyval,
								buffer->items, buffer->nitems, &ntuplen);

		tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
		state->bs_numtuples++;

		pfree(ntup);

		/* discard the existing data */
		GinBufferReset(buffer);
	}

	/* release all the memory */
	GinBufferFree(buffer);

	tuplesort_end(worker_sort);
}
1987 : :
/*
 * Perform a worker's portion of a parallel GIN index build sort.
 *
 * This generates a tuplesort for the worker portion of the table.
 *
 * sortmem is the amount of working memory to use within each worker,
 * expressed in KBs.
 *
 * When this returns, workers are done, and need only release resources.
 *
 * Before feeding data into a shared tuplesort (for the leader process),
 * the workers process data in two phases.
 *
 * 1) A worker reads a portion of rows from the table, accumulates entries
 * in memory, and flushes them into a private tuplesort (e.g. because of
 * using too much memory).
 *
 * 2) The private tuplesort gets sorted (by key and TID), the worker reads
 * the data again, and combines the entries as much as possible. This has
 * to happen eventually, and this way it's done in workers in parallel.
 *
 * Finally, the combined entries are written into the shared tuplesort, so
 * that the leader can process them.
 *
 * How well this works (compared to just writing entries into the shared
 * tuplesort) depends on the data set. For large tables with many distinct
 * keys this helps a lot. With many distinct keys it's likely the buffer has
 * to be flushed often, generating many entries with the same key and short
 * TID lists. These entries need to be sorted and merged at some point,
 * before writing them to the index. The merging is quite expensive, it can
 * easily be ~50% of a serial build, and doing as much of it in the workers
 * means it's parallelized. The leader still has to merge results from the
 * workers, but it's much more efficient to merge few large entries than
 * many tiny ones.
 *
 * This also reduces the amount of data the workers pass to the leader through
 * the shared tuplesort. OTOH the workers need more space for the private sort,
 * possibly up to 2x of the data, if no entries are merged in a worker. But
 * this is very unlikely, and the only consequence is inefficiency, so we
 * ignore it.
 */
static void
_gin_parallel_scan_and_build(GinBuildState *state,
							 GinBuildShared *ginshared, Sharedsort *sharedsort,
							 Relation heap, Relation index,
							 int sortmem, bool progress)
{
	SortCoordinate coordinate;
	TableScanDesc scan;
	double		reltuples;
	IndexInfo  *indexInfo;

	/* Initialize local tuplesort coordination state */
	coordinate = palloc0_object(SortCoordinateData);
	coordinate->isWorker = true;
	coordinate->nParticipants = -1;
	coordinate->sharedsort = sharedsort;

	/*
	 * Remember how much space is allowed for the accumulated entries (half
	 * of this worker's memory share; the rest goes to the tuplesorts).
	 */
	state->work_mem = (sortmem / 2);

	/* remember how many workers participate in the build */
	state->bs_num_workers = ginshared->scantuplesortstates;

	/* Begin "partial" tuplesort - feeds the leader via sharedsort */
	state->bs_sortstate = tuplesort_begin_index_gin(heap, index,
													state->work_mem,
													coordinate,
													TUPLESORT_NONE);

	/* Local per-worker sort of raw-data (no coordination needed) */
	state->bs_worker_sort = tuplesort_begin_index_gin(heap, index,
													  state->work_mem,
													  NULL,
													  TUPLESORT_NONE);

	/* Join parallel scan */
	indexInfo = BuildIndexInfo(index);
	indexInfo->ii_Concurrent = ginshared->isconcurrent;

	scan = table_beginscan_parallel(heap,
									ParallelTableScanFromGinBuildShared(ginshared));

	reltuples = table_index_build_scan(heap, index, indexInfo, true, progress,
									   ginBuildCallbackParallel, state, scan);

	/* write remaining accumulated entries */
	ginFlushBuildState(state, index);

	/*
	 * Do the first phase of in-worker processing - sort the data produced by
	 * the callback, and combine them into much larger chunks and place that
	 * into the shared tuplestore for leader to process.
	 */
	_gin_process_worker_data(state, state->bs_worker_sort, progress);

	/* sort the GIN tuples built by this worker */
	tuplesort_performsort(state->bs_sortstate);

	state->bs_reltuples += reltuples;

	/*
	 * Done.  Record ambuild statistics (under the spinlock protecting the
	 * shared counters).
	 */
	SpinLockAcquire(&ginshared->mutex);
	ginshared->nparticipantsdone++;
	ginshared->reltuples += state->bs_reltuples;
	ginshared->indtuples += state->bs_numtuples;
	SpinLockRelease(&ginshared->mutex);

	/* Notify leader */
	ConditionVariableSignal(&ginshared->workersdonecv);

	tuplesort_end(state->bs_sortstate);
}
2102 : :
/*
 * Perform work within a launched parallel process.
 *
 * Entry point for GIN build parallel workers: attaches to the shared state
 * in the DSM segment, opens the heap and index, runs the scan-and-build
 * phase, and reports WAL/buffer usage back to the leader.
 */
void
_gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
{
	char	   *sharedquery;
	GinBuildShared *ginshared;
	Sharedsort *sharedsort;
	GinBuildState buildstate;
	Relation	heapRel;
	Relation	indexRel;
	LOCKMODE	heapLockmode;
	LOCKMODE	indexLockmode;
	WalUsage   *walusage;
	BufferUsage *bufferusage;
	int			sortmem;

	/*
	 * The only possible status flag that can be set to the parallel worker is
	 * PROC_IN_SAFE_IC.
	 */
	Assert((MyProc->statusFlags == 0) ||
		   (MyProc->statusFlags == PROC_IN_SAFE_IC));

	/* Set debug_query_string for individual workers first */
	sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
	debug_query_string = sharedquery;

	/* Report the query string from leader */
	pgstat_report_activity(STATE_RUNNING, debug_query_string);

	/* Look up gin shared state */
	ginshared = shm_toc_lookup(toc, PARALLEL_KEY_GIN_SHARED, false);

	/* Open relations using lock modes known to be obtained by index.c */
	if (!ginshared->isconcurrent)
	{
		heapLockmode = ShareLock;
		indexLockmode = AccessExclusiveLock;
	}
	else
	{
		/* CONCURRENTLY build: weaker locks, matching the leader */
		heapLockmode = ShareUpdateExclusiveLock;
		indexLockmode = RowExclusiveLock;
	}

	/* Open relations within worker */
	heapRel = table_open(ginshared->heaprelid, heapLockmode);
	indexRel = index_open(ginshared->indexrelid, indexLockmode);

	/* initialize the GIN build state */
	initGinState(&buildstate.ginstate, indexRel);
	buildstate.indtuples = 0;
	memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
	memset(&buildstate.tid, 0, sizeof(ItemPointerData));

	/*
	 * create a temporary memory context that is used to hold data not yet
	 * dumped out to the index
	 */
	buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
											  "Gin build temporary context",
											  ALLOCSET_DEFAULT_SIZES);

	/*
	 * create a temporary memory context that is used for calling
	 * ginExtractEntries(), and can be reset after each tuple
	 */
	buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext,
											   "Gin build temporary context for user-defined function",
											   ALLOCSET_DEFAULT_SIZES);

	buildstate.accum.ginstate = &buildstate.ginstate;
	ginInitBA(&buildstate.accum);


	/* Look up shared state private to tuplesort.c */
	sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
	tuplesort_attach_shared(sharedsort, seg);

	/* Prepare to track buffer usage during parallel execution */
	InstrStartParallelQuery();

	/*
	 * Might as well use reliable figure when doling out maintenance_work_mem
	 * (when requested number of workers were not launched, this will be
	 * somewhat higher than it is for other workers).
	 */
	sortmem = maintenance_work_mem / ginshared->scantuplesortstates;

	_gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort,
								 heapRel, indexRel, sortmem, false);

	/* Report WAL/buffer usage during parallel execution */
	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
	InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
						  &walusage[ParallelWorkerNumber]);

	index_close(indexRel, indexLockmode);
	table_close(heapRel, heapLockmode);
}
2206 : :
/*
 * Used to keep track of compressed TID lists when building a GIN tuple.
 */
typedef struct
{
	dlist_node	node;			/* linked list pointers */
	GinPostingList *seg;		/* one compressed segment of the TID list */
} GinSegmentInfo;
2215 : :
/*
 * _gin_build_tuple
 *		Serialize the state for an index key into a tuple for tuplesort.
 *
 * The tuple has a number of scalar fields (mostly matching the build state),
 * and then a data array that stores the key first, and then the TID list.
 *
 * For by-reference data types, we store the actual data. For by-val types
 * we simply copy the whole Datum, so that we don't have to care about stuff
 * like endianness etc. We could make it a little bit smaller, but it's not
 * worth it - it's a tiny fraction of the data, and we need to MAXALIGN the
 * start of the TID list anyway. So we wouldn't save anything. (This would
 * not be a good idea for the permanent in-index data, since we'd prefer
 * that that not depend on sizeof(Datum). But this is just a transient
 * representation to use while sorting the data.)
 *
 * The TID list is serialized as compressed - it's highly compressible, and
 * we already have ginCompressPostingList for this purpose. The list may be
 * pretty long, so we compress it into multiple segments and then copy all
 * of that into the GIN tuple.
 */
static GinTuple *
_gin_build_tuple(OffsetNumber attrnum, unsigned char category,
				 Datum key, int16 typlen, bool typbyval,
				 ItemPointerData *items, uint32 nitems,
				 Size *len)
{
	GinTuple   *tuple;
	char	   *ptr;

	Size		tuplen;
	int			keylen;

	dlist_mutable_iter iter;
	dlist_head	segments;
	int			ncompressed;
	Size		compresslen;

	/*
	 * Calculate how long is the key value. Only keys with GIN_CAT_NORM_KEY
	 * have actual non-empty key. We include varlena headers and \0 bytes for
	 * strings, to make it easier to access the data in-line.
	 *
	 * For byval types we simply copy the whole Datum. We could store just the
	 * necessary bytes, but this is simpler to work with and not worth the
	 * extra complexity. Moreover we still need to do the MAXALIGN to allow
	 * direct access to items pointers.
	 *
	 * XXX Note that for byval types we store the whole datum, no matter what
	 * the typlen value is.
	 */
	if (category != GIN_CAT_NORM_KEY)
		keylen = 0;
	else if (typbyval)
		keylen = sizeof(Datum);
	else if (typlen > 0)		/* byref, fixed length */
		keylen = typlen;
	else if (typlen == -1)		/* varlena */
		keylen = VARSIZE_ANY(DatumGetPointer(key));
	else if (typlen == -2)		/* cstring, include the terminator */
		keylen = strlen(DatumGetPointer(key)) + 1;
	else
		elog(ERROR, "unexpected typlen value (%d)", typlen);

	/* compress the item pointers */
	ncompressed = 0;
	compresslen = 0;
	dlist_init(&segments);

	/* generate compressed segments of TID list chunks */
	while (ncompressed < nitems)
	{
		int			cnt;
		GinSegmentInfo *seginfo = palloc_object(GinSegmentInfo);

		seginfo->seg = ginCompressPostingList(&items[ncompressed],
											  (nitems - ncompressed),
											  UINT16_MAX,
											  &cnt);

		ncompressed += cnt;
		compresslen += SizeOfGinPostingList(seginfo->seg);

		dlist_push_tail(&segments, &seginfo->node);
	}

	/*
	 * Determine GIN tuple length with all the data included. Be careful about
	 * alignment, to allow direct access to compressed segments (those require
	 * only SHORTALIGN).
	 */
	tuplen = SHORTALIGN(offsetof(GinTuple, data) + keylen) + compresslen;

	*len = tuplen;

	/*
	 * Allocate space for the whole GIN tuple.
	 *
	 * The palloc0 is needed - writetup_index_gin will write the whole tuple
	 * to disk, so we need to make sure the padding bytes are defined
	 * (otherwise valgrind would report this).
	 */
	tuple = palloc0(tuplen);

	tuple->tuplen = tuplen;
	tuple->attrnum = attrnum;
	tuple->category = category;
	tuple->keylen = keylen;
	tuple->nitems = nitems;

	/* key type info */
	tuple->typlen = typlen;
	tuple->typbyval = typbyval;

	/*
	 * Copy the key and items into the tuple. First the key value, which we
	 * can simply copy right at the beginning of the data array.
	 */
	if (category == GIN_CAT_NORM_KEY)
	{
		if (typbyval)
		{
			memcpy(tuple->data, &key, sizeof(Datum));
		}
		else if (typlen > 0)	/* byref, fixed length */
		{
			memcpy(tuple->data, DatumGetPointer(key), typlen);
		}
		else if (typlen == -1)
		{
			memcpy(tuple->data, DatumGetPointer(key), keylen);
		}
		else if (typlen == -2)
		{
			memcpy(tuple->data, DatumGetPointer(key), keylen);
		}
	}

	/* finally, copy the TIDs into the array */
	ptr = (char *) tuple + SHORTALIGN(offsetof(GinTuple, data) + keylen);

	/* copy in the compressed data, and free the segments */
	dlist_foreach_modify(iter, &segments)
	{
		GinSegmentInfo *seginfo = dlist_container(GinSegmentInfo, node, iter.cur);

		memcpy(ptr, seginfo->seg, SizeOfGinPostingList(seginfo->seg));

		ptr += SizeOfGinPostingList(seginfo->seg);

		dlist_delete(&seginfo->node);

		pfree(seginfo->seg);
		pfree(seginfo);
	}

	return tuple;
}
2374 : :
2375 : : /*
2376 : : * _gin_parse_tuple_key
2377 : : * Return a Datum representing the key stored in the tuple.
2378 : : *
2379 : : * Most of the tuple fields are directly accessible, the only thing that
2380 : : * needs more care is the key and the TID list.
2381 : : *
2382 : : * For the key, this returns a regular Datum representing it. It's either the
2383 : : * actual key value, or a pointer to the beginning of the data array (which is
2384 : : * where the data was copied by _gin_build_tuple).
2385 : : */
2386 : : static Datum
376 2387 : 162044 : _gin_parse_tuple_key(GinTuple *a)
2388 : : {
2389 : : Datum key;
2390 : :
377 2391 [ + + ]: 162044 : if (a->category != GIN_CAT_NORM_KEY)
2392 : 36 : return (Datum) 0;
2393 : :
2394 [ + + ]: 162008 : if (a->typbyval)
2395 : : {
2396 : 35946 : memcpy(&key, a->data, a->keylen);
2397 : 35946 : return key;
2398 : : }
2399 : :
2400 : 126062 : return PointerGetDatum(a->data);
2401 : : }
2402 : :
/*
 * _gin_parse_tuple_items
 *		Return a palloc'd array of the decompressed TIDs stored in the tuple.
 */
2407 : : static ItemPointer
376 2408 : 36366 : _gin_parse_tuple_items(GinTuple *a)
2409 : : {
2410 : : int len;
2411 : : char *ptr;
2412 : : int ndecoded;
2413 : : ItemPointer items;
2414 : :
2415 : 36366 : len = a->tuplen - SHORTALIGN(offsetof(GinTuple, data) + a->keylen);
2416 : 36366 : ptr = (char *) a + SHORTALIGN(offsetof(GinTuple, data) + a->keylen);
2417 : :
2418 : 36366 : items = ginPostingListDecodeAllSegments((GinPostingList *) ptr, len, &ndecoded);
2419 : :
2420 [ - + ]: 36366 : Assert(ndecoded == a->nitems);
2421 : :
103 peter@eisentraut.org 2422 :GNC 36366 : return items;
2423 : : }
2424 : :
2425 : : /*
2426 : : * _gin_compare_tuples
2427 : : * Compare GIN tuples, used by tuplesort during parallel index build.
2428 : : *
2429 : : * The scalar fields (attrnum, category) are compared first, the key value is
2430 : : * compared last. The comparisons are done using type-specific sort support
2431 : : * functions.
2432 : : *
2433 : : * If the key value matches, we compare the first TID value in the TID list,
2434 : : * which means the tuples are merged in an order in which they are most
2435 : : * likely to be simply concatenated. (This "first" TID will also allow us
2436 : : * to determine a point up to which the list is fully determined and can be
2437 : : * written into the index to enforce a memory limit etc.)
2438 : : */
2439 : : int
377 tomas.vondra@postgre 2440 :CBC 62899 : _gin_compare_tuples(GinTuple *a, GinTuple *b, SortSupport ssup)
2441 : : {
2442 : : int r;
2443 : : Datum keya,
2444 : : keyb;
2445 : :
2446 [ - + ]: 62899 : if (a->attrnum < b->attrnum)
377 tomas.vondra@postgre 2447 :UBC 0 : return -1;
2448 : :
377 tomas.vondra@postgre 2449 [ - + ]:CBC 62899 : if (a->attrnum > b->attrnum)
377 tomas.vondra@postgre 2450 :UBC 0 : return 1;
2451 : :
377 tomas.vondra@postgre 2452 [ + + ]:CBC 62899 : if (a->category < b->category)
2453 : 40 : return -1;
2454 : :
2455 [ + + ]: 62859 : if (a->category > b->category)
2456 : 12 : return 1;
2457 : :
2458 [ + + ]: 62847 : if (a->category == GIN_CAT_NORM_KEY)
2459 : : {
376 2460 : 62839 : keya = _gin_parse_tuple_key(a);
2461 : 62839 : keyb = _gin_parse_tuple_key(b);
2462 : :
377 2463 : 62839 : r = ApplySortComparator(keya, false,
2464 : : keyb, false,
2465 : 62839 : &ssup[a->attrnum - 1]);
2466 : :
2467 : : /* if the key is the same, consider the first TID in the array */
2468 [ + + ]: 73703 : return (r != 0) ? r : ItemPointerCompare(GinTupleGetFirst(a),
377 tomas.vondra@postgre 2469 :GIC 10864 : GinTupleGetFirst(b));
2470 : : }
2471 : :
377 tomas.vondra@postgre 2472 :CBC 8 : return ItemPointerCompare(GinTupleGetFirst(a),
377 tomas.vondra@postgre 2473 :GIC 8 : GinTupleGetFirst(b));
2474 : : }
|