Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * spgdoinsert.c
4 : : * implementation of insert algorithm
5 : : *
6 : : *
7 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 : : * Portions Copyright (c) 1994, Regents of the University of California
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/access/spgist/spgdoinsert.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : :
16 : : #include "postgres.h"
17 : :
18 : : #include "access/genam.h"
19 : : #include "access/spgist_private.h"
20 : : #include "access/spgxlog.h"
21 : : #include "access/xloginsert.h"
22 : : #include "common/int.h"
23 : : #include "common/pg_prng.h"
24 : : #include "miscadmin.h"
25 : : #include "storage/bufmgr.h"
26 : : #include "utils/rel.h"
27 : :
28 : :
29 : : /*
30 : : * SPPageDesc tracks all info about a page we are inserting into. In some
31 : : * situations it actually identifies a tuple, or even a specific node within
32 : : * an inner tuple. But any of the fields can be invalid. If the buffer
33 : : * field is valid, it implies we hold pin and exclusive lock on that buffer.
34 : : * page pointer should be valid exactly when buffer is.
35 : : */
36 : : typedef struct SPPageDesc
37 : : {
38 : : BlockNumber blkno; /* block number, or InvalidBlockNumber */
39 : : Buffer buffer; /* page's buffer number, or InvalidBuffer */
40 : : Page page; /* pointer to page buffer, or NULL */
41 : : OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
42 : : int node; /* node number within inner tuple, or -1 */
43 : : } SPPageDesc;
44 : :
45 : :
46 : : /*
47 : : * Set the item pointer in the nodeN'th entry in inner tuple tup. This
48 : : * is used to update the parent inner tuple's downlink after a move or
49 : : * split operation.
50 : : */
51 : : void
5200 tgl@sss.pgh.pa.us 52 :CBC 5960 : spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
53 : : BlockNumber blkno, OffsetNumber offset)
54 : : {
55 : : int i;
56 : : SpGistNodeTuple node;
57 : :
5202 58 [ + - ]: 30617 : SGITITERATE(tup, i, node)
59 : : {
60 [ + + ]: 30617 : if (i == nodeN)
61 : : {
62 : 5960 : ItemPointerSet(&node->t_tid, blkno, offset);
63 : 5960 : return;
64 : : }
65 : : }
66 : :
5202 tgl@sss.pgh.pa.us 67 [ # # ]:UBC 0 : elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
68 : : nodeN);
69 : : }
70 : :
71 : : /*
72 : : * Form a new inner tuple containing one more node than the given one, with
73 : : * the specified label datum, inserted at offset "offset" in the node array.
74 : : * The new tuple's prefix is the same as the old one's.
75 : : *
76 : : * Note that the new node initially has an invalid downlink. We'll find a
77 : : * page to point it to later.
78 : : */
79 : : static SpGistInnerTuple
5202 tgl@sss.pgh.pa.us 80 :CBC 789 : addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
81 : : {
82 : : SpGistNodeTuple node,
83 : : *nodes;
84 : : int i;
85 : :
86 : : /* if offset is negative, insert at end */
87 [ - + ]: 789 : if (offset < 0)
5202 tgl@sss.pgh.pa.us 88 :UBC 0 : offset = tuple->nNodes;
5202 tgl@sss.pgh.pa.us 89 [ - + ]:CBC 789 : else if (offset > tuple->nNodes)
5202 tgl@sss.pgh.pa.us 90 [ # # ]:UBC 0 : elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
91 : :
95 michael@paquier.xyz 92 :GNC 789 : nodes = palloc_array(SpGistNodeTuple, tuple->nNodes + 1);
5202 tgl@sss.pgh.pa.us 93 [ + + ]:CBC 5094 : SGITITERATE(tuple, i, node)
94 : : {
95 [ + + ]: 4305 : if (i < offset)
96 : 3716 : nodes[i] = node;
97 : : else
98 : 589 : nodes[i + 1] = node;
99 : : }
100 : :
101 : 789 : nodes[offset] = spgFormNodeTuple(state, label, false);
102 : :
103 : 789 : return spgFormInnerTuple(state,
104 : 789 : (tuple->prefixSize > 0),
105 [ - + ]: 376 : SGITDATUM(tuple, state),
106 [ + + ]: 789 : tuple->nNodes + 1,
107 : : nodes);
108 : : }
109 : :
110 : : /* qsort comparator for sorting OffsetNumbers */
111 : : static int
112 : 3168652 : cmpOffsetNumbers(const void *a, const void *b)
113 : : {
758 nathan@postgresql.or 114 : 3168652 : return pg_cmp_u16(*(const OffsetNumber *) a, *(const OffsetNumber *) b);
115 : : }
116 : :
117 : : /*
118 : : * Delete multiple tuples from an index page, preserving tuple offset numbers.
119 : : *
120 : : * The first tuple in the given list is replaced with a dead tuple of type
121 : : * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122 : : * with dead tuples of type "reststate". If either firststate or reststate
123 : : * is REDIRECT, blkno/offnum specify where to link to.
124 : : *
125 : : * NB: this is used during WAL replay, so beware of trying to make it too
126 : : * smart. In particular, it shouldn't use "state" except for calling
127 : : * spgFormDeadTuple(). This is also used in a critical section, so no
128 : : * pallocs either!
129 : : */
130 : : void
5202 tgl@sss.pgh.pa.us 131 : 5371 : spgPageIndexMultiDelete(SpGistState *state, Page page,
132 : : OffsetNumber *itemnos, int nitems,
133 : : int firststate, int reststate,
134 : : BlockNumber blkno, OffsetNumber offnum)
135 : : {
136 : : OffsetNumber firstItem;
137 : : OffsetNumber sortednos[MaxIndexTuplesPerPage];
138 : 5371 : SpGistDeadTuple tuple = NULL;
139 : : int i;
140 : :
141 [ + + ]: 5371 : if (nitems == 0)
142 : 467 : return; /* nothing to do */
143 : :
144 : : /*
145 : : * For efficiency we want to use PageIndexMultiDelete, which requires the
146 : : * targets to be listed in sorted order, so we have to sort the itemnos
147 : : * array. (This also greatly simplifies the math for reinserting the
148 : : * replacement tuples.) However, we must not scribble on the caller's
149 : : * array, so we have to make a copy.
150 : : */
151 : 4904 : memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152 [ + + ]: 4904 : if (nitems > 1)
153 : 4471 : qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154 : :
155 : 4904 : PageIndexMultiDelete(page, sortednos, nitems);
156 : :
157 : 4904 : firstItem = itemnos[0];
158 : :
159 [ + + ]: 451971 : for (i = 0; i < nitems; i++)
160 : : {
5026 bruce@momjian.us 161 : 447067 : OffsetNumber itemno = sortednos[i];
162 : : int tupstate;
163 : :
5202 tgl@sss.pgh.pa.us 164 [ + + ]: 447067 : tupstate = (itemno == firstItem) ? firststate : reststate;
165 [ + + + + ]: 447067 : if (tuple == NULL || tuple->tupstate != tupstate)
166 : 7214 : tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167 : :
139 peter@eisentraut.org 168 [ - + ]:GNC 447067 : if (PageAddItem(page, tuple, tuple->size, itemno, false, false) != itemno)
5202 tgl@sss.pgh.pa.us 169 [ # # ]:UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
170 : : tuple->size);
171 : :
5202 tgl@sss.pgh.pa.us 172 [ + + ]:CBC 447067 : if (tupstate == SPGIST_REDIRECT)
173 : 1193 : SpGistPageGetOpaque(page)->nRedirection++;
174 [ + - ]: 445874 : else if (tupstate == SPGIST_PLACEHOLDER)
175 : 445874 : SpGistPageGetOpaque(page)->nPlaceholder++;
176 : : }
177 : : }
178 : :
179 : : /*
180 : : * Update the parent inner tuple's downlink, and mark the parent buffer
181 : : * dirty (this must be the last change to the parent page in the current
182 : : * WAL action).
183 : : */
184 : : static void
185 : 4940 : saveNodeLink(Relation index, SPPageDesc *parent,
186 : : BlockNumber blkno, OffsetNumber offnum)
187 : : {
188 : : SpGistInnerTuple innerTuple;
189 : :
190 : 4940 : innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
3189 191 : 4940 : PageGetItemId(parent->page, parent->offnum));
192 : :
5200 193 : 4940 : spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
194 : :
5202 195 : 4940 : MarkBufferDirty(parent->buffer);
196 : 4940 : }
197 : :
198 : : /*
199 : : * Add a leaf tuple to a leaf page where there is known to be room for it
200 : : */
201 : : static void
202 : 399104 : addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
203 : : SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
204 : : {
205 : : spgxlogAddLeaf xlrec;
206 : :
207 : 399104 : xlrec.newPage = isNew;
5117 208 : 399104 : xlrec.storesNulls = isNulls;
209 : :
210 : : /* these will be filled below as needed */
5202 211 : 399104 : xlrec.offnumLeaf = InvalidOffsetNumber;
212 : 399104 : xlrec.offnumHeadLeaf = InvalidOffsetNumber;
213 : 399104 : xlrec.offnumParent = InvalidOffsetNumber;
214 : 399104 : xlrec.nodeI = 0;
215 : :
216 : 399104 : START_CRIT_SECTION();
217 : :
218 [ + + ]: 399104 : if (current->offnum == InvalidOffsetNumber ||
5117 219 [ + + + + ]: 398083 : SpGistBlockIsRoot(current->blkno))
220 : : {
221 : : /* Tuple is not part of a chain */
1805 222 : 9505 : SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
5202 223 : 19010 : current->offnum = SpGistPageAddNewItem(state, current->page,
139 peter@eisentraut.org 224 :GNC 9505 : leafTuple, leafTuple->size,
225 : : NULL, false);
226 : :
5202 tgl@sss.pgh.pa.us 227 :CBC 9505 : xlrec.offnumLeaf = current->offnum;
228 : :
229 : : /* Must update parent's downlink if any */
230 [ + + ]: 9505 : if (parent->buffer != InvalidBuffer)
231 : : {
232 : 1021 : xlrec.offnumParent = parent->offnum;
233 : 1021 : xlrec.nodeI = parent->node;
234 : :
235 : 1021 : saveNodeLink(index, parent, current->blkno, current->offnum);
236 : : }
237 : : }
238 : : else
239 : : {
240 : : /*
241 : : * Tuple must be inserted into existing chain. We mustn't change the
242 : : * chain's head address, but we don't need to chase the entire chain
243 : : * to put the tuple at the end; we can insert it second.
244 : : *
245 : : * Also, it's possible that the "chain" consists only of a DEAD tuple,
246 : : * in which case we should replace the DEAD tuple in-place.
247 : : */
248 : : SpGistLeafTuple head;
249 : : OffsetNumber offnum;
250 : :
251 : 389599 : head = (SpGistLeafTuple) PageGetItem(current->page,
3189 252 : 389599 : PageGetItemId(current->page, current->offnum));
5202 253 [ + - ]: 389599 : if (head->tupstate == SPGIST_LIVE)
254 : : {
1805 255 : 389599 : SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
5202 256 : 389599 : offnum = SpGistPageAddNewItem(state, current->page,
139 peter@eisentraut.org 257 :GNC 389599 : leafTuple, leafTuple->size,
258 : : NULL, false);
259 : :
260 : : /*
261 : : * re-get head of list because it could have been moved on page,
262 : : * and set new second element
263 : : */
5202 tgl@sss.pgh.pa.us 264 :CBC 389599 : head = (SpGistLeafTuple) PageGetItem(current->page,
3189 265 : 389599 : PageGetItemId(current->page, current->offnum));
1805 266 : 389599 : SGLT_SET_NEXTOFFSET(head, offnum);
267 : :
5202 268 : 389599 : xlrec.offnumLeaf = offnum;
269 : 389599 : xlrec.offnumHeadLeaf = current->offnum;
270 : : }
5202 tgl@sss.pgh.pa.us 271 [ # # ]:UBC 0 : else if (head->tupstate == SPGIST_DEAD)
272 : : {
1805 273 : 0 : SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
5202 274 : 0 : PageIndexTupleDelete(current->page, current->offnum);
275 : 0 : if (PageAddItem(current->page,
276 : : leafTuple, leafTuple->size,
277 [ # # ]: 0 : current->offnum, false, false) != current->offnum)
278 [ # # ]: 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
279 : : leafTuple->size);
280 : :
281 : : /* WAL replay distinguishes this case by equal offnums */
282 : 0 : xlrec.offnumLeaf = current->offnum;
283 : 0 : xlrec.offnumHeadLeaf = current->offnum;
284 : : }
285 : : else
286 [ # # ]: 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
287 : : }
288 : :
5202 tgl@sss.pgh.pa.us 289 :CBC 399104 : MarkBufferDirty(current->buffer);
290 : :
2538 heikki.linnakangas@i 291 [ + + + + : 399104 : if (RelationNeedsWAL(index) && !state->isBuild)
- + - - +
+ ]
292 : : {
293 : : XLogRecPtr recptr;
294 : : int flags;
295 : :
4133 296 : 120372 : XLogBeginInsert();
397 peter@eisentraut.org 297 : 120372 : XLogRegisterData(&xlrec, sizeof(xlrec));
298 : 120372 : XLogRegisterData(leafTuple, leafTuple->size);
299 : :
3891 heikki.linnakangas@i 300 : 120372 : flags = REGBUF_STANDARD;
301 [ + + ]: 120372 : if (xlrec.newPage)
302 : 2 : flags |= REGBUF_WILL_INIT;
303 : 120372 : XLogRegisterBuffer(0, current->buffer, flags);
4133 304 [ + + ]: 120372 : if (xlrec.offnumParent != InvalidOffsetNumber)
305 : 414 : XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
306 : :
307 : 120372 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
308 : :
5202 tgl@sss.pgh.pa.us 309 : 120372 : PageSetLSN(current->page, recptr);
310 : :
311 : : /* update parent only if we actually changed it */
4133 heikki.linnakangas@i 312 [ + + ]: 120372 : if (xlrec.offnumParent != InvalidOffsetNumber)
313 : : {
5202 tgl@sss.pgh.pa.us 314 : 414 : PageSetLSN(parent->page, recptr);
315 : : }
316 : : }
317 : :
318 [ - + ]: 399104 : END_CRIT_SECTION();
319 : 399104 : }
320 : :
321 : : /*
322 : : * Count the number and total size of leaf tuples in the chain starting at
323 : : * current->offnum. Return number into *nToSplit and total size as function
324 : : * result.
325 : : *
326 : : * Klugy special case when considering the root page (i.e., root is a leaf
327 : : * page, but we're about to split for the first time): return fake large
328 : : * values to force spgdoinsert() to take the doPickSplit rather than
329 : : * moveLeafs code path. moveLeafs is not prepared to deal with root page.
330 : : */
331 : : static int
332 : 3957 : checkSplitConditions(Relation index, SpGistState *state,
333 : : SPPageDesc *current, int *nToSplit)
334 : : {
335 : : int i,
336 : 3957 : n = 0,
337 : 3957 : totalSize = 0;
338 : :
5117 339 [ + + - + ]: 3957 : if (SpGistBlockIsRoot(current->blkno))
340 : : {
341 : : /* return impossible values to force split */
5202 342 : 41 : *nToSplit = BLCKSZ;
343 : 41 : return BLCKSZ;
344 : : }
345 : :
346 : 3916 : i = current->offnum;
347 [ + + ]: 401331 : while (i != InvalidOffsetNumber)
348 : : {
349 : : SpGistLeafTuple it;
350 : :
351 [ + - - + ]: 397415 : Assert(i >= FirstOffsetNumber &&
352 : : i <= PageGetMaxOffsetNumber(current->page));
353 : 397415 : it = (SpGistLeafTuple) PageGetItem(current->page,
354 : 397415 : PageGetItemId(current->page, i));
355 [ + - ]: 397415 : if (it->tupstate == SPGIST_LIVE)
356 : : {
357 : 397415 : n++;
358 : 397415 : totalSize += it->size + sizeof(ItemIdData);
359 : : }
5202 tgl@sss.pgh.pa.us 360 [ # # ]:UBC 0 : else if (it->tupstate == SPGIST_DEAD)
361 : : {
362 : : /* We could see a DEAD tuple as first/only chain item */
363 [ # # ]: 0 : Assert(i == current->offnum);
1805 364 [ # # ]: 0 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
365 : : /* Don't count it in result, because it won't go to other page */
366 : : }
367 : : else
5202 368 [ # # ]: 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
369 : :
1805 tgl@sss.pgh.pa.us 370 :CBC 397415 : i = SGLT_GET_NEXTOFFSET(it);
371 : : }
372 : :
5202 373 : 3916 : *nToSplit = n;
374 : :
375 : 3916 : return totalSize;
376 : : }
377 : :
378 : : /*
379 : : * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
380 : : * but the chain has to be moved because there's not enough room to add
381 : : * newLeafTuple to its page. We use this method when the chain contains
382 : : * very little data so a split would be inefficient. We are sure we can
383 : : * fit the chain plus newLeafTuple on one other page.
384 : : */
385 : : static void
386 : 1121 : moveLeafs(Relation index, SpGistState *state,
387 : : SPPageDesc *current, SPPageDesc *parent,
388 : : SpGistLeafTuple newLeafTuple, bool isNulls)
389 : : {
390 : : int i,
391 : : nDelete,
392 : : nInsert,
393 : : size;
394 : : Buffer nbuf;
395 : : Page npage;
396 : 1121 : OffsetNumber r = InvalidOffsetNumber,
397 : 1121 : startOffset = InvalidOffsetNumber;
398 : 1121 : bool replaceDead = false;
399 : : OffsetNumber *toDelete;
400 : : OffsetNumber *toInsert;
401 : : BlockNumber nblkno;
402 : : spgxlogMoveLeafs xlrec;
403 : : char *leafdata,
404 : : *leafptr;
405 : :
406 : : /* This doesn't work on root page */
407 [ - + ]: 1121 : Assert(parent->buffer != InvalidBuffer);
408 [ - + ]: 1121 : Assert(parent->buffer != current->buffer);
409 : :
410 : : /* Locate the tuples to be moved, and count up the space needed */
411 : 1121 : i = PageGetMaxOffsetNumber(current->page);
95 michael@paquier.xyz 412 :GNC 1121 : toDelete = palloc_array(OffsetNumber, i);
413 : 1121 : toInsert = palloc_array(OffsetNumber, i + 1);
414 : :
5202 tgl@sss.pgh.pa.us 415 :CBC 1121 : size = newLeafTuple->size + sizeof(ItemIdData);
416 : :
417 : 1121 : nDelete = 0;
418 : 1121 : i = current->offnum;
419 [ + + ]: 45913 : while (i != InvalidOffsetNumber)
420 : : {
421 : : SpGistLeafTuple it;
422 : :
423 [ + - - + ]: 44792 : Assert(i >= FirstOffsetNumber &&
424 : : i <= PageGetMaxOffsetNumber(current->page));
425 : 44792 : it = (SpGistLeafTuple) PageGetItem(current->page,
426 : 44792 : PageGetItemId(current->page, i));
427 : :
428 [ + - ]: 44792 : if (it->tupstate == SPGIST_LIVE)
429 : : {
430 : 44792 : toDelete[nDelete] = i;
431 : 44792 : size += it->size + sizeof(ItemIdData);
432 : 44792 : nDelete++;
433 : : }
5202 tgl@sss.pgh.pa.us 434 [ # # ]:UBC 0 : else if (it->tupstate == SPGIST_DEAD)
435 : : {
436 : : /* We could see a DEAD tuple as first/only chain item */
437 [ # # ]: 0 : Assert(i == current->offnum);
1805 438 [ # # ]: 0 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
439 : : /* We don't want to move it, so don't count it in size */
5202 440 : 0 : toDelete[nDelete] = i;
441 : 0 : nDelete++;
442 : 0 : replaceDead = true;
443 : : }
444 : : else
445 [ # # ]: 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
446 : :
1805 tgl@sss.pgh.pa.us 447 :CBC 44792 : i = SGLT_GET_NEXTOFFSET(it);
448 : : }
449 : :
450 : : /* Find a leaf page that will hold them */
5117 451 [ - + ]: 1121 : nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
452 : : size, &xlrec.newPage);
3616 kgrittn@postgresql.o 453 : 1121 : npage = BufferGetPage(nbuf);
5202 tgl@sss.pgh.pa.us 454 : 1121 : nblkno = BufferGetBlockNumber(nbuf);
455 [ - + ]: 1121 : Assert(nblkno != current->blkno);
456 : :
457 : 1121 : leafdata = leafptr = palloc(size);
458 : :
459 : 1121 : START_CRIT_SECTION();
460 : :
461 : : /* copy all the old tuples to new page, unless they're dead */
462 : 1121 : nInsert = 0;
463 [ + - ]: 1121 : if (!replaceDead)
464 : : {
465 [ + + ]: 45913 : for (i = 0; i < nDelete; i++)
466 : : {
467 : : SpGistLeafTuple it;
468 : :
469 : 44792 : it = (SpGistLeafTuple) PageGetItem(current->page,
3189 470 : 44792 : PageGetItemId(current->page, toDelete[i]));
5202 471 [ - + ]: 44792 : Assert(it->tupstate == SPGIST_LIVE);
472 : :
473 : : /*
474 : : * Update chain link (notice the chain order gets reversed, but we
475 : : * don't care). We're modifying the tuple on the source page
476 : : * here, but it's okay since we're about to delete it.
477 : : */
1805 478 : 44792 : SGLT_SET_NEXTOFFSET(it, r);
479 : :
139 peter@eisentraut.org 480 :GNC 44792 : r = SpGistPageAddNewItem(state, npage, it, it->size, &startOffset, false);
481 : :
5202 tgl@sss.pgh.pa.us 482 :CBC 44792 : toInsert[nInsert] = r;
483 : 44792 : nInsert++;
484 : :
485 : : /* save modified tuple into leafdata as well */
486 : 44792 : memcpy(leafptr, it, it->size);
487 : 44792 : leafptr += it->size;
488 : : }
489 : : }
490 : :
491 : : /* add the new tuple as well */
1805 492 : 1121 : SGLT_SET_NEXTOFFSET(newLeafTuple, r);
139 peter@eisentraut.org 493 :GNC 1121 : r = SpGistPageAddNewItem(state, npage, newLeafTuple, newLeafTuple->size, &startOffset, false);
5202 tgl@sss.pgh.pa.us 494 :CBC 1121 : toInsert[nInsert] = r;
495 : 1121 : nInsert++;
496 : 1121 : memcpy(leafptr, newLeafTuple, newLeafTuple->size);
497 : 1121 : leafptr += newLeafTuple->size;
498 : :
499 : : /*
500 : : * Now delete the old tuples, leaving a redirection pointer behind for the
501 : : * first one, unless we're doing an index build; in which case there can't
502 : : * be any concurrent scan so we need not provide a redirect.
503 : : */
504 : 1121 : spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
3189 505 [ + + ]: 1121 : state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
506 : : SPGIST_PLACEHOLDER,
507 : : nblkno, r);
508 : :
509 : : /* Update parent's downlink and mark parent page dirty */
5202 510 : 1121 : saveNodeLink(index, parent, nblkno, r);
511 : :
512 : : /* Mark the leaf pages too */
513 : 1121 : MarkBufferDirty(current->buffer);
514 : 1121 : MarkBufferDirty(nbuf);
515 : :
2538 heikki.linnakangas@i 516 [ + + - + : 1121 : if (RelationNeedsWAL(index) && !state->isBuild)
- - - - +
+ ]
517 : : {
518 : : XLogRecPtr recptr;
519 : :
520 : : /* prepare WAL info */
4133 521 : 266 : STORE_STATE(state, xlrec.stateSrc);
522 : :
523 : 266 : xlrec.nMoves = nDelete;
524 : 266 : xlrec.replaceDead = replaceDead;
525 : 266 : xlrec.storesNulls = isNulls;
526 : :
527 : 266 : xlrec.offnumParent = parent->offnum;
528 : 266 : xlrec.nodeI = parent->node;
529 : :
530 : 266 : XLogBeginInsert();
397 peter@eisentraut.org 531 : 266 : XLogRegisterData(&xlrec, SizeOfSpgxlogMoveLeafs);
532 : 266 : XLogRegisterData(toDelete,
533 : : sizeof(OffsetNumber) * nDelete);
534 : 266 : XLogRegisterData(toInsert,
535 : : sizeof(OffsetNumber) * nInsert);
536 : 266 : XLogRegisterData(leafdata, leafptr - leafdata);
537 : :
4133 heikki.linnakangas@i 538 : 266 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
539 [ + + ]: 266 : XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
540 : 266 : XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
541 : :
542 : 266 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
543 : :
5202 tgl@sss.pgh.pa.us 544 : 266 : PageSetLSN(current->page, recptr);
545 : 266 : PageSetLSN(npage, recptr);
546 : 266 : PageSetLSN(parent->page, recptr);
547 : : }
548 : :
549 [ - + ]: 1121 : END_CRIT_SECTION();
550 : :
551 : : /* Update local free-space cache and release new buffer */
552 : 1121 : SpGistSetLastUsedPage(index, nbuf);
553 : 1121 : UnlockReleaseBuffer(nbuf);
554 : 1121 : }
555 : :
556 : : /*
557 : : * Update previously-created redirection tuple with appropriate destination
558 : : *
559 : : * We use this when it's not convenient to know the destination first.
560 : : * The tuple should have been made with the "impossible" destination of
561 : : * the metapage.
562 : : */
563 : : static void
564 : 649 : setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
565 : : BlockNumber blkno, OffsetNumber offnum)
566 : : {
567 : : SpGistDeadTuple dt;
568 : :
569 : 649 : dt = (SpGistDeadTuple) PageGetItem(current->page,
3189 570 : 649 : PageGetItemId(current->page, position));
5202 571 [ - + ]: 649 : Assert(dt->tupstate == SPGIST_REDIRECT);
572 [ - + ]: 649 : Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
573 : 649 : ItemPointerSet(&dt->pointer, blkno, offnum);
574 : 649 : }
575 : :
576 : : /*
577 : : * Test to see if the user-defined picksplit function failed to do its job,
578 : : * ie, it put all the leaf tuples into the same node.
579 : : * If so, randomly divide the tuples into several nodes (all with the same
580 : : * label) and return true to select allTheSame mode for this inner tuple.
581 : : *
582 : : * (This code is also used to forcibly select allTheSame mode for nulls.)
583 : : *
584 : : * If we know that the leaf tuples wouldn't all fit on one page, then we
585 : : * exclude the last tuple (which is the incoming new tuple that forced a split)
586 : : * from the check to see if more than one node is used. The reason for this
587 : : * is that if the existing tuples are put into only one chain, then even if
588 : : * we move them all to an empty page, there would still not be room for the
589 : : * new tuple, so we'd get into an infinite loop of picksplit attempts.
590 : : * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
591 : : * be split across pages. (Exercise for the reader: figure out why this
592 : : * fixes the problem even when there is only one old tuple.)
593 : : */
594 : : static bool
595 : 2836 : checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
596 : : bool *includeNew)
597 : : {
598 : : int theNode;
599 : : int limit;
600 : : int i;
601 : :
602 : : /* For the moment, assume we can include the new leaf tuple */
603 : 2836 : *includeNew = true;
604 : :
605 : : /* If there's only the new leaf tuple, don't select allTheSame mode */
606 [ + + ]: 2836 : if (in->nTuples <= 1)
607 : 22 : return false;
608 : :
609 : : /* If tuple set doesn't fit on one page, ignore the new tuple in test */
610 [ + + ]: 2814 : limit = tooBig ? in->nTuples - 1 : in->nTuples;
611 : :
612 : : /* Check to see if more than one node is populated */
613 : 2814 : theNode = out->mapTuplesToNodes[0];
614 [ + + ]: 63110 : for (i = 1; i < limit; i++)
615 : : {
616 [ + + ]: 62956 : if (out->mapTuplesToNodes[i] != theNode)
617 : 2660 : return false;
618 : : }
619 : :
620 : : /* Nope, so override the picksplit function's decisions */
621 : :
622 : : /* If the new tuple is in its own node, it can't be included in split */
623 [ + + - + ]: 154 : if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
5202 tgl@sss.pgh.pa.us 624 :UBC 0 : *includeNew = false;
625 : :
5202 tgl@sss.pgh.pa.us 626 :CBC 154 : out->nNodes = 8; /* arbitrary number of child nodes */
627 : :
628 : : /* Random assignment of tuples to nodes (note we include new tuple) */
629 [ + + ]: 16216 : for (i = 0; i < in->nTuples; i++)
630 : 16062 : out->mapTuplesToNodes[i] = i % out->nNodes;
631 : :
632 : : /* The opclass may not use node labels, but if it does, duplicate 'em */
633 [ + + ]: 154 : if (out->nodeLabels)
634 : : {
5026 bruce@momjian.us 635 : 25 : Datum theLabel = out->nodeLabels[theNode];
636 : :
95 michael@paquier.xyz 637 :GNC 25 : out->nodeLabels = palloc_array(Datum, out->nNodes);
5202 tgl@sss.pgh.pa.us 638 [ + + ]:CBC 225 : for (i = 0; i < out->nNodes; i++)
639 : 200 : out->nodeLabels[i] = theLabel;
640 : : }
641 : :
642 : : /* We don't touch the prefix or the leaf tuple datum assignments */
643 : :
644 : 154 : return true;
645 : : }
646 : :
647 : : /*
648 : : * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
649 : : * but the chain has to be split because there's not enough room to add
650 : : * newLeafTuple to its page.
651 : : *
652 : : * This function splits the leaf tuple set according to picksplit's rules,
653 : : * creating one or more new chains that are spread across the current page
654 : : * and an additional leaf page (we assume that two leaf pages will be
655 : : * sufficient). A new inner tuple is created, and the parent downlink
656 : : * pointer is updated to point to that inner tuple instead of the leaf chain.
657 : : *
658 : : * On exit, current contains the address of the new inner tuple.
659 : : *
660 : : * Returns true if we successfully inserted newLeafTuple during this function,
661 : : * false if caller still has to do it (meaning another picksplit operation is
662 : : * probably needed). Failure could occur if the picksplit result is fairly
663 : : * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
664 : : * Because we force the picksplit result to be at least two chains, each
665 : : * cycle will get rid of at least one leaf tuple from the chain, so the loop
666 : : * will eventually terminate if lack of balance is the issue. If the tuple
667 : : * is too big, we assume that repeated picksplit operations will eventually
668 : : * make it small enough by repeated prefix-stripping. A broken opclass could
669 : : * make this an infinite loop, though, so spgdoinsert() checks that the
670 : : * leaf datums get smaller each time.
671 : : */
672 : : static bool
673 : 2836 : doPickSplit(Relation index, SpGistState *state,
674 : : SPPageDesc *current, SPPageDesc *parent,
675 : : SpGistLeafTuple newLeafTuple,
676 : : int level, bool isNulls, bool isNew)
677 : : {
678 : 2836 : bool insertedNew = false;
679 : : spgPickSplitIn in;
680 : : spgPickSplitOut out;
681 : : FmgrInfo *procinfo;
682 : : bool includeNew;
683 : : int i,
684 : : max,
685 : : n;
686 : : SpGistInnerTuple innerTuple;
687 : : SpGistNodeTuple node,
688 : : *nodes;
689 : : Buffer newInnerBuffer,
690 : : newLeafBuffer;
691 : : uint8 *leafPageSelect;
692 : : int *leafSizes;
693 : : OffsetNumber *toDelete;
694 : : OffsetNumber *toInsert;
695 : 2836 : OffsetNumber redirectTuplePos = InvalidOffsetNumber;
696 : : OffsetNumber startOffsets[2];
697 : : SpGistLeafTuple *oldLeafs;
698 : : SpGistLeafTuple *newLeafs;
699 : : Datum leafDatums[INDEX_MAX_KEYS];
700 : : bool leafIsnulls[INDEX_MAX_KEYS];
701 : : int spaceToDelete;
702 : : int currentFreeSpace;
703 : : int totalLeafSizes;
704 : : bool allTheSame;
705 : : spgxlogPickSplit xlrec;
706 : : char *leafdata,
707 : : *leafptr;
708 : : SPPageDesc saveCurrent;
709 : : int nToDelete,
710 : : nToInsert,
711 : : maxToInclude;
712 : :
713 : 2836 : in.level = level;
714 : :
715 : : /*
716 : : * Allocate per-leaf-tuple work arrays with max possible size
717 : : */
718 : 2836 : max = PageGetMaxOffsetNumber(current->page);
719 : 2836 : n = max + 1;
95 michael@paquier.xyz 720 :GNC 2836 : in.datums = palloc_array(Datum, n);
721 : 2836 : toDelete = palloc_array(OffsetNumber, n);
722 : 2836 : toInsert = palloc_array(OffsetNumber, n);
723 : 2836 : oldLeafs = palloc_array(SpGistLeafTuple, n);
724 : 2836 : newLeafs = palloc_array(SpGistLeafTuple, n);
725 : 2836 : leafPageSelect = palloc_array(uint8, n);
726 : :
5202 tgl@sss.pgh.pa.us 727 :CBC 2836 : STORE_STATE(state, xlrec.stateSrc);
728 : :
729 : : /*
730 : : * Form list of leaf tuples which will be distributed as split result;
731 : : * also, count up the amount of space that will be freed from current.
732 : : * (Note that in the non-root case, we won't actually delete the old
733 : : * tuples, only replace them with redirects or placeholders.)
734 : : */
735 : 2836 : nToInsert = 0;
736 : 2836 : nToDelete = 0;
737 : 2836 : spaceToDelete = 0;
5117 738 [ + + - + ]: 2836 : if (SpGistBlockIsRoot(current->blkno))
739 : : {
740 : : /*
741 : : * We are splitting the root (which up to now is also a leaf page).
742 : : * Its tuples are not linked, so scan sequentially to get them all. We
743 : : * ignore the original value of current->offnum.
744 : : */
5202 745 [ + + ]: 7709 : for (i = FirstOffsetNumber; i <= max; i++)
746 : : {
747 : : SpGistLeafTuple it;
748 : :
749 : 7668 : it = (SpGistLeafTuple) PageGetItem(current->page,
3189 750 : 7668 : PageGetItemId(current->page, i));
5202 751 [ + - ]: 7668 : if (it->tupstate == SPGIST_LIVE)
752 : : {
1809 753 : 7668 : in.datums[nToInsert] =
754 [ + - ]: 7668 : isNulls ? (Datum) 0 : SGLTDATUM(it, state);
1805 755 : 7668 : oldLeafs[nToInsert] = it;
5202 756 : 7668 : nToInsert++;
757 : 7668 : toDelete[nToDelete] = i;
758 : 7668 : nToDelete++;
759 : : /* we will delete the tuple altogether, so count full space */
760 : 7668 : spaceToDelete += it->size + sizeof(ItemIdData);
761 : : }
762 : : else /* tuples on root should be live */
5202 tgl@sss.pgh.pa.us 763 [ # # ]:UBC 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
764 : : }
765 : : }
766 : : else
767 : : {
768 : : /* Normal case, just collect the leaf tuples in the chain */
5202 tgl@sss.pgh.pa.us 769 :CBC 2795 : i = current->offnum;
770 [ + + ]: 355418 : while (i != InvalidOffsetNumber)
771 : : {
772 : : SpGistLeafTuple it;
773 : :
774 [ + - - + ]: 352623 : Assert(i >= FirstOffsetNumber && i <= max);
775 : 352623 : it = (SpGistLeafTuple) PageGetItem(current->page,
3189 776 : 352623 : PageGetItemId(current->page, i));
5202 777 [ + - ]: 352623 : if (it->tupstate == SPGIST_LIVE)
778 : : {
1809 779 : 352623 : in.datums[nToInsert] =
780 [ + - ]: 352623 : isNulls ? (Datum) 0 : SGLTDATUM(it, state);
1805 781 : 352623 : oldLeafs[nToInsert] = it;
5202 782 : 352623 : nToInsert++;
783 : 352623 : toDelete[nToDelete] = i;
784 : 352623 : nToDelete++;
785 : : /* we will not delete the tuple, only replace with dead */
786 [ - + ]: 352623 : Assert(it->size >= SGDTSIZE);
787 : 352623 : spaceToDelete += it->size - SGDTSIZE;
788 : : }
5202 tgl@sss.pgh.pa.us 789 [ # # ]:UBC 0 : else if (it->tupstate == SPGIST_DEAD)
790 : : {
791 : : /* We could see a DEAD tuple as first/only chain item */
792 [ # # ]: 0 : Assert(i == current->offnum);
1805 793 [ # # ]: 0 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
5202 794 : 0 : toDelete[nToDelete] = i;
795 : 0 : nToDelete++;
796 : : /* replacing it with redirect will save no space */
797 : : }
798 : : else
799 [ # # ]: 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
800 : :
1805 tgl@sss.pgh.pa.us 801 :CBC 352623 : i = SGLT_GET_NEXTOFFSET(it);
802 : : }
803 : : }
5202 804 : 2836 : in.nTuples = nToInsert;
805 : :
806 : : /*
807 : : * We may not actually insert new tuple because another picksplit may be
808 : : * necessary due to too large value, but we will try to allocate enough
809 : : * space to include it; and in any case it has to be included in the input
810 : : * for the picksplit function. So don't increment nToInsert yet.
811 : : */
1809 812 : 2836 : in.datums[in.nTuples] =
813 [ + - ]: 2836 : isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
1805 814 : 2836 : oldLeafs[in.nTuples] = newLeafTuple;
5202 815 : 2836 : in.nTuples++;
816 : :
817 : 2836 : memset(&out, 0, sizeof(out));
818 : :
5117 819 [ + - ]: 2836 : if (!isNulls)
820 : : {
821 : : /*
822 : : * Perform split using user-defined method.
823 : : */
824 : 2836 : procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
825 : 2836 : FunctionCall2Coll(procinfo,
826 : 2836 : index->rd_indcollation[0],
827 : : PointerGetDatum(&in),
828 : : PointerGetDatum(&out));
829 : :
830 : : /*
831 : : * Form new leaf tuples and count up the total space needed.
832 : : */
833 : 2836 : totalLeafSizes = 0;
834 [ + + ]: 365963 : for (i = 0; i < in.nTuples; i++)
835 : : {
1805 836 [ + + ]: 363127 : if (state->leafTupDesc->natts > 1)
837 : 30868 : spgDeformLeafTuple(oldLeafs[i],
838 : : state->leafTupDesc,
839 : : leafDatums,
840 : : leafIsnulls,
841 : : isNulls);
842 : :
843 : 363127 : leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
844 : 363127 : leafIsnulls[spgKeyColumn] = false;
845 : :
846 : 363127 : newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
847 : : leafDatums,
848 : : leafIsnulls);
5117 849 : 363127 : totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
850 : : }
851 : : }
852 : : else
853 : : {
854 : : /*
855 : : * Perform dummy split that puts all tuples into one node.
856 : : * checkAllTheSame will override this and force allTheSame mode.
857 : : */
5117 tgl@sss.pgh.pa.us 858 :UBC 0 : out.hasPrefix = false;
859 : 0 : out.nNodes = 1;
860 : 0 : out.nodeLabels = NULL;
95 michael@paquier.xyz 861 :UNC 0 : out.mapTuplesToNodes = palloc0_array(int, in.nTuples);
862 : :
863 : : /*
864 : : * Form new leaf tuples and count up the total space needed.
865 : : */
5117 tgl@sss.pgh.pa.us 866 :UBC 0 : totalLeafSizes = 0;
867 [ # # ]: 0 : for (i = 0; i < in.nTuples; i++)
868 : : {
1805 869 [ # # ]: 0 : if (state->leafTupDesc->natts > 1)
870 : 0 : spgDeformLeafTuple(oldLeafs[i],
871 : : state->leafTupDesc,
872 : : leafDatums,
873 : : leafIsnulls,
874 : : isNulls);
875 : :
876 : : /*
877 : : * Nulls tree can contain only null key values.
878 : : */
879 : 0 : leafDatums[spgKeyColumn] = (Datum) 0;
880 : 0 : leafIsnulls[spgKeyColumn] = true;
881 : :
882 : 0 : newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
883 : : leafDatums,
884 : : leafIsnulls);
5117 885 : 0 : totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
886 : : }
887 : : }
888 : :
889 : : /*
890 : : * Check to see if the picksplit function failed to separate the values,
891 : : * ie, it put them all into the same child node. If so, select allTheSame
892 : : * mode and create a random split instead. See comments for
893 : : * checkAllTheSame as to why we need to know if the new leaf tuples could
894 : : * fit on one page.
895 : : */
5202 tgl@sss.pgh.pa.us 896 :CBC 2836 : allTheSame = checkAllTheSame(&in, &out,
897 : : totalLeafSizes > SPGIST_PAGE_CAPACITY,
898 : : &includeNew);
899 : :
900 : : /*
901 : : * If checkAllTheSame decided we must exclude the new tuple, don't
902 : : * consider it any further.
903 : : */
904 [ + - ]: 2836 : if (includeNew)
905 : 2836 : maxToInclude = in.nTuples;
906 : : else
907 : : {
5202 tgl@sss.pgh.pa.us 908 :UBC 0 : maxToInclude = in.nTuples - 1;
909 : 0 : totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
910 : : }
911 : :
912 : : /*
913 : : * Allocate per-node work arrays. Since checkAllTheSame could replace
914 : : * out.nNodes with a value larger than the number of tuples on the input
915 : : * page, we can't allocate these arrays before here.
916 : : */
95 michael@paquier.xyz 917 :GNC 2836 : nodes = palloc_array(SpGistNodeTuple, out.nNodes);
918 : 2836 : leafSizes = palloc0_array(int, out.nNodes);
919 : :
920 : : /*
921 : : * Form nodes of inner tuple and inner tuple itself
922 : : */
5202 tgl@sss.pgh.pa.us 923 [ + + ]:CBC 22010 : for (i = 0; i < out.nNodes; i++)
924 : : {
925 : 19174 : Datum label = (Datum) 0;
5117 926 : 19174 : bool labelisnull = (out.nodeLabels == NULL);
927 : :
928 [ + + ]: 19174 : if (!labelisnull)
5202 929 : 1758 : label = out.nodeLabels[i];
5117 930 : 19174 : nodes[i] = spgFormNodeTuple(state, label, labelisnull);
931 : : }
5202 932 : 2836 : innerTuple = spgFormInnerTuple(state,
933 : 2836 : out.hasPrefix, out.prefixDatum,
934 : : out.nNodes, nodes);
935 : 2836 : innerTuple->allTheSame = allTheSame;
936 : :
937 : : /*
938 : : * Update nodes[] array to point into the newly formed innerTuple, so that
939 : : * we can adjust their downlinks below.
940 : : */
941 [ + + ]: 22010 : SGITITERATE(innerTuple, i, node)
942 : : {
943 : 19174 : nodes[i] = node;
944 : : }
945 : :
946 : : /*
947 : : * Re-scan new leaf tuples and count up the space needed under each node.
948 : : */
949 [ + + ]: 365963 : for (i = 0; i < maxToInclude; i++)
950 : : {
951 : 363127 : n = out.mapTuplesToNodes[i];
952 [ + - - + ]: 363127 : if (n < 0 || n >= out.nNodes)
5202 tgl@sss.pgh.pa.us 953 [ # # ]:UBC 0 : elog(ERROR, "inconsistent result of SPGiST picksplit function");
5202 tgl@sss.pgh.pa.us 954 :CBC 363127 : leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
955 : : }
956 : :
957 : : /*
958 : : * To perform the split, we must insert a new inner tuple, which can't go
959 : : * on a leaf page; and unless we are splitting the root page, we must then
960 : : * update the parent tuple's downlink to point to the inner tuple. If
961 : : * there is room, we'll put the new inner tuple on the same page as the
962 : : * parent tuple, otherwise we need another non-leaf buffer. But if the
963 : : * parent page is the root, we can't add the new inner tuple there,
964 : : * because the root page must have only one inner tuple.
965 : : */
966 : 2836 : xlrec.initInner = false;
967 [ + + ]: 2836 : if (parent->buffer != InvalidBuffer &&
5117 968 [ + + + - ]: 2795 : !SpGistBlockIsRoot(parent->blkno) &&
5202 969 [ + - ]: 2617 : (SpGistPageGetFreeSpace(parent->page, 1) >=
970 [ + + ]: 2617 : innerTuple->size + sizeof(ItemIdData)))
971 : : {
972 : : /* New inner tuple will fit on parent page */
973 : 2413 : newInnerBuffer = parent->buffer;
974 : : }
975 [ + + ]: 423 : else if (parent->buffer != InvalidBuffer)
976 : : {
977 : : /* Send tuple to page with next triple parity (see README) */
978 : 764 : newInnerBuffer = SpGistGetBuffer(index,
3189 979 : 382 : GBUF_INNER_PARITY(parent->blkno + 1) |
5117 980 : 382 : (isNulls ? GBUF_NULLS : 0),
3189 981 [ - + ]: 382 : innerTuple->size + sizeof(ItemIdData),
982 : : &xlrec.initInner);
983 : : }
984 : : else
985 : : {
986 : : /* Root page split ... inner tuple will go to root page */
5202 987 : 41 : newInnerBuffer = InvalidBuffer;
988 : : }
989 : :
990 : : /*
991 : : * The new leaf tuples converted from the existing ones should require the
992 : : * same or less space, and therefore should all fit onto one page
993 : : * (although that's not necessarily the current page, since we can't
994 : : * delete the old tuples but only replace them with placeholders).
995 : : * However, the incoming new tuple might not also fit, in which case we
996 : : * might need another picksplit cycle to reduce it some more.
997 : : *
998 : : * If there's not room to put everything back onto the current page, then
999 : : * we decide on a per-node basis which tuples go to the new page. (We do
1000 : : * it like that because leaf tuple chains can't cross pages, so we must
1001 : : * place all leaf tuples belonging to the same parent node on the same
1002 : : * page.)
1003 : : *
1004 : : * If we are splitting the root page (turning it from a leaf page into an
1005 : : * inner page), then no leaf tuples can go back to the current page; they
1006 : : * must all go somewhere else.
1007 : : */
5117 1008 [ + + + - ]: 2836 : if (!SpGistBlockIsRoot(current->blkno))
5202 1009 : 2795 : currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1010 : : else
1011 : 41 : currentFreeSpace = 0; /* prevent assigning any tuples to current */
1012 : :
1013 : 2836 : xlrec.initDest = false;
1014 : :
1015 [ + + ]: 2836 : if (totalLeafSizes <= currentFreeSpace)
1016 : : {
1017 : : /* All the leaf tuples will fit on current page */
1018 : 12 : newLeafBuffer = InvalidBuffer;
1019 : : /* mark new leaf tuple as included in insertions, if allowed */
1020 [ + - ]: 12 : if (includeNew)
1021 : : {
1022 : 12 : nToInsert++;
1023 : 12 : insertedNew = true;
1024 : : }
1025 [ + + ]: 1485 : for (i = 0; i < nToInsert; i++)
3189 1026 : 1473 : leafPageSelect[i] = 0; /* signifies current page */
1027 : : }
5202 1028 [ + + + - ]: 2824 : else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1029 : : {
1030 : : /*
1031 : : * We're trying to split up a long value by repeated suffixing, but
1032 : : * it's not going to fit yet. Don't bother allocating a second leaf
1033 : : * buffer that we won't be able to use.
1034 : : */
1035 : 22 : newLeafBuffer = InvalidBuffer;
1036 [ - + ]: 22 : Assert(includeNew);
1037 [ - + ]: 22 : Assert(nToInsert == 0);
1038 : : }
1039 : : else
1040 : : {
1041 : : /* We will need another leaf page */
1042 : : uint8 *nodePageSelect;
1043 : : int curspace;
1044 : : int newspace;
1045 : :
5117 1046 [ - + ]: 2802 : newLeafBuffer = SpGistGetBuffer(index,
1047 : : GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
5202 1048 [ + + ]: 2802 : Min(totalLeafSizes,
1049 : : SPGIST_PAGE_CAPACITY),
1050 : : &xlrec.initDest);
1051 : :
1052 : : /*
1053 : : * Attempt to assign node groups to the two pages. We might fail to
1054 : : * do so, even if totalLeafSizes is less than the available space,
1055 : : * because we can't split a group across pages.
1056 : : */
95 michael@paquier.xyz 1057 :GNC 2802 : nodePageSelect = palloc_array(uint8, out.nNodes);
1058 : :
5202 tgl@sss.pgh.pa.us 1059 :CBC 2802 : curspace = currentFreeSpace;
3616 kgrittn@postgresql.o 1060 : 2802 : newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
5202 tgl@sss.pgh.pa.us 1061 [ + + ]: 21930 : for (i = 0; i < out.nNodes; i++)
1062 : : {
1063 [ + + ]: 19128 : if (leafSizes[i] <= curspace)
1064 : : {
5026 bruce@momjian.us 1065 : 12410 : nodePageSelect[i] = 0; /* signifies current page */
5202 tgl@sss.pgh.pa.us 1066 : 12410 : curspace -= leafSizes[i];
1067 : : }
1068 : : else
1069 : : {
5026 bruce@momjian.us 1070 : 6718 : nodePageSelect[i] = 1; /* signifies new leaf page */
5202 tgl@sss.pgh.pa.us 1071 : 6718 : newspace -= leafSizes[i];
1072 : : }
1073 : : }
1074 [ + - + + ]: 2802 : if (curspace >= 0 && newspace >= 0)
1075 : : {
1076 : : /* Successful assignment, so we can include the new leaf tuple */
1077 [ + - ]: 2682 : if (includeNew)
1078 : : {
1079 : 2682 : nToInsert++;
1080 : 2682 : insertedNew = true;
1081 : : }
1082 : : }
1083 [ + - ]: 120 : else if (includeNew)
1084 : : {
1085 : : /* We must exclude the new leaf tuple from the split */
5026 bruce@momjian.us 1086 : 120 : int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1087 : :
5202 tgl@sss.pgh.pa.us 1088 : 120 : leafSizes[nodeOfNewTuple] -=
1089 : 120 : newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1090 : :
1091 : : /* Repeat the node assignment process --- should succeed now */
1092 : 120 : curspace = currentFreeSpace;
3616 kgrittn@postgresql.o 1093 : 120 : newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
5202 tgl@sss.pgh.pa.us 1094 [ + + ]: 582 : for (i = 0; i < out.nNodes; i++)
1095 : : {
1096 [ + + ]: 462 : if (leafSizes[i] <= curspace)
1097 : : {
3189 1098 : 154 : nodePageSelect[i] = 0; /* signifies current page */
5202 1099 : 154 : curspace -= leafSizes[i];
1100 : : }
1101 : : else
1102 : : {
3189 1103 : 308 : nodePageSelect[i] = 1; /* signifies new leaf page */
5202 1104 : 308 : newspace -= leafSizes[i];
1105 : : }
1106 : : }
1107 [ + - - + ]: 120 : if (curspace < 0 || newspace < 0)
5202 tgl@sss.pgh.pa.us 1108 [ # # ]:UBC 0 : elog(ERROR, "failed to divide leaf tuple groups across pages");
1109 : : }
1110 : : else
1111 : : {
1112 : : /* oops, we already excluded new tuple ... should not get here */
1113 [ # # ]: 0 : elog(ERROR, "failed to divide leaf tuple groups across pages");
1114 : : }
1115 : : /* Expand the per-node assignments to be shown per leaf tuple */
5202 tgl@sss.pgh.pa.us 1116 [ + + ]:CBC 364314 : for (i = 0; i < nToInsert; i++)
1117 : : {
1118 : 361512 : n = out.mapTuplesToNodes[i];
1119 : 361512 : leafPageSelect[i] = nodePageSelect[n];
1120 : : }
1121 : : }
1122 : :
1123 : : /* Start preparing WAL record */
1124 : 2836 : xlrec.nDelete = 0;
1125 : 2836 : xlrec.initSrc = isNew;
5117 1126 : 2836 : xlrec.storesNulls = isNulls;
4133 heikki.linnakangas@i 1127 [ + + - + ]: 2836 : xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1128 : :
5202 tgl@sss.pgh.pa.us 1129 : 2836 : leafdata = leafptr = (char *) palloc(totalLeafSizes);
1130 : :
1131 : : /* Here we begin making the changes to the target pages */
1132 : 2836 : START_CRIT_SECTION();
1133 : :
1134 : : /*
1135 : : * Delete old leaf tuples from current buffer, except when we're splitting
1136 : : * the root; in that case there's no need because we'll re-init the page
1137 : : * below. We do this first to make room for reinserting new leaf tuples.
1138 : : */
5117 1139 [ + + + - ]: 2836 : if (!SpGistBlockIsRoot(current->blkno))
1140 : : {
1141 : : /*
1142 : : * Init buffer instead of deleting individual tuples, but only if
1143 : : * there aren't any other live tuples and only during build; otherwise
1144 : : * we need to set a redirection tuple for concurrent scans.
1145 : : */
5202 1146 [ + + ]: 2795 : if (state->isBuild &&
1147 : 2124 : nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1148 [ + + ]: 2124 : PageGetMaxOffsetNumber(current->page))
1149 : : {
5117 1150 [ - + ]: 154 : SpGistInitBuffer(current->buffer,
1151 : : SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
5202 1152 : 154 : xlrec.initSrc = true;
1153 : : }
1154 [ + + ]: 2641 : else if (isNew)
1155 : : {
1156 : : /* don't expose the freshly init'd buffer as a backup block */
1157 [ - + ]: 22 : Assert(nToDelete == 0);
1158 : : }
1159 : : else
1160 : : {
1161 : 2619 : xlrec.nDelete = nToDelete;
1162 : :
1163 [ + + ]: 2619 : if (!state->isBuild)
1164 : : {
1165 : : /*
1166 : : * Need to create redirect tuple (it will point to new inner
1167 : : * tuple) but right now the new tuple's location is not known
1168 : : * yet. So, set the redirection pointer to "impossible" value
1169 : : * and remember its position to update tuple later.
1170 : : */
1171 [ + - ]: 649 : if (nToDelete > 0)
1172 : 649 : redirectTuplePos = toDelete[0];
1173 : 649 : spgPageIndexMultiDelete(state, current->page,
1174 : : toDelete, nToDelete,
1175 : : SPGIST_REDIRECT,
1176 : : SPGIST_PLACEHOLDER,
1177 : : SPGIST_METAPAGE_BLKNO,
1178 : : FirstOffsetNumber);
1179 : : }
1180 : : else
1181 : : {
1182 : : /*
1183 : : * During index build there are no concurrent searches, so we
1184 : : * don't need to create a redirection tuple.
1185 : : */
1186 : 1970 : spgPageIndexMultiDelete(state, current->page,
1187 : : toDelete, nToDelete,
1188 : : SPGIST_PLACEHOLDER,
1189 : : SPGIST_PLACEHOLDER,
1190 : : InvalidBlockNumber,
1191 : : InvalidOffsetNumber);
1192 : : }
1193 : : }
1194 : : }
1195 : :
1196 : : /*
1197 : : * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1198 : : * nodes.
1199 : : */
1200 : 2836 : startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1201 [ + + ]: 365821 : for (i = 0; i < nToInsert; i++)
1202 : : {
1203 : 362985 : SpGistLeafTuple it = newLeafs[i];
1204 : : Buffer leafBuffer;
1205 : : BlockNumber leafBlock;
1206 : : OffsetNumber newoffset;
1207 : :
1208 : : /* Which page is it going to? */
1209 [ + + ]: 362985 : leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1210 : 362985 : leafBlock = BufferGetBlockNumber(leafBuffer);
1211 : :
1212 : : /* Link tuple into correct chain for its node */
1213 : 362985 : n = out.mapTuplesToNodes[i];
1214 : :
1215 [ + + ]: 362985 : if (ItemPointerIsValid(&nodes[n]->t_tid))
1216 : : {
1217 [ - + ]: 352124 : Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1805 1218 : 352124 : SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1219 : : }
1220 : : else
1221 : 10861 : SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
1222 : :
1223 : : /* Insert it on page */
3616 kgrittn@postgresql.o 1224 : 362985 : newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
139 peter@eisentraut.org 1225 :GNC 362985 : it, it->size,
5202 tgl@sss.pgh.pa.us 1226 :CBC 362985 : &startOffsets[leafPageSelect[i]],
1227 : : false);
1228 : 362985 : toInsert[i] = newoffset;
1229 : :
1230 : : /* ... and complete the chain linking */
1231 : 362985 : ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1232 : :
1233 : : /* Also copy leaf tuple into WAL data */
1234 : 362985 : memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1235 : 362985 : leafptr += newLeafs[i]->size;
1236 : : }
1237 : :
1238 : : /*
1239 : : * We're done modifying the other leaf buffer (if any), so mark it dirty.
1240 : : * current->buffer will be marked below, after we're entirely done
1241 : : * modifying it.
1242 : : */
1243 [ + + ]: 2836 : if (newLeafBuffer != InvalidBuffer)
1244 : : {
1245 : 2802 : MarkBufferDirty(newLeafBuffer);
1246 : : }
1247 : :
1248 : : /* Remember current buffer, since we're about to change "current" */
1249 : 2836 : saveCurrent = *current;
1250 : :
1251 : : /*
1252 : : * Store the new innerTuple
1253 : : */
1254 [ + + + + ]: 2836 : if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1255 : : {
1256 : : /*
1257 : : * new inner tuple goes to parent page
1258 : : */
1259 [ - + ]: 2413 : Assert(current->buffer != parent->buffer);
1260 : :
1261 : : /* Repoint "current" at the new inner tuple */
1262 : 2413 : current->blkno = parent->blkno;
1263 : 2413 : current->buffer = parent->buffer;
1264 : 2413 : current->page = parent->page;
1265 : 2413 : xlrec.offnumInner = current->offnum =
1266 : 2413 : SpGistPageAddNewItem(state, current->page,
139 peter@eisentraut.org 1267 :GNC 2413 : innerTuple, innerTuple->size,
1268 : : NULL, false);
1269 : :
1270 : : /*
1271 : : * Update parent node link and mark parent page dirty
1272 : : */
4133 heikki.linnakangas@i 1273 :CBC 2413 : xlrec.innerIsParent = true;
5202 tgl@sss.pgh.pa.us 1274 : 2413 : xlrec.offnumParent = parent->offnum;
1275 : 2413 : xlrec.nodeI = parent->node;
1276 : 2413 : saveNodeLink(index, parent, current->blkno, current->offnum);
1277 : :
1278 : : /*
1279 : : * Update redirection link (in old current buffer)
1280 : : */
1281 [ + + ]: 2413 : if (redirectTuplePos != InvalidOffsetNumber)
1282 : 588 : setRedirectionTuple(&saveCurrent, redirectTuplePos,
1283 : 588 : current->blkno, current->offnum);
1284 : :
1285 : : /* Done modifying old current buffer, mark it dirty */
1286 : 2413 : MarkBufferDirty(saveCurrent.buffer);
1287 : : }
1288 [ + + ]: 423 : else if (parent->buffer != InvalidBuffer)
1289 : : {
1290 : : /*
1291 : : * new inner tuple will be stored on a new page
1292 : : */
1293 [ - + ]: 382 : Assert(newInnerBuffer != InvalidBuffer);
1294 : :
1295 : : /* Repoint "current" at the new inner tuple */
1296 : 382 : current->buffer = newInnerBuffer;
1297 : 382 : current->blkno = BufferGetBlockNumber(current->buffer);
3616 kgrittn@postgresql.o 1298 : 382 : current->page = BufferGetPage(current->buffer);
5202 tgl@sss.pgh.pa.us 1299 : 382 : xlrec.offnumInner = current->offnum =
1300 : 382 : SpGistPageAddNewItem(state, current->page,
139 peter@eisentraut.org 1301 :GNC 382 : innerTuple, innerTuple->size,
1302 : : NULL, false);
1303 : :
1304 : : /* Done modifying new current buffer, mark it dirty */
5202 tgl@sss.pgh.pa.us 1305 :CBC 382 : MarkBufferDirty(current->buffer);
1306 : :
1307 : : /*
1308 : : * Update parent node link and mark parent page dirty
1309 : : */
4133 heikki.linnakangas@i 1310 : 382 : xlrec.innerIsParent = (parent->buffer == current->buffer);
5202 tgl@sss.pgh.pa.us 1311 : 382 : xlrec.offnumParent = parent->offnum;
1312 : 382 : xlrec.nodeI = parent->node;
1313 : 382 : saveNodeLink(index, parent, current->blkno, current->offnum);
1314 : :
1315 : : /*
1316 : : * Update redirection link (in old current buffer)
1317 : : */
1318 [ + + ]: 382 : if (redirectTuplePos != InvalidOffsetNumber)
1319 : 61 : setRedirectionTuple(&saveCurrent, redirectTuplePos,
1320 : 61 : current->blkno, current->offnum);
1321 : :
1322 : : /* Done modifying old current buffer, mark it dirty */
1323 : 382 : MarkBufferDirty(saveCurrent.buffer);
1324 : : }
1325 : : else
1326 : : {
1327 : : /*
1328 : : * Splitting root page, which was a leaf but now becomes inner page
1329 : : * (and so "current" continues to point at it)
1330 : : */
5117 1331 [ - + - - ]: 41 : Assert(SpGistBlockIsRoot(current->blkno));
5202 1332 [ - + ]: 41 : Assert(redirectTuplePos == InvalidOffsetNumber);
1333 : :
5117 1334 [ - + ]: 41 : SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
5202 1335 : 41 : xlrec.initInner = true;
4133 heikki.linnakangas@i 1336 : 41 : xlrec.innerIsParent = false;
1337 : :
5202 tgl@sss.pgh.pa.us 1338 : 41 : xlrec.offnumInner = current->offnum =
139 peter@eisentraut.org 1339 :GNC 41 : PageAddItem(current->page, innerTuple, innerTuple->size,
1340 : : InvalidOffsetNumber, false, false);
5202 tgl@sss.pgh.pa.us 1341 [ - + ]:CBC 41 : if (current->offnum != FirstOffsetNumber)
5202 tgl@sss.pgh.pa.us 1342 [ # # ]:UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1343 : : innerTuple->size);
1344 : :
1345 : : /* No parent link to update, nor redirection to do */
5202 tgl@sss.pgh.pa.us 1346 :CBC 41 : xlrec.offnumParent = InvalidOffsetNumber;
1347 : 41 : xlrec.nodeI = 0;
1348 : :
1349 : : /* Done modifying new current buffer, mark it dirty */
1350 : 41 : MarkBufferDirty(current->buffer);
1351 : :
1352 : : /* saveCurrent doesn't represent a different buffer */
1353 : 41 : saveCurrent.buffer = InvalidBuffer;
1354 : : }
1355 : :
2538 heikki.linnakangas@i 1356 [ + + - + : 2836 : if (RelationNeedsWAL(index) && !state->isBuild)
- - - - +
+ ]
1357 : : {
1358 : : XLogRecPtr recptr;
1359 : : int flags;
1360 : :
4133 1361 : 681 : XLogBeginInsert();
1362 : :
1363 : 681 : xlrec.nInsert = nToInsert;
397 peter@eisentraut.org 1364 : 681 : XLogRegisterData(&xlrec, SizeOfSpgxlogPickSplit);
1365 : :
1366 : 681 : XLogRegisterData(toDelete,
4133 heikki.linnakangas@i 1367 : 681 : sizeof(OffsetNumber) * xlrec.nDelete);
397 peter@eisentraut.org 1368 : 681 : XLogRegisterData(toInsert,
4133 heikki.linnakangas@i 1369 : 681 : sizeof(OffsetNumber) * xlrec.nInsert);
397 peter@eisentraut.org 1370 : 681 : XLogRegisterData(leafPageSelect,
4133 heikki.linnakangas@i 1371 : 681 : sizeof(uint8) * xlrec.nInsert);
397 peter@eisentraut.org 1372 : 681 : XLogRegisterData(innerTuple, innerTuple->size);
4133 heikki.linnakangas@i 1373 : 681 : XLogRegisterData(leafdata, leafptr - leafdata);
1374 : :
1375 : : /* Old leaf page */
1376 [ + + ]: 681 : if (BufferIsValid(saveCurrent.buffer))
1377 : : {
3891 1378 : 671 : flags = REGBUF_STANDARD;
1379 [ + + ]: 671 : if (xlrec.initSrc)
1380 : 22 : flags |= REGBUF_WILL_INIT;
4133 1381 : 671 : XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1382 : : }
1383 : :
1384 : : /* New leaf page */
1385 [ + + ]: 681 : if (BufferIsValid(newLeafBuffer))
1386 : : {
1387 : 659 : flags = REGBUF_STANDARD;
1388 [ + + ]: 659 : if (xlrec.initDest)
1389 : 615 : flags |= REGBUF_WILL_INIT;
1390 : 659 : XLogRegisterBuffer(1, newLeafBuffer, flags);
1391 : : }
1392 : :
1393 : : /* Inner page */
3891 1394 : 681 : flags = REGBUF_STANDARD;
1395 [ + + ]: 681 : if (xlrec.initInner)
1396 : 20 : flags |= REGBUF_WILL_INIT;
1397 : 681 : XLogRegisterBuffer(2, current->buffer, flags);
1398 : :
1399 : : /* Parent page, if different from inner page */
4133 1400 [ + + ]: 681 : if (parent->buffer != InvalidBuffer)
1401 : : {
1402 [ + + ]: 671 : if (parent->buffer != current->buffer)
1403 : 61 : XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1404 : : else
1405 [ - + ]: 610 : Assert(xlrec.innerIsParent);
1406 : : }
1407 : :
1408 : : /* Issue the WAL record */
1409 : 681 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1410 : :
1411 : : /* Update page LSNs on all affected pages */
5202 tgl@sss.pgh.pa.us 1412 [ + + ]: 681 : if (newLeafBuffer != InvalidBuffer)
1413 : : {
3616 kgrittn@postgresql.o 1414 : 659 : Page page = BufferGetPage(newLeafBuffer);
1415 : :
5202 tgl@sss.pgh.pa.us 1416 : 659 : PageSetLSN(page, recptr);
1417 : : }
1418 : :
1419 [ + + ]: 681 : if (saveCurrent.buffer != InvalidBuffer)
1420 : : {
3616 kgrittn@postgresql.o 1421 : 671 : Page page = BufferGetPage(saveCurrent.buffer);
1422 : :
5202 tgl@sss.pgh.pa.us 1423 : 671 : PageSetLSN(page, recptr);
1424 : : }
1425 : :
1426 : 681 : PageSetLSN(current->page, recptr);
1427 : :
1428 [ + + ]: 681 : if (parent->buffer != InvalidBuffer)
1429 : : {
1430 : 671 : PageSetLSN(parent->page, recptr);
1431 : : }
1432 : : }
1433 : :
1434 [ - + ]: 2836 : END_CRIT_SECTION();
1435 : :
1436 : : /* Update local free-space cache and unlock buffers */
1437 [ + + ]: 2836 : if (newLeafBuffer != InvalidBuffer)
1438 : : {
1439 : 2802 : SpGistSetLastUsedPage(index, newLeafBuffer);
1440 : 2802 : UnlockReleaseBuffer(newLeafBuffer);
1441 : : }
1442 [ + + ]: 2836 : if (saveCurrent.buffer != InvalidBuffer)
1443 : : {
1444 : 2795 : SpGistSetLastUsedPage(index, saveCurrent.buffer);
1445 : 2795 : UnlockReleaseBuffer(saveCurrent.buffer);
1446 : : }
1447 : :
1448 : 2836 : return insertedNew;
1449 : : }
1450 : :
1451 : : /*
1452 : : * spgMatchNode action: descend to N'th child node of current inner tuple
1453 : : */
1454 : : static void
1455 : 9330646 : spgMatchNodeAction(Relation index, SpGistState *state,
1456 : : SpGistInnerTuple innerTuple,
1457 : : SPPageDesc *current, SPPageDesc *parent, int nodeN)
1458 : : {
1459 : : int i;
1460 : : SpGistNodeTuple node;
1461 : :
1462 : : /* Release previous parent buffer if any */
1463 [ + + ]: 9330646 : if (parent->buffer != InvalidBuffer &&
1464 [ + + ]: 8936203 : parent->buffer != current->buffer)
1465 : : {
1466 : 391964 : SpGistSetLastUsedPage(index, parent->buffer);
1467 : 391964 : UnlockReleaseBuffer(parent->buffer);
1468 : : }
1469 : :
1470 : : /* Repoint parent to specified node of current inner tuple */
1471 : 9330646 : parent->blkno = current->blkno;
1472 : 9330646 : parent->buffer = current->buffer;
1473 : 9330646 : parent->page = current->page;
1474 : 9330646 : parent->offnum = current->offnum;
1475 : 9330646 : parent->node = nodeN;
1476 : :
1477 : : /* Locate that node */
1478 [ + - ]: 15631965 : SGITITERATE(innerTuple, i, node)
1479 : : {
1480 [ + + ]: 15631965 : if (i == nodeN)
1481 : 9330646 : break;
1482 : : }
1483 : :
1484 [ - + ]: 9330646 : if (i != nodeN)
5202 tgl@sss.pgh.pa.us 1485 [ # # ]:UBC 0 : elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1486 : : nodeN);
1487 : :
1488 : : /* Point current to the downlink location, if any */
5202 tgl@sss.pgh.pa.us 1489 [ + + ]:CBC 9330646 : if (ItemPointerIsValid(&node->t_tid))
1490 : : {
1491 : 9329601 : current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1492 : 9329601 : current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1493 : : }
1494 : : else
1495 : : {
1496 : : /* Downlink is empty, so we'll need to find a new page */
1497 : 1045 : current->blkno = InvalidBlockNumber;
1498 : 1045 : current->offnum = InvalidOffsetNumber;
1499 : : }
1500 : :
1501 : 9330646 : current->buffer = InvalidBuffer;
1502 : 9330646 : current->page = NULL;
1503 : 9330646 : }
1504 : :
1505 : : /*
1506 : : * spgAddNode action: add a node to the inner tuple at current
1507 : : */
1508 : : static void
1509 : 789 : spgAddNodeAction(Relation index, SpGistState *state,
1510 : : SpGistInnerTuple innerTuple,
1511 : : SPPageDesc *current, SPPageDesc *parent,
1512 : : int nodeN, Datum nodeLabel)
1513 : : {
1514 : : SpGistInnerTuple newInnerTuple;
1515 : : spgxlogAddNode xlrec;
1516 : :
1517 : : /* Should not be applied to nulls */
5117 1518 [ - + ]: 789 : Assert(!SpGistPageStoresNulls(current->page));
1519 : :
1520 : : /* Construct new inner tuple with additional node */
5202 1521 : 789 : newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1522 : :
1523 : : /* Prepare WAL record */
1524 : 789 : STORE_STATE(state, xlrec.stateSrc);
1525 : 789 : xlrec.offnum = current->offnum;
1526 : :
1527 : : /* we don't fill these unless we need to change the parent downlink */
4133 heikki.linnakangas@i 1528 : 789 : xlrec.parentBlk = -1;
5202 tgl@sss.pgh.pa.us 1529 : 789 : xlrec.offnumParent = InvalidOffsetNumber;
1530 : 789 : xlrec.nodeI = 0;
1531 : :
1532 : : /* we don't fill these unless tuple has to be moved */
1533 : 789 : xlrec.offnumNew = InvalidOffsetNumber;
1534 : 789 : xlrec.newPage = false;
1535 : :
1536 : 789 : if (PageGetExactFreeSpace(current->page) >=
1537 [ + + ]: 789 : newInnerTuple->size - innerTuple->size)
1538 : : {
1539 : : /*
1540 : : * We can replace the inner tuple by new version in-place
1541 : : */
1542 : 786 : START_CRIT_SECTION();
1543 : :
1544 : 786 : PageIndexTupleDelete(current->page, current->offnum);
1545 : 786 : if (PageAddItem(current->page,
1546 : : newInnerTuple, newInnerTuple->size,
1547 [ - + ]: 786 : current->offnum, false, false) != current->offnum)
5202 tgl@sss.pgh.pa.us 1548 [ # # ]:UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1549 : : newInnerTuple->size);
1550 : :
5202 tgl@sss.pgh.pa.us 1551 :CBC 786 : MarkBufferDirty(current->buffer);
1552 : :
2538 heikki.linnakangas@i 1553 [ + - - + : 786 : if (RelationNeedsWAL(index) && !state->isBuild)
- - - - +
+ ]
1554 : : {
1555 : : XLogRecPtr recptr;
1556 : :
4133 1557 : 354 : XLogBeginInsert();
397 peter@eisentraut.org 1558 : 354 : XLogRegisterData(&xlrec, sizeof(xlrec));
1559 : 354 : XLogRegisterData(newInnerTuple, newInnerTuple->size);
1560 : :
4133 heikki.linnakangas@i 1561 : 354 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1562 : :
1563 : 354 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1564 : :
5202 tgl@sss.pgh.pa.us 1565 : 354 : PageSetLSN(current->page, recptr);
1566 : : }
1567 : :
1568 [ - + ]: 786 : END_CRIT_SECTION();
1569 : : }
1570 : : else
1571 : : {
1572 : : /*
1573 : : * move inner tuple to another page, and update parent
1574 : : */
1575 : : SpGistDeadTuple dt;
1576 : : SPPageDesc saveCurrent;
1577 : :
1578 : : /*
1579 : : * It should not be possible to get here for the root page, since we
1580 : : * allow only one inner tuple on the root page, and spgFormInnerTuple
1581 : : * always checks that inner tuples don't exceed the size of a page.
1582 : : */
5117 1583 [ + - - + ]: 3 : if (SpGistBlockIsRoot(current->blkno))
5202 tgl@sss.pgh.pa.us 1584 [ # # ]:UBC 0 : elog(ERROR, "cannot enlarge root tuple any more");
5202 tgl@sss.pgh.pa.us 1585 [ - + ]:CBC 3 : Assert(parent->buffer != InvalidBuffer);
1586 : :
1587 : 3 : saveCurrent = *current;
1588 : :
1589 : 3 : xlrec.offnumParent = parent->offnum;
1590 : 3 : xlrec.nodeI = parent->node;
1591 : :
1592 : : /*
1593 : : * obtain new buffer with the same parity as current, since it will be
1594 : : * a child of same parent tuple
1595 : : */
1596 : 6 : current->buffer = SpGistGetBuffer(index,
1597 : 3 : GBUF_INNER_PARITY(current->blkno),
3189 1598 : 3 : newInnerTuple->size + sizeof(ItemIdData),
1599 : : &xlrec.newPage);
5202 1600 : 3 : current->blkno = BufferGetBlockNumber(current->buffer);
3616 kgrittn@postgresql.o 1601 : 3 : current->page = BufferGetPage(current->buffer);
1602 : :
1603 : : /*
1604 : : * Let's just make real sure new current isn't same as old. Right now
1605 : : * that's impossible, but if SpGistGetBuffer ever got smart enough to
1606 : : * delete placeholder tuples before checking space, maybe it wouldn't
1607 : : * be impossible. The case would appear to work except that WAL
1608 : : * replay would be subtly wrong, so I think a mere assert isn't enough
1609 : : * here.
1610 : : */
4133 heikki.linnakangas@i 1611 [ - + ]: 3 : if (current->blkno == saveCurrent.blkno)
5026 bruce@momjian.us 1612 [ # # ]:UBC 0 : elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1613 : :
1614 : : /*
1615 : : * New current and parent buffer will both be modified; but note that
1616 : : * parent buffer could be same as either new or old current.
1617 : : */
4133 heikki.linnakangas@i 1618 [ - + ]:CBC 3 : if (parent->buffer == saveCurrent.buffer)
4133 heikki.linnakangas@i 1619 :UBC 0 : xlrec.parentBlk = 0;
4133 heikki.linnakangas@i 1620 [ - + ]:CBC 3 : else if (parent->buffer == current->buffer)
4133 heikki.linnakangas@i 1621 :UBC 0 : xlrec.parentBlk = 1;
1622 : : else
4133 heikki.linnakangas@i 1623 :CBC 3 : xlrec.parentBlk = 2;
1624 : :
5202 tgl@sss.pgh.pa.us 1625 : 3 : START_CRIT_SECTION();
1626 : :
1627 : : /* insert new ... */
1628 : 3 : xlrec.offnumNew = current->offnum =
1629 : 3 : SpGistPageAddNewItem(state, current->page,
139 peter@eisentraut.org 1630 :GNC 3 : newInnerTuple, newInnerTuple->size,
1631 : : NULL, false);
1632 : :
5202 tgl@sss.pgh.pa.us 1633 :CBC 3 : MarkBufferDirty(current->buffer);
1634 : :
1635 : : /* update parent's downlink and mark parent page dirty */
1636 : 3 : saveNodeLink(index, parent, current->blkno, current->offnum);
1637 : :
1638 : : /*
1639 : : * Replace old tuple with a placeholder or redirection tuple. Unless
1640 : : * doing an index build, we have to insert a redirection tuple for
1641 : : * possible concurrent scans. We can't just delete it in any case,
1642 : : * because that could change the offsets of other tuples on the page,
1643 : : * breaking downlinks from their parents.
1644 : : */
1645 [ - + ]: 3 : if (state->isBuild)
5202 tgl@sss.pgh.pa.us 1646 :UBC 0 : dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1647 : : InvalidBlockNumber, InvalidOffsetNumber);
1648 : : else
5202 tgl@sss.pgh.pa.us 1649 :CBC 3 : dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1650 : 3 : current->blkno, current->offnum);
1651 : :
1652 : 3 : PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
139 peter@eisentraut.org 1653 :GNC 3 : if (PageAddItem(saveCurrent.page, dt, dt->size,
1654 : : saveCurrent.offnum,
5202 tgl@sss.pgh.pa.us 1655 [ - + ]:CBC 3 : false, false) != saveCurrent.offnum)
5202 tgl@sss.pgh.pa.us 1656 [ # # ]:UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1657 : : dt->size);
1658 : :
5202 tgl@sss.pgh.pa.us 1659 [ - + ]:CBC 3 : if (state->isBuild)
5202 tgl@sss.pgh.pa.us 1660 :UBC 0 : SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1661 : : else
5202 tgl@sss.pgh.pa.us 1662 :CBC 3 : SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1663 : :
1664 : 3 : MarkBufferDirty(saveCurrent.buffer);
1665 : :
2538 heikki.linnakangas@i 1666 [ + - - + : 3 : if (RelationNeedsWAL(index) && !state->isBuild)
- - - - +
- ]
1667 : : {
1668 : : XLogRecPtr recptr;
1669 : : int flags;
1670 : :
4133 1671 : 3 : XLogBeginInsert();
1672 : :
1673 : : /* orig page */
1674 : 3 : XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1675 : : /* new page */
3891 1676 : 3 : flags = REGBUF_STANDARD;
1677 [ + - ]: 3 : if (xlrec.newPage)
1678 : 3 : flags |= REGBUF_WILL_INIT;
1679 : 3 : XLogRegisterBuffer(1, current->buffer, flags);
1680 : : /* parent page (if different from orig and new) */
4133 1681 [ + - ]: 3 : if (xlrec.parentBlk == 2)
1682 : 3 : XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
1683 : :
397 peter@eisentraut.org 1684 : 3 : XLogRegisterData(&xlrec, sizeof(xlrec));
1685 : 3 : XLogRegisterData(newInnerTuple, newInnerTuple->size);
1686 : :
4133 heikki.linnakangas@i 1687 : 3 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1688 : :
1689 : : /* we don't bother to check if any of these are redundant */
5202 tgl@sss.pgh.pa.us 1690 : 3 : PageSetLSN(current->page, recptr);
1691 : 3 : PageSetLSN(parent->page, recptr);
1692 : 3 : PageSetLSN(saveCurrent.page, recptr);
1693 : : }
1694 : :
1695 [ - + ]: 3 : END_CRIT_SECTION();
1696 : :
1697 : : /* Release saveCurrent if it's not same as current or parent */
1698 [ + - ]: 3 : if (saveCurrent.buffer != current->buffer &&
1699 [ + - ]: 3 : saveCurrent.buffer != parent->buffer)
1700 : : {
1701 : 3 : SpGistSetLastUsedPage(index, saveCurrent.buffer);
1702 : 3 : UnlockReleaseBuffer(saveCurrent.buffer);
1703 : : }
1704 : : }
1705 : 789 : }
1706 : :
1707 : : /*
1708 : : * spgSplitNode action: split inner tuple at current into prefix and postfix
1709 : : */
1710 : : static void
1711 : 321 : spgSplitNodeAction(Relation index, SpGistState *state,
1712 : : SpGistInnerTuple innerTuple,
1713 : : SPPageDesc *current, spgChooseOut *out)
1714 : : {
1715 : : SpGistInnerTuple prefixTuple,
1716 : : postfixTuple;
1717 : : SpGistNodeTuple node,
1718 : : *nodes;
1719 : : BlockNumber postfixBlkno;
1720 : : OffsetNumber postfixOffset;
1721 : : int i;
1722 : : spgxlogSplitTuple xlrec;
1723 : 321 : Buffer newBuffer = InvalidBuffer;
1724 : :
1725 : : /* Should not be applied to nulls */
5117 1726 [ - + ]: 321 : Assert(!SpGistPageStoresNulls(current->page));
1727 : :
1728 : : /* Check opclass gave us sane values */
3491 1729 [ + - ]: 321 : if (out->result.splitTuple.prefixNNodes <= 0 ||
1730 [ - + ]: 321 : out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
3491 tgl@sss.pgh.pa.us 1731 [ # # ]:UBC 0 : elog(ERROR, "invalid number of prefix nodes: %d",
1732 : : out->result.splitTuple.prefixNNodes);
3491 tgl@sss.pgh.pa.us 1733 [ + - ]:CBC 321 : if (out->result.splitTuple.childNodeN < 0 ||
1734 : 321 : out->result.splitTuple.childNodeN >=
1735 [ - + ]: 321 : out->result.splitTuple.prefixNNodes)
3491 tgl@sss.pgh.pa.us 1736 [ # # ]:UBC 0 : elog(ERROR, "invalid child node number: %d",
1737 : : out->result.splitTuple.childNodeN);
1738 : :
1739 : : /*
1740 : : * Construct new prefix tuple with requested number of nodes. We'll fill
1741 : : * in the childNodeN'th node's downlink below.
1742 : : */
95 michael@paquier.xyz 1743 :GNC 321 : nodes = palloc_array(SpGistNodeTuple, out->result.splitTuple.prefixNNodes);
1744 : :
3491 tgl@sss.pgh.pa.us 1745 [ + + ]:CBC 642 : for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1746 : : {
1747 : 321 : Datum label = (Datum) 0;
1748 : : bool labelisnull;
1749 : :
1750 : 321 : labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1751 [ + - ]: 321 : if (!labelisnull)
1752 : 321 : label = out->result.splitTuple.prefixNodeLabels[i];
1753 : 321 : nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1754 : : }
1755 : :
5202 1756 : 321 : prefixTuple = spgFormInnerTuple(state,
1757 : 321 : out->result.splitTuple.prefixHasPrefix,
1758 : : out->result.splitTuple.prefixPrefixDatum,
1759 : : out->result.splitTuple.prefixNNodes,
1760 : : nodes);
1761 : :
1762 : : /* it must fit in the space that innerTuple now occupies */
1763 [ - + ]: 321 : if (prefixTuple->size > innerTuple->size)
5202 tgl@sss.pgh.pa.us 1764 [ # # ]:UBC 0 : elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1765 : :
1766 : : /*
1767 : : * Construct new postfix tuple, containing all nodes of innerTuple with
1768 : : * same node datums, but with the prefix specified by the picksplit
1769 : : * function.
1770 : : */
95 michael@paquier.xyz 1771 :GNC 321 : nodes = palloc_array(SpGistNodeTuple, innerTuple->nNodes);
5202 tgl@sss.pgh.pa.us 1772 [ + + ]:CBC 1111 : SGITITERATE(innerTuple, i, node)
1773 : : {
1774 : 790 : nodes[i] = node;
1775 : : }
1776 : :
1777 : 321 : postfixTuple = spgFormInnerTuple(state,
1778 : 321 : out->result.splitTuple.postfixHasPrefix,
1779 : : out->result.splitTuple.postfixPrefixDatum,
1780 : 321 : innerTuple->nNodes, nodes);
1781 : :
1782 : : /* Postfix tuple is allTheSame if original tuple was */
1783 : 321 : postfixTuple->allTheSame = innerTuple->allTheSame;
1784 : :
1785 : : /* prep data for WAL record */
1786 : 321 : xlrec.newPage = false;
1787 : :
1788 : : /*
1789 : : * If we can't fit both tuples on the current page, get a new page for the
1790 : : * postfix tuple. In particular, can't split to the root page.
1791 : : *
1792 : : * For the space calculation, note that prefixTuple replaces innerTuple
1793 : : * but postfixTuple will be a new entry.
1794 : : */
5117 1795 [ + + + - ]: 321 : if (SpGistBlockIsRoot(current->blkno) ||
5202 1796 [ + - ]: 315 : SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1797 [ + + ]: 315 : prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1798 : : {
1799 : : /*
1800 : : * Choose page with next triple parity, because postfix tuple is a
1801 : : * child of prefix one
1802 : : */
1803 : 84 : newBuffer = SpGistGetBuffer(index,
1804 : 84 : GBUF_INNER_PARITY(current->blkno + 1),
1805 : 84 : postfixTuple->size + sizeof(ItemIdData),
1806 : : &xlrec.newPage);
1807 : : }
1808 : :
1809 : 321 : START_CRIT_SECTION();
1810 : :
1811 : : /*
1812 : : * Replace old tuple by prefix tuple
1813 : : */
1814 : 321 : PageIndexTupleDelete(current->page, current->offnum);
1815 : 321 : xlrec.offnumPrefix = PageAddItem(current->page,
1816 : : prefixTuple, prefixTuple->size,
1817 : : current->offnum, false, false);
1818 [ - + ]: 321 : if (xlrec.offnumPrefix != current->offnum)
5202 tgl@sss.pgh.pa.us 1819 [ # # ]:UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1820 : : prefixTuple->size);
1821 : :
1822 : : /*
1823 : : * put postfix tuple into appropriate page
1824 : : */
5202 tgl@sss.pgh.pa.us 1825 [ + + ]:CBC 321 : if (newBuffer == InvalidBuffer)
1826 : : {
4133 heikki.linnakangas@i 1827 : 237 : postfixBlkno = current->blkno;
5202 tgl@sss.pgh.pa.us 1828 : 237 : xlrec.offnumPostfix = postfixOffset =
1829 : 237 : SpGistPageAddNewItem(state, current->page,
139 peter@eisentraut.org 1830 :GNC 237 : postfixTuple, postfixTuple->size,
1831 : : NULL, false);
4133 heikki.linnakangas@i 1832 :CBC 237 : xlrec.postfixBlkSame = true;
1833 : : }
1834 : : else
1835 : : {
1836 : 84 : postfixBlkno = BufferGetBlockNumber(newBuffer);
5202 tgl@sss.pgh.pa.us 1837 : 84 : xlrec.offnumPostfix = postfixOffset =
3616 kgrittn@postgresql.o 1838 : 84 : SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
139 peter@eisentraut.org 1839 :GNC 84 : postfixTuple, postfixTuple->size,
1840 : : NULL, false);
5202 tgl@sss.pgh.pa.us 1841 :CBC 84 : MarkBufferDirty(newBuffer);
4133 heikki.linnakangas@i 1842 : 84 : xlrec.postfixBlkSame = false;
1843 : : }
1844 : :
1845 : : /*
1846 : : * And set downlink pointer in the prefix tuple to point to postfix tuple.
1847 : : * (We can't avoid this step by doing the above two steps in opposite
1848 : : * order, because there might not be enough space on the page to insert
1849 : : * the postfix tuple first.) We have to update the local copy of the
1850 : : * prefixTuple too, because that's what will be written to WAL.
1851 : : */
3491 tgl@sss.pgh.pa.us 1852 : 321 : spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1853 : : postfixBlkno, postfixOffset);
5202 1854 : 321 : prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
3189 1855 : 321 : PageGetItemId(current->page, current->offnum));
3491 1856 : 321 : spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1857 : : postfixBlkno, postfixOffset);
1858 : :
5202 1859 : 321 : MarkBufferDirty(current->buffer);
1860 : :
2538 heikki.linnakangas@i 1861 [ + - - + : 321 : if (RelationNeedsWAL(index) && !state->isBuild)
- - - - +
+ ]
1862 : : {
1863 : : XLogRecPtr recptr;
1864 : :
4133 1865 : 305 : XLogBeginInsert();
397 peter@eisentraut.org 1866 : 305 : XLogRegisterData(&xlrec, sizeof(xlrec));
1867 : 305 : XLogRegisterData(prefixTuple, prefixTuple->size);
1868 : 305 : XLogRegisterData(postfixTuple, postfixTuple->size);
1869 : :
4133 heikki.linnakangas@i 1870 : 305 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1871 [ + + ]: 305 : if (newBuffer != InvalidBuffer)
1872 : : {
1873 : : int flags;
1874 : :
1875 : 81 : flags = REGBUF_STANDARD;
1876 [ + + ]: 81 : if (xlrec.newPage)
1877 : 3 : flags |= REGBUF_WILL_INIT;
1878 : 81 : XLogRegisterBuffer(1, newBuffer, flags);
1879 : : }
1880 : :
1881 : 305 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1882 : :
5202 tgl@sss.pgh.pa.us 1883 : 305 : PageSetLSN(current->page, recptr);
1884 : :
1885 [ + + ]: 305 : if (newBuffer != InvalidBuffer)
1886 : : {
3616 kgrittn@postgresql.o 1887 : 81 : PageSetLSN(BufferGetPage(newBuffer), recptr);
1888 : : }
1889 : : }
1890 : :
5202 tgl@sss.pgh.pa.us 1891 [ - + ]: 321 : END_CRIT_SECTION();
1892 : :
1893 : : /* Update local free-space cache and release buffer */
1894 [ + + ]: 321 : if (newBuffer != InvalidBuffer)
1895 : : {
1896 : 84 : SpGistSetLastUsedPage(index, newBuffer);
1897 : 84 : UnlockReleaseBuffer(newBuffer);
1898 : : }
1899 : 321 : }
1900 : :
1901 : : /*
1902 : : * Insert one item into the index.
1903 : : *
1904 : : * Returns true on success, false if we failed to complete the insertion
1905 : : * (typically because of conflict with a concurrent insert). In the latter
1906 : : * case, caller should re-call spgdoinsert() with the same args.
1907 : : */
1908 : : bool
1909 : 402932 : spgdoinsert(Relation index, SpGistState *state,
1910 : : const ItemPointerData *heapPtr, const Datum *datums, const bool *isnulls)
1911 : : {
1766 1912 : 402932 : bool result = true;
1805 1913 : 402932 : TupleDesc leafDescriptor = state->leafTupDesc;
1914 : 402932 : bool isnull = isnulls[spgKeyColumn];
5202 1915 : 402932 : int level = 0;
1916 : : Datum leafDatums[INDEX_MAX_KEYS];
1917 : : int leafSize;
1918 : : int bestLeafSize;
1766 1919 : 402932 : int numNoProgressCycles = 0;
1920 : : SPPageDesc current,
1921 : : parent;
4946 heikki.linnakangas@i 1922 : 402932 : FmgrInfo *procinfo = NULL;
1923 : :
1924 : : /*
1925 : : * Look up FmgrInfo of the user-defined choose function once, to save
1926 : : * cycles in the loop below.
1927 : : */
1928 [ + + ]: 402932 : if (!isnull)
1929 : 402887 : procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1930 : :
1931 : : /*
1932 : : * Prepare the leaf datum to insert.
1933 : : *
1934 : : * If an optional "compress" method is provided, then call it to form the
1935 : : * leaf key datum from the input datum. Otherwise, store the input datum
1936 : : * as is. Since we don't use index_form_tuple in this AM, we have to make
1937 : : * sure value to be inserted is not toasted; FormIndexDatum doesn't
1938 : : * guarantee that. But we assume the "compress" method to return an
1939 : : * untoasted value.
1940 : : */
3005 teodor@sigaev.ru 1941 [ + + ]: 402932 : if (!isnull)
1942 : : {
1943 [ + + ]: 402887 : if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
1944 : : {
1945 : 39921 : FmgrInfo *compressProcinfo = NULL;
1946 : :
1947 : 39921 : compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1805 tgl@sss.pgh.pa.us 1948 : 39921 : leafDatums[spgKeyColumn] =
1949 : 39921 : FunctionCall1Coll(compressProcinfo,
1950 : 39921 : index->rd_indcollation[spgKeyColumn],
1951 : : datums[spgKeyColumn]);
1952 : : }
1953 : : else
1954 : : {
3005 teodor@sigaev.ru 1955 [ - + ]: 362966 : Assert(state->attLeafType.type == state->attType.type);
1956 : :
1957 [ + + ]: 362966 : if (state->attType.attlen == -1)
1805 tgl@sss.pgh.pa.us 1958 : 89744 : leafDatums[spgKeyColumn] =
1959 : 89744 : PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
1960 : : else
1961 : 273222 : leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1962 : : }
1963 : : }
1964 : : else
1965 : 45 : leafDatums[spgKeyColumn] = (Datum) 0;
1966 : :
1967 : : /* Likewise, ensure that any INCLUDE values are not toasted */
1968 [ + + ]: 449777 : for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1969 : : {
1970 [ + + ]: 46845 : if (!isnulls[i])
1971 : : {
450 drowley@postgresql.o 1972 [ + + ]: 43256 : if (TupleDescCompactAttr(leafDescriptor, i)->attlen == -1)
1805 tgl@sss.pgh.pa.us 1973 : 6918 : leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1974 : : else
1975 : 36338 : leafDatums[i] = datums[i];
1976 : : }
1977 : : else
1978 : 3589 : leafDatums[i] = (Datum) 0;
1979 : : }
1980 : :
1981 : : /*
1982 : : * Compute space needed for a leaf tuple containing the given data.
1983 : : */
1984 : 402932 : leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1985 : : /* Account for an item pointer, too */
1809 1986 : 402932 : leafSize += sizeof(ItemIdData);
1987 : :
1988 : : /*
1989 : : * If it isn't gonna fit, and the opclass can't reduce the datum size by
1990 : : * suffixing, bail out now rather than doing a lot of useless work.
1991 : : */
1766 1992 [ + + + - ]: 402932 : if (leafSize > SPGIST_PAGE_CAPACITY &&
1993 [ - + ]: 2 : (isnull || !state->config.longValuesOK))
5202 tgl@sss.pgh.pa.us 1994 [ # # ]:UBC 0 : ereport(ERROR,
1995 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1996 : : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1997 : : leafSize - sizeof(ItemIdData),
1998 : : SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
1999 : : RelationGetRelationName(index)),
2000 : : errhint("Values larger than a buffer page cannot be indexed.")));
1766 tgl@sss.pgh.pa.us 2001 :CBC 402932 : bestLeafSize = leafSize;
2002 : :
2003 : : /* Initialize "current" to the appropriate root page */
5117 2004 [ + + ]: 402932 : current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
5202 2005 : 402932 : current.buffer = InvalidBuffer;
2006 : 402932 : current.page = NULL;
2007 : 402932 : current.offnum = FirstOffsetNumber;
2008 : 402932 : current.node = -1;
2009 : :
2010 : : /* "parent" is invalid for the moment */
2011 : 402932 : parent.blkno = InvalidBlockNumber;
2012 : 402932 : parent.buffer = InvalidBuffer;
2013 : 402932 : parent.page = NULL;
2014 : 402932 : parent.offnum = InvalidOffsetNumber;
2015 : 402932 : parent.node = -1;
2016 : :
2017 : : /*
2018 : : * Before entering the loop, try to clear any pending interrupt condition.
2019 : : * If a query cancel is pending, we might as well accept it now not later;
2020 : : * while if a non-canceling condition is pending, servicing it here avoids
2021 : : * having to restart the insertion and redo all the work so far.
2022 : : */
1766 2023 [ - + ]: 402932 : CHECK_FOR_INTERRUPTS();
2024 : :
2025 : : for (;;)
5202 2026 : 9330644 : {
2027 : 9733576 : bool isNew = false;
2028 : :
2029 : : /*
2030 : : * Bail out if query cancel is pending. We must have this somewhere
2031 : : * in the loop since a broken opclass could produce an infinite
2032 : : * picksplit loop. However, because we'll be holding buffer lock(s)
2033 : : * after the first iteration, ProcessInterrupts() wouldn't be able to
2034 : : * throw a cancel error here. Hence, if we see that an interrupt is
2035 : : * pending, break out of the loop and deal with the situation below.
2036 : : * Set result = false because we must restart the insertion if the
2037 : : * interrupt isn't a query-cancel-or-die case.
2038 : : */
1766 2039 [ - + ]: 9733576 : if (INTERRUPTS_PENDING_CONDITION())
2040 : : {
1766 tgl@sss.pgh.pa.us 2041 :LBC (1) : result = false;
2042 : (1) : break;
2043 : : }
2044 : :
5202 tgl@sss.pgh.pa.us 2045 [ + + ]:CBC 9733576 : if (current.blkno == InvalidBlockNumber)
2046 : : {
2047 : : /*
2048 : : * Create a leaf page. If leafSize is too large to fit on a page,
2049 : : * we won't actually use the page yet, but it simplifies the API
2050 : : * for doPickSplit to always have a leaf page at hand; so just
2051 : : * quietly limit our request to a page size.
2052 : : */
5117 2053 : 1043 : current.buffer =
2054 [ - + ]: 1043 : SpGistGetBuffer(index,
2055 : : GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2056 [ + + ]: 1043 : Min(leafSize, SPGIST_PAGE_CAPACITY),
2057 : : &isNew);
5202 2058 : 1043 : current.blkno = BufferGetBlockNumber(current.buffer);
2059 : : }
4657 2060 [ + + ]: 9732533 : else if (parent.buffer == InvalidBuffer)
2061 : : {
2062 : : /* we hold no parent-page lock, so no deadlock is possible */
5202 2063 : 402932 : current.buffer = ReadBuffer(index, current.blkno);
2064 : 402932 : LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
2065 : : }
4657 2066 [ + + ]: 9329601 : else if (current.blkno != parent.blkno)
2067 : : {
2068 : : /* descend to a new child page */
2069 : 785464 : current.buffer = ReadBuffer(index, current.blkno);
2070 : :
2071 : : /*
2072 : : * Attempt to acquire lock on child page. We must beware of
2073 : : * deadlock against another insertion process descending from that
2074 : : * page to our parent page (see README). If we fail to get lock,
2075 : : * abandon the insertion and tell our caller to start over.
2076 : : *
2077 : : * XXX this could be improved, because failing to get lock on a
2078 : : * buffer is not proof of a deadlock situation; the lock might be
2079 : : * held by a reader, or even just background writer/checkpointer
2080 : : * process. Perhaps it'd be worth retrying after sleeping a bit?
2081 : : */
2082 [ + + ]: 785464 : if (!ConditionalLockBuffer(current.buffer))
2083 : : {
2084 : 11 : ReleaseBuffer(current.buffer);
2085 : 11 : UnlockReleaseBuffer(parent.buffer);
2086 : 11 : return false;
2087 : : }
2088 : : }
2089 : : else
2090 : : {
2091 : : /* inner tuple can be stored on the same page as parent one */
5202 2092 : 8544137 : current.buffer = parent.buffer;
2093 : : }
3616 kgrittn@postgresql.o 2094 : 9733565 : current.page = BufferGetPage(current.buffer);
2095 : :
2096 : : /* should not arrive at a page of the wrong type */
5117 tgl@sss.pgh.pa.us 2097 [ + + - + ]: 19467085 : if (isnull ? !SpGistPageStoresNulls(current.page) :
2098 : 9733520 : SpGistPageStoresNulls(current.page))
5117 tgl@sss.pgh.pa.us 2099 [ # # ]:UBC 0 : elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2100 : : current.blkno);
2101 : :
5202 tgl@sss.pgh.pa.us 2102 [ + + ]:CBC 9733565 : if (SpGistPageIsLeaf(current.page))
2103 : : {
2104 : : SpGistLeafTuple leafTuple;
2105 : : int nToSplit,
2106 : : sizeToSplit;
2107 : :
1805 2108 : 403061 : leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
5202 2109 : 806122 : if (leafTuple->size + sizeof(ItemIdData) <=
2110 [ + + + + ]: 403061 : SpGistPageGetFreeSpace(current.page, 1))
2111 : : {
2112 : : /* it fits on page, so insert it and we're done */
2113 : 399104 : addLeafTuple(index, state, leafTuple,
2114 : : &current, &parent, isnull, isNew);
2115 : 402919 : break;
2116 : : }
2117 [ + + ]: 3957 : else if ((sizeToSplit =
2118 : 3957 : checkSplitConditions(index, state, &current,
3189 2119 : 1794 : &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
5202 2120 [ + + ]: 1794 : nToSplit < 64 &&
2121 [ + + ]: 1145 : leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2122 : : {
2123 : : /*
2124 : : * the amount of data is pretty small, so just move the whole
2125 : : * chain to another leaf page rather than splitting it.
2126 : : */
2127 [ - + ]: 1121 : Assert(!isNew);
5117 2128 : 1121 : moveLeafs(index, state, &current, &parent, leafTuple, isnull);
5202 2129 : 1121 : break; /* we're done */
2130 : : }
2131 : : else
2132 : : {
2133 : : /* picksplit */
2134 [ + + ]: 2836 : if (doPickSplit(index, state, &current, &parent,
2135 : : leafTuple, level, isnull, isNew))
2136 : 2694 : break; /* doPickSplit installed new tuples */
2137 : :
2138 : : /* leaf tuple will not be inserted yet */
2139 : 142 : pfree(leafTuple);
2140 : :
2141 : : /*
2142 : : * current now describes new inner tuple, go insert into it
2143 : : */
2144 [ - + ]: 142 : Assert(!SpGistPageIsLeaf(current.page));
2145 : 142 : goto process_inner_tuple;
2146 : : }
2147 : : }
2148 : : else /* non-leaf page */
2149 : : {
2150 : : /*
2151 : : * Apply the opclass choose function to figure out how to insert
2152 : : * the given datum into the current inner tuple.
2153 : : */
2154 : : SpGistInnerTuple innerTuple;
2155 : : spgChooseIn in;
2156 : : spgChooseOut out;
2157 : :
2158 : : /*
2159 : : * spgAddNode and spgSplitTuple cases will loop back to here to
2160 : : * complete the insertion operation. Just in case the choose
2161 : : * function is broken and produces add or split requests
2162 : : * repeatedly, check for query cancel (see comments above).
2163 : : */
2164 : 9331614 : process_inner_tuple:
1766 2165 [ - + ]: 9331756 : if (INTERRUPTS_PENDING_CONDITION())
2166 : : {
1766 tgl@sss.pgh.pa.us 2167 :UBC 0 : result = false;
2168 : 0 : break;
2169 : : }
2170 : :
5202 tgl@sss.pgh.pa.us 2171 :CBC 9331756 : innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
3189 2172 : 9331756 : PageGetItemId(current.page, current.offnum));
2173 : :
1805 2174 : 9331756 : in.datum = datums[spgKeyColumn];
2175 : 9331756 : in.leafDatum = leafDatums[spgKeyColumn];
5202 2176 : 9331756 : in.level = level;
2177 : 9331756 : in.allTheSame = innerTuple->allTheSame;
2178 : 9331756 : in.hasPrefix = (innerTuple->prefixSize > 0);
2179 [ + + + + ]: 9331756 : in.prefixDatum = SGITDATUM(innerTuple, state);
2180 : 9331756 : in.nNodes = innerTuple->nNodes;
2181 : 9331756 : in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2182 : :
2183 : 9331756 : memset(&out, 0, sizeof(out));
2184 : :
5117 2185 [ + - ]: 9331756 : if (!isnull)
2186 : : {
2187 : : /* use user-defined choose method */
2188 : 9331756 : FunctionCall2Coll(procinfo,
2189 : 9331756 : index->rd_indcollation[0],
2190 : : PointerGetDatum(&in),
2191 : : PointerGetDatum(&out));
2192 : : }
2193 : : else
2194 : : {
2195 : : /* force "match" action (to insert to random subnode) */
5117 tgl@sss.pgh.pa.us 2196 :UBC 0 : out.resultType = spgMatchNode;
2197 : : }
2198 : :
5202 tgl@sss.pgh.pa.us 2199 [ + + ]:CBC 9331756 : if (innerTuple->allTheSame)
2200 : : {
2201 : : /*
2202 : : * It's not allowed to do an AddNode at an allTheSame tuple.
2203 : : * Opclass must say "match", in which case we choose a random
2204 : : * one of the nodes to descend into, or "split".
2205 : : */
2206 [ - + ]: 71594 : if (out.resultType == spgAddNode)
5202 tgl@sss.pgh.pa.us 2207 [ # # ]:UBC 0 : elog(ERROR, "cannot add a node to an allTheSame inner tuple");
5202 tgl@sss.pgh.pa.us 2208 [ + + ]:CBC 71594 : else if (out.resultType == spgMatchNode)
1568 2209 : 71588 : out.result.matchNode.nodeN =
2210 : 71588 : pg_prng_uint64_range(&pg_global_prng_state,
2211 : 71588 : 0, innerTuple->nNodes - 1);
2212 : : }
2213 : :
5202 2214 [ + + + - ]: 9331756 : switch (out.resultType)
2215 : : {
2216 : 9330646 : case spgMatchNode:
2217 : : /* Descend to N'th child node */
2218 : 9330646 : spgMatchNodeAction(index, state, innerTuple,
2219 : : &current, &parent,
2220 : : out.result.matchNode.nodeN);
2221 : : /* Adjust level as per opclass request */
2222 : 9330646 : level += out.result.matchNode.levelAdd;
2223 : : /* Replace leafDatum and recompute leafSize */
5117 2224 [ + - ]: 9330646 : if (!isnull)
2225 : : {
1805 2226 : 9330646 : leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2227 : 9330646 : leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2228 : : leafDatums, isnulls);
1809 2229 : 9330646 : leafSize += sizeof(ItemIdData);
2230 : : }
2231 : :
2232 : : /*
2233 : : * Check new tuple size; fail if it can't fit, unless the
2234 : : * opclass says it can handle the situation by suffixing.
2235 : : *
2236 : : * However, the opclass can only shorten the leaf datum,
2237 : : * which may not be enough to ever make the tuple fit,
2238 : : * since INCLUDE columns might alone use more than a page.
2239 : : * Depending on the opclass' behavior, that could lead to
2240 : : * an infinite loop --- spgtextproc.c, for example, will
2241 : : * just repeatedly generate an empty-string leaf datum
2242 : : * once it runs out of data. Actual bugs in opclasses
2243 : : * might cause infinite looping, too. To detect such a
2244 : : * loop, check to see if we are making progress by
2245 : : * reducing the leafSize in each pass. This is a bit
2246 : : * tricky though. Because of alignment considerations,
2247 : : * the total tuple size might not decrease on every pass.
2248 : : * Also, there are edge cases where the choose method
2249 : : * might seem to not make progress for a cycle or two.
2250 : : * Somewhat arbitrarily, we allow up to 10 no-progress
2251 : : * iterations before failing. (This limit should be more
2252 : : * than MAXALIGN, to accommodate opclasses that trim one
2253 : : * byte from the leaf datum per pass.)
2254 : : */
1766 2255 [ + + ]: 9330646 : if (leafSize > SPGIST_PAGE_CAPACITY)
2256 : : {
2257 : 26 : bool ok = false;
2258 : :
2259 [ + - + - ]: 26 : if (state->config.longValuesOK && !isnull)
2260 : : {
2261 [ + + ]: 26 : if (leafSize < bestLeafSize)
2262 : : {
2263 : 2 : ok = true;
2264 : 2 : bestLeafSize = leafSize;
2265 : 2 : numNoProgressCycles = 0;
2266 : : }
2267 [ + + ]: 24 : else if (++numNoProgressCycles < 10)
2268 : 22 : ok = true;
2269 : : }
2270 [ + + ]: 26 : if (!ok)
2271 [ + - ]: 2 : ereport(ERROR,
2272 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2273 : : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2274 : : leafSize - sizeof(ItemIdData),
2275 : : SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2276 : : RelationGetRelationName(index)),
2277 : : errhint("Values larger than a buffer page cannot be indexed.")));
2278 : : }
2279 : :
2280 : : /*
2281 : : * Loop around and attempt to insert the new leafDatum at
2282 : : * "current" (which might reference an existing child
2283 : : * tuple, or might be invalid to force us to find a new
2284 : : * page for the tuple).
2285 : : */
5202 2286 : 9330644 : break;
2287 : 789 : case spgAddNode:
2288 : : /* AddNode is not sensible if nodes don't have labels */
2289 [ - + ]: 789 : if (in.nodeLabels == NULL)
5202 tgl@sss.pgh.pa.us 2290 [ # # ]:UBC 0 : elog(ERROR, "cannot add a node to an inner tuple without node labels");
2291 : : /* Add node to inner tuple, per request */
5202 tgl@sss.pgh.pa.us 2292 :CBC 789 : spgAddNodeAction(index, state, innerTuple,
2293 : : &current, &parent,
2294 : : out.result.addNode.nodeN,
2295 : : out.result.addNode.nodeLabel);
2296 : :
2297 : : /*
2298 : : * Retry insertion into the enlarged node. We assume that
2299 : : * we'll get a MatchNode result this time.
2300 : : */
2301 : 789 : goto process_inner_tuple;
2302 : : break;
2303 : 321 : case spgSplitTuple:
2304 : : /* Split inner tuple, per request */
2305 : 321 : spgSplitNodeAction(index, state, innerTuple,
2306 : : &current, &out);
2307 : :
2308 : : /* Retry insertion into the split node */
2309 : 321 : goto process_inner_tuple;
2310 : : break;
5202 tgl@sss.pgh.pa.us 2311 :UBC 0 : default:
2312 [ # # ]: 0 : elog(ERROR, "unrecognized SPGiST choose result: %d",
2313 : : (int) out.resultType);
2314 : : break;
2315 : : }
2316 : : }
2317 : : } /* end loop */
2318 : :
2319 : : /*
2320 : : * Release any buffers we're still holding. Beware of possibility that
2321 : : * current and parent reference same buffer.
2322 : : */
5202 tgl@sss.pgh.pa.us 2323 [ + - ]:CBC 402919 : if (current.buffer != InvalidBuffer)
2324 : : {
2325 : 402919 : SpGistSetLastUsedPage(index, current.buffer);
2326 : 402919 : UnlockReleaseBuffer(current.buffer);
2327 : : }
2328 [ + + ]: 402919 : if (parent.buffer != InvalidBuffer &&
2329 [ + + ]: 394430 : parent.buffer != current.buffer)
2330 : : {
2331 : 392119 : SpGistSetLastUsedPage(index, parent.buffer);
2332 : 392119 : UnlockReleaseBuffer(parent.buffer);
2333 : : }
2334 : :
2335 : : /*
2336 : : * We do not support being called while some outer function is holding a
2337 : : * buffer lock (or any other reason to postpone query cancels). If that
2338 : : * were the case, telling the caller to retry would create an infinite
2339 : : * loop.
2340 : : */
1766 2341 [ + - + - : 402919 : Assert(INTERRUPTS_CAN_BE_PROCESSED());
- + ]
2342 : :
2343 : : /*
2344 : : * Finally, check for interrupts again. If there was a query cancel,
2345 : : * ProcessInterrupts() will be able to throw the error here. If it was
2346 : : * some other kind of interrupt that can just be cleared, return false to
2347 : : * tell our caller to retry.
2348 : : */
2349 [ + + ]: 402919 : CHECK_FOR_INTERRUPTS();
2350 : :
2351 : 402919 : return result;
2352 : : }
|