/*-------------------------------------------------------------------------
 *
 * nodeHashjoin.c
 *    Routines to handle hash join nodes
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeHashjoin.c
 *
 * HASH JOIN
 *
 * This is based on the "hybrid hash join" algorithm, described briefly on
 * the following page
 *
 *   https://en.wikipedia.org/wiki/Hash_join#Hybrid_hash_join
 *
 * and in detail in the referenced paper:
 *
 *   "An Adaptive Hash Join Algorithm for Multiuser Environments"
 *   Hansjörg Zeller; Jim Gray (1990). Proceedings of the 16th VLDB
 *   conference. Brisbane: 186–197.
 *
 * If the inner side tuples of a hash join do not fit in memory, the hash
 * join can be executed in multiple batches.
 *
 * If the statistics on the inner side relation are accurate, the planner
 * chooses a multi-batch strategy and estimates the number of batches.
 *
 * The query executor measures the real size of the hashtable and increases
 * the number of batches if the hashtable grows too large.
 *
 * The number of batches is always a power of two, so an increase in the
 * number of batches doubles it.
 *
 * Serial hash join measures batch size lazily -- waiting until it is loading
 * a batch to determine if it will fit in memory.  While inserting tuples
 * into the hashtable, serial hash join will, if inserting a tuple would
 * exceed work_mem, dump out the hashtable's tuples and reassign them either
 * to other batch files or to the current batch resident in the hashtable.
 *
 * Parallel hash join, on the other hand, completes all changes to the number
 * of batches during the build phase.  If it increases the number of batches,
 * it dumps out all the tuples from all batches and reassigns them to
 * entirely new batch files.  Then it checks every batch to ensure it will
 * fit in the space budget for the query.
 *
 * In both parallel and serial hash join, the executor currently makes a best
 * effort.  If a particular batch will not fit in memory, it tries doubling
 * the number of batches.  If after a batch increase, there is a batch which
 * retained all or none of its tuples, the executor disables growth in the
 * number of batches globally.  After growth is disabled, all batches that
 * would have previously triggered an increase in the number of batches
 * instead exceed the space allowed.
 *
 * PARALLELISM
 *
 * Hash joins can participate in parallel query execution in several ways.  A
 * parallel-oblivious hash join is one where the node is unaware that it is
 * part of a parallel plan.  In this case, a copy of the inner plan is used to
 * build a copy of the hash table in every backend, and the outer plan could
 * either be built from a partial or complete path, so that the results of the
 * hash join are correspondingly either partial or complete.  A parallel-aware
 * hash join is one that behaves differently, coordinating work between
 * backends, and appears as Parallel Hash Join in EXPLAIN output.  A Parallel
 * Hash Join always appears with a Parallel Hash node.
 *
 * Parallel-aware hash joins use the same per-backend state machine to track
 * progress through the hash join algorithm as parallel-oblivious hash joins.
 * In a parallel-aware hash join, there is also a shared state machine that
 * co-operating backends use to synchronize their local state machines and
 * program counters.  The shared state machine is managed with a Barrier IPC
 * primitive.  When all attached participants arrive at a barrier, the phase
 * advances and all waiting participants are released.
 *
 * When a participant begins working on a parallel hash join, it must first
 * figure out how much progress has already been made, because participants
 * don't wait for each other to begin.  For this reason there are switch
 * statements at key points in the code where we have to synchronize our local
 * state machine with the phase, and then jump to the correct part of the
 * algorithm so that we can get started.
 *
 * One barrier called build_barrier is used to coordinate the hashing phases.
 * The phase is represented by an integer which begins at zero and increments
 * one by one, but in the code it is referred to by symbolic names as follows.
 * An asterisk indicates a phase that is performed by a single arbitrarily
 * chosen process.
 *
 *   PHJ_BUILD_ELECT                 -- initial state
 *   PHJ_BUILD_ALLOCATE*             -- one sets up the batches and table 0
 *   PHJ_BUILD_HASH_INNER            -- all hash the inner rel
 *   PHJ_BUILD_HASH_OUTER            -- (multi-batch only) all hash the outer
 *   PHJ_BUILD_RUN                   -- building done, probing can begin
 *   PHJ_BUILD_FREE*                 -- all work complete, one frees batches
 *
 * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may
 * be used repeatedly as required to coordinate expansions in the number of
 * batches or buckets.  Their phases are as follows:
 *
 *   PHJ_GROW_BATCHES_ELECT          -- initial state
 *   PHJ_GROW_BATCHES_REALLOCATE*    -- one allocates new batches
 *   PHJ_GROW_BATCHES_REPARTITION    -- all repartition
 *   PHJ_GROW_BATCHES_DECIDE*        -- one detects skew and cleans up
 *   PHJ_GROW_BATCHES_FINISH         -- finished one growth cycle
 *
 *   PHJ_GROW_BUCKETS_ELECT          -- initial state
 *   PHJ_GROW_BUCKETS_REALLOCATE*    -- one allocates new buckets
 *   PHJ_GROW_BUCKETS_REINSERT       -- all insert tuples
 *
 * If the planner got the number of batches and buckets right, those won't be
 * necessary, but on the other hand we might finish up needing to expand the
 * buckets or batches multiple times while hashing the inner relation to stay
 * within our memory budget and load factor target.  For that reason it's a
 * separate pair of barriers using circular phases.
 *
 * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins,
 * because we need to divide the outer relation into batches up front in order
 * to be able to process batches entirely independently.  In contrast, the
 * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
 * batches whenever it encounters them while scanning and probing, which it
 * can do because it processes batches in serial order.
 *
 * Once PHJ_BUILD_RUN is reached, backends then split up and process
 * different batches, or gang up and work together on probing batches if there
 * aren't enough to go around.  For each batch there is a separate barrier
 * with the following phases:
 *
 *   PHJ_BATCH_ELECT       -- initial state
 *   PHJ_BATCH_ALLOCATE*   -- one allocates buckets
 *   PHJ_BATCH_LOAD        -- all load the hash table from disk
 *   PHJ_BATCH_PROBE       -- all probe
 *   PHJ_BATCH_SCAN*       -- one does right/right-anti/full unmatched scan
 *   PHJ_BATCH_FREE*       -- one frees memory
 *
 * Batch 0 is a special case, because it starts out in phase
 * PHJ_BATCH_PROBE; populating batch 0's hash table is done during
 * PHJ_BUILD_HASH_INNER so we can skip loading.
 *
 * Initially we try to plan for a single-batch hash join using the combined
 * hash_mem of all participants to create a large shared hash table.  If that
 * turns out either at planning or execution time to be impossible then we
 * fall back to regular hash_mem sized hash tables.
 *
 * To avoid deadlocks, we never wait for any barrier unless it is known that
 * all other backends attached to it are actively executing the node or have
 * finished.  Practically, that means that we never emit a tuple while
 * attached to a barrier, unless the barrier has reached a phase that means
 * that no process will wait on it again.  We emit tuples while attached to
 * the build barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in
 * phase PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and
 * PHJ_BATCH_SCAN respectively without waiting, using BarrierArriveAndDetach()
 * and BarrierArriveAndDetachExceptLast() respectively.  The last to detach
 * receives a different return value so that it knows that it's safe to
 * clean up.  Any straggler process that attaches after that phase is reached
 * will see that it's too late to participate or access the relevant shared
 * memory objects.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "utils/lsyscache.h"
#include "utils/sharedtuplestore.h"
#include "utils/wait_event.h"


/*
 * States of the ExecHashJoin state machine
 */
#define HJ_BUILD_HASHTABLE      1
#define HJ_NEED_NEW_OUTER       2
#define HJ_SCAN_BUCKET          3
#define HJ_FILL_OUTER_TUPLE     4
#define HJ_FILL_INNER_TUPLES    5
#define HJ_NEED_NEW_BATCH       6

/* Returns true if doing null-fill on outer relation */
#define HJ_FILL_OUTER(hjstate)  ((hjstate)->hj_NullInnerTupleSlot != NULL)
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate)  ((hjstate)->hj_NullOuterTupleSlot != NULL)

static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
                                                 HashJoinState *hjstate,
                                                 uint32 *hashvalue);
static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
                                                         HashJoinState *hjstate,
                                                         uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                                                 BufFile *file,
                                                 uint32 *hashvalue,
                                                 TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate);


/* ----------------------------------------------------------------
 *      ExecHashJoinImpl
 *
 *      This function implements the Hybrid Hashjoin algorithm.  It is marked
 *      with an always-inline attribute so that ExecHashJoin() and
 *      ExecParallelHashJoin() can inline it.  Compilers that respect the
 *      attribute should create versions specialized for parallel == true and
 *      parallel == false with unnecessary branches removed.
 *
 *      Note: the relation we build hash table on is the "inner"
 *            the other one is "outer".
 * ----------------------------------------------------------------
 */
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
    HashJoinState *node = castNode(HashJoinState, pstate);
    PlanState  *outerNode;
    HashState  *hashNode;
    ExprState  *joinqual;
    ExprState  *otherqual;
    ExprContext *econtext;
    HashJoinTable hashtable;
    TupleTableSlot *outerTupleSlot;
    uint32      hashvalue;
    int         batchno;
    ParallelHashJoinState *parallel_state;

    /*
     * get information from HashJoin node
     */
    joinqual = node->js.joinqual;
    otherqual = node->js.ps.qual;
    hashNode = (HashState *) innerPlanState(node);
    outerNode = outerPlanState(node);
    hashtable = node->hj_HashTable;
    econtext = node->js.ps.ps_ExprContext;
    parallel_state = hashNode->parallel_state;

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    ResetExprContext(econtext);

    /*
     * run the hash join state machine
     */
    for (;;)
    {
        /*
         * It's possible to iterate this loop many times before returning a
         * tuple, in some pathological cases such as needing to move much of
         * the current batch to a later batch.  So let's check for interrupts
         * each time through.
         */
        CHECK_FOR_INTERRUPTS();

        switch (node->hj_JoinState)
        {
            case HJ_BUILD_HASHTABLE:

                /*
                 * First time through: build hash table for inner relation.
                 */
                Assert(hashtable == NULL);

                /*
                 * If the outer relation is completely empty, and it's not
                 * right/right-anti/full join, we can quit without building
                 * the hash table.  However, for an inner join it is only a
                 * win to check this when the outer relation's startup cost is
                 * less than the projected cost of building the hash table.
                 * Otherwise it's best to build the hash table first and see
                 * if the inner relation is empty.  (When it's a left join, we
                 * should always make this check, since we aren't going to be
                 * able to skip the join on the strength of an empty inner
                 * relation anyway.)
                 *
                 * If we are rescanning the join, we make use of information
                 * gained on the previous scan: don't bother to try the
                 * prefetch if the previous scan found the outer relation
                 * nonempty.  This is not 100% reliable since with new
                 * parameters the outer relation might yield different
                 * results, but it's a good heuristic.
                 *
                 * The only way to make the check is to try to fetch a tuple
                 * from the outer plan node.  If we succeed, we have to stash
                 * it away for later consumption by ExecHashJoinOuterGetTuple.
                 */
                if (HJ_FILL_INNER(node))
                {
                    /* no chance to not build the hash table */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (parallel)
                {
                    /*
                     * The empty-outer optimization is not implemented for
                     * shared hash tables, because no one participant can
                     * determine that there are no outer tuples, and it's not
                     * yet clear that it's worth the synchronization overhead
                     * of reaching consensus to figure that out.  So we have
                     * to build the hash table.
                     */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (HJ_FILL_OUTER(node) ||
                         (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
                          !node->hj_OuterNotEmpty))
                {
                    node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
                    if (TupIsNull(node->hj_FirstOuterTupleSlot))
                    {
                        node->hj_OuterNotEmpty = false;
                        return NULL;
                    }
                    else
                        node->hj_OuterNotEmpty = true;
                }
                else
                    node->hj_FirstOuterTupleSlot = NULL;

                /*
                 * Create the hash table.  If using Parallel Hash, then
                 * whoever gets here first will create the hash table and any
                 * later arrivals will merely attach to it.
                 */
                hashtable = ExecHashTableCreate(hashNode);
                node->hj_HashTable = hashtable;

                /*
                 * Execute the Hash node, to build the hash table.  If using
                 * Parallel Hash, then we'll try to help hashing unless we
                 * arrived too late.
                 */
                hashNode->hashtable = hashtable;
                (void) MultiExecProcNode((PlanState *) hashNode);

                /*
                 * If the inner relation is completely empty, and we're not
                 * doing a left outer join, we can quit without scanning the
                 * outer relation.
                 */
                if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
                {
                    if (parallel)
                    {
                        /*
                         * Advance the build barrier to PHJ_BUILD_RUN before
                         * proceeding so we can negotiate resource cleanup.
                         */
                        Barrier    *build_barrier = &parallel_state->build_barrier;

                        while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN)
                            BarrierArriveAndWait(build_barrier, 0);
                    }
                    return NULL;
                }

                /*
                 * need to remember whether nbatch has increased since we
                 * began scanning the outer relation
                 */
                hashtable->nbatch_outstart = hashtable->nbatch;

                /*
                 * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
                 * tuple above, because ExecHashJoinOuterGetTuple will
                 * immediately set it again.)
                 */
                node->hj_OuterNotEmpty = false;

                if (parallel)
                {
                    Barrier    *build_barrier;

                    build_barrier = &parallel_state->build_barrier;
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
                           BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
                           BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
                    if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
                    {
                        /*
                         * If multi-batch, we need to hash the outer relation
                         * up front.
                         */
                        if (hashtable->nbatch > 1)
                            ExecParallelHashJoinPartitionOuter(node);
                        BarrierArriveAndWait(build_barrier,
                                             WAIT_EVENT_HASH_BUILD_HASH_OUTER);
                    }
                    else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE)
                    {
                        /*
                         * If we attached so late that the job is finished and
                         * the batch state has been freed, we can return
                         * immediately.
                         */
                        return NULL;
                    }

                    /* Each backend should now select a batch to work on. */
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
                    hashtable->curbatch = -1;
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;

                    continue;
                }
                else
                    node->hj_JoinState = HJ_NEED_NEW_OUTER;

                /* FALL THRU */

            case HJ_NEED_NEW_OUTER:

                /*
                 * We don't have an outer tuple, try to get the next one
                 */
                if (parallel)
                    outerTupleSlot =
                        ExecParallelHashJoinOuterGetTuple(outerNode, node,
                                                          &hashvalue);
                else
                    outerTupleSlot =
                        ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);

                if (TupIsNull(outerTupleSlot))
                {
                    /* end of batch, or maybe whole join */
                    if (HJ_FILL_INNER(node))
                    {
                        /* set up to scan for unmatched inner tuples */
                        if (parallel)
                        {
                            /*
                             * Only one process is currently allowed to
                             * handle each batch's unmatched tuples, in a
                             * parallel join.
                             */
                            if (ExecParallelPrepHashTableForUnmatched(node))
                                node->hj_JoinState = HJ_FILL_INNER_TUPLES;
                            else
                                node->hj_JoinState = HJ_NEED_NEW_BATCH;
                        }
                        else
                        {
                            ExecPrepHashTableForUnmatched(node);
                            node->hj_JoinState = HJ_FILL_INNER_TUPLES;
                        }
                    }
                    else
                        node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;
                }

                econtext->ecxt_outertuple = outerTupleSlot;
                node->hj_MatchedOuter = false;

                /*
                 * Find the corresponding bucket for this tuple in the main
                 * hash table or skew hash table.
                 */
                node->hj_CurHashValue = hashvalue;
                ExecHashGetBucketAndBatch(hashtable, hashvalue,
                                          &node->hj_CurBucketNo, &batchno);
                node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
                                                                 hashvalue);
                node->hj_CurTuple = NULL;

                /*
                 * The tuple might not belong to the current batch (where
                 * "current batch" includes the skew buckets if any).
                 */
                if (batchno != hashtable->curbatch &&
                    node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                {
                    bool        shouldFree;
                    MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
                                                                      &shouldFree);

                    /*
                     * Need to postpone this outer tuple to a later batch.
                     * Save it in the corresponding outer-batch file.
                     */
                    Assert(parallel_state == NULL);
                    Assert(batchno > hashtable->curbatch);
                    ExecHashJoinSaveTuple(mintuple, hashvalue,
                                          &hashtable->outerBatchFile[batchno],
                                          hashtable);

                    if (shouldFree)
                        heap_free_minimal_tuple(mintuple);

                    /* Loop around, staying in HJ_NEED_NEW_OUTER state */
                    continue;
                }

                /* OK, let's scan the bucket for matches */
                node->hj_JoinState = HJ_SCAN_BUCKET;

                /* FALL THRU */

            case HJ_SCAN_BUCKET:

                /*
                 * Scan the selected hash bucket for matches to current outer
                 */
                if (parallel)
                {
                    if (!ExecParallelScanHashBucket(node, econtext))
                    {
                        /* out of matches; check for possible outer-join fill */
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
                        continue;
                    }
                }
                else
                {
                    if (!ExecScanHashBucket(node, econtext))
                    {
                        /* out of matches; check for possible outer-join fill */
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
                        continue;
                    }
                }

                /*
                 * In a right-semijoin, we only need the first match for each
                 * inner tuple.
                 */
                if (node->js.jointype == JOIN_RIGHT_SEMI &&
                    HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
                    continue;

                /*
                 * We've got a match, but still need to test non-hashed quals.
                 * ExecScanHashBucket already set up all the state needed to
                 * call ExecQual.
                 *
                 * If we pass the qual, then save state for next call and have
                 * ExecProject form the projection, store it in the tuple
                 * table, and return the slot.
                 *
                 * Only the joinquals determine tuple match status, but all
                 * quals must pass to actually return the tuple.
                 */
                if (joinqual == NULL || ExecQual(joinqual, econtext))
                {
                    node->hj_MatchedOuter = true;

                    /*
                     * This is really only needed if HJ_FILL_INNER(node) or if
                     * we are in a right-semijoin, but we'll avoid the branch
                     * and just set it always.
                     */
                    if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
                        HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));

                    /* In an antijoin, we never return a matched tuple */
                    if (node->js.jointype == JOIN_ANTI)
                    {
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;
                        continue;
                    }

                    /*
                     * If we only need to consider the first matching inner
                     * tuple, then advance to next outer tuple after we've
                     * processed this one.
                     */
                    if (node->js.single_match)
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;

                    /*
                     * In a right-antijoin, we never return a matched tuple.
                     * If it's not an inner_unique join, we need to stay on
                     * the current outer tuple to continue scanning the inner
                     * side for matches.
                     */
                    if (node->js.jointype == JOIN_RIGHT_ANTI)
                        continue;

                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);
                    else
                        InstrCountFiltered2(node, 1);
                }
                else
                    InstrCountFiltered1(node, 1);
                break;

            case HJ_FILL_OUTER_TUPLE:

                /*
                 * The current outer tuple has run out of matches, so check
                 * whether to emit a dummy outer-join tuple.  Whether we emit
                 * one or not, the next state is NEED_NEW_OUTER.
                 */
                node->hj_JoinState = HJ_NEED_NEW_OUTER;

                if (!node->hj_MatchedOuter &&
                    HJ_FILL_OUTER(node))
                {
                    /*
                     * Generate a fake join tuple with nulls for the inner
                     * tuple, and return it if it passes the non-join quals.
                     */
                    econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;

                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);
                    else
                        InstrCountFiltered2(node, 1);
                }
                break;

            case HJ_FILL_INNER_TUPLES:

                /*
                 * We have finished a batch, but we are doing
                 * right/right-anti/full join, so any unmatched inner tuples
                 * in the hashtable have to be emitted before we continue to
                 * the next batch.
                 */
                if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
                               : ExecScanHashTableForUnmatched(node, econtext)))
                {
                    /* no more unmatched tuples */
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;
                }

                /*
                 * Generate a fake join tuple with nulls for the outer tuple,
                 * and return it if it passes the non-join quals.
                 */
                econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;

                if (otherqual == NULL || ExecQual(otherqual, econtext))
                    return ExecProject(node->js.ps.ps_ProjInfo);
                else
                    InstrCountFiltered2(node, 1);
                break;

            case HJ_NEED_NEW_BATCH:

                /*
                 * Try to advance to next batch.  Done if there are no more.
                 */
                if (parallel)
                {
                    if (!ExecParallelHashJoinNewBatch(node))
                        return NULL;    /* end of parallel-aware join */
                }
                else
                {
                    if (!ExecHashJoinNewBatch(node))
                        return NULL;    /* end of parallel-oblivious join */
                }
                node->hj_JoinState = HJ_NEED_NEW_OUTER;
                break;

            default:
                elog(ERROR, "unrecognized hashjoin state: %d",
                     (int) node->hj_JoinState);
        }
    }
}

/* ----------------------------------------------------------------
 *      ExecHashJoin
 *
 *      Parallel-oblivious version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *         /* return: a tuple or NULL */
ExecHashJoin(PlanState *pstate)
{
    /*
     * On sufficiently smart compilers this should be inlined with the
     * parallel-aware branches removed.
     */
    return ExecHashJoinImpl(pstate, false);
}

/* ----------------------------------------------------------------
 *      ExecParallelHashJoin
 *
 *      Parallel-aware version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *         /* return: a tuple or NULL */
ExecParallelHashJoin(PlanState *pstate)
{
    /*
     * On sufficiently smart compilers this should be inlined with the
     * parallel-oblivious branches removed.
     */
    return ExecHashJoinImpl(pstate, true);
}

/* ----------------------------------------------------------------
 *      ExecInitHashJoin
 *
 *      Init routine for HashJoin node.
 * ----------------------------------------------------------------
 */
HashJoinState *
ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
{
    HashJoinState *hjstate;
    Plan       *outerNode;
    Hash       *hashNode;
    TupleDesc   outerDesc,
                innerDesc;
    const TupleTableSlotOps *ops;

    /* check for unsupported flags */
    Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

    /*
     * create state structure
     */
    hjstate = makeNode(HashJoinState);
    hjstate->js.ps.plan = (Plan *) node;
    hjstate->js.ps.state = estate;

    /*
     * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
     * where this function may be replaced with a parallel version, if we
     * managed to launch a parallel query.
     */
    hjstate->js.ps.ExecProcNode = ExecHashJoin;
    hjstate->js.jointype = node->join.jointype;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &hjstate->js.ps);

    /*
     * initialize child nodes
     *
     * Note: we could suppress the REWIND flag for the inner input, which
     * would amount to betting that the hash will be a single batch.  Not
     * clear if this would be a win or not.
     */
    outerNode = outerPlan(node);
    hashNode = (Hash *) innerPlan(node);

    outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
    outerDesc = ExecGetResultType(outerPlanState(hjstate));
    innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
    innerDesc = ExecGetResultType(innerPlanState(hjstate));

    /*
     * Initialize result slot, type and projection.
     */
    ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
    ExecAssignProjectionInfo(&hjstate->js.ps, NULL);

    /*
     * tuple table initialization
     */
    ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
    hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
                                                        ops);

    /*
     * detect whether we need only consider the first matching inner tuple
     */
    hjstate->js.single_match = (node->join.inner_unique ||
                                node->join.jointype == JOIN_SEMI);

    /* set up null tuples for outer joins, if needed */
    switch (node->join.jointype)
    {
        case JOIN_INNER:
        case JOIN_SEMI:
        case JOIN_RIGHT_SEMI:
            break;
        case JOIN_LEFT:
        case JOIN_ANTI:
            hjstate->hj_NullInnerTupleSlot =
                ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
            break;
        case JOIN_RIGHT:
        case JOIN_RIGHT_ANTI:
            hjstate->hj_NullOuterTupleSlot =
                ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
5364 tgl@sss.pgh.pa.us 800 : 2259 : break;
801 : 548 : case JOIN_FULL:
802 : 548 : hjstate->hj_NullOuterTupleSlot =
2487 andres@anarazel.de 803 : 548 : ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
5364 tgl@sss.pgh.pa.us 804 : 548 : hjstate->hj_NullInnerTupleSlot =
2487 andres@anarazel.de 805 : 548 : ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
5364 tgl@sss.pgh.pa.us 806 : 548 : break;
9125 tgl@sss.pgh.pa.us 807 :UBC 0 : default:
8083 808 [ # # ]: 0 : elog(ERROR, "unrecognized join type: %d",
809 : : (int) node->join.jointype);
810 : : }
811 : :
812 : : /*
813 : : * now for some voodoo. our temporary tuple slot is actually the result
814 : : * tuple slot of the Hash node (which is our inner plan). we can do this
815 : : * because Hash nodes don't return tuples via ExecProcNode() -- instead
816 : : * the hash join node uses ExecScanHashBucket() to get at the contents of
817 : : * the hash table. -cim 6/9/91
818 : : */
819 : : {
8311 tgl@sss.pgh.pa.us 820 :CBC 16602 : HashState *hashstate = (HashState *) innerPlanState(hjstate);
382 drowley@postgresql.o 821 : 16602 : Hash *hash = (Hash *) hashstate->ps.plan;
8311 tgl@sss.pgh.pa.us 822 : 16602 : TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;
823 : : Oid *outer_hashfuncid;
824 : : Oid *inner_hashfuncid;
825 : : bool *hash_strict;
826 : : ListCell *lc;
827 : : int nkeys;
828 : :
829 : :
10226 bruce@momjian.us 830 : 16602 : hjstate->hj_HashTupleSlot = slot;
831 : :
832 : : /*
833 : : * Build ExprStates to obtain hash values for either side of the join.
834 : : * This must be done here as ExecBuildHash32Expr needs to know how to
835 : : * handle NULL inputs and the required handling of that depends on the
836 : : * jointype. We don't know the join type in ExecInitHash() and we
837 : : * must build the ExprStates before ExecHashTableCreate() so we
838 : : * properly attribute any SubPlans that exist in the hash expressions
839 : : * to the correct PlanState.
840 : : */
382 drowley@postgresql.o 841 : 16602 : nkeys = list_length(node->hashoperators);
842 : :
843 : 16602 : outer_hashfuncid = palloc_array(Oid, nkeys);
844 : 16602 : inner_hashfuncid = palloc_array(Oid, nkeys);
845 : 16602 : hash_strict = palloc_array(bool, nkeys);
846 : :
847 : : /*
848 : : * Determine the hash function for each side of the join for the given
849 : : * hash operator.
850 : : */
851 [ + - + + : 34489 : foreach(lc, node->hashoperators)
+ + ]
852 : : {
853 : 17887 : Oid hashop = lfirst_oid(lc);
854 : 17887 : int i = foreach_current_index(lc);
855 : :
856 [ - + ]: 17887 : if (!get_op_hash_functions(hashop,
857 : 17887 : &outer_hashfuncid[i],
858 : 17887 : &inner_hashfuncid[i]))
382 drowley@postgresql.o 859 [ # # ]:UBC 0 : elog(ERROR,
860 : : "could not find hash function for hash operator %u",
861 : : hashop);
382 drowley@postgresql.o 862 :CBC 17887 : hash_strict[i] = op_strict(hashop);
863 : : }
864 : :
865 : : /*
866 : : * Build an ExprState to generate the hash value for the expressions
867 : : * on the outer of the join. This ExprState must finish generating
868 : : * the hash value when HJ_FILL_OUTER() is true. Otherwise,
869 : : * ExecBuildHash32Expr will set up the ExprState to abort early if it
870 : : * finds a NULL. In these cases, we don't need to store these tuples
871 : : * in the hash table as the jointype does not require it.
872 : : */
873 : 16602 : hjstate->hj_OuterHash =
874 : 16602 : ExecBuildHash32Expr(hjstate->js.ps.ps_ResultTupleDesc,
875 : : hjstate->js.ps.resultops,
876 : : outer_hashfuncid,
877 : 16602 : node->hashcollations,
878 : 16602 : node->hashkeys,
879 : : hash_strict,
880 : : &hjstate->js.ps,
881 : : 0,
882 : 16602 : HJ_FILL_OUTER(hjstate));
883 : :
884 : : /* As above, but for the inner side of the join */
885 : 16602 : hashstate->hash_expr =
886 : 16602 : ExecBuildHash32Expr(hashstate->ps.ps_ResultTupleDesc,
887 : : hashstate->ps.resultops,
888 : : inner_hashfuncid,
889 : 16602 : node->hashcollations,
890 : 16602 : hash->hashkeys,
891 : : hash_strict,
892 : : &hashstate->ps,
893 : : 0,
894 : 16602 : HJ_FILL_INNER(hjstate));
895 : :
896 : : /*
897 : : * Set up the skew table hash function while we have a record of the
898 : : * first key's hash function Oid.
899 : : */
900 [ + + ]: 16602 : if (OidIsValid(hash->skewTable))
901 : : {
902 : 11837 : hashstate->skew_hashfunction = palloc0(sizeof(FmgrInfo));
903 : 11837 : hashstate->skew_collation = linitial_oid(node->hashcollations);
904 : 11837 : fmgr_info(outer_hashfuncid[0], hashstate->skew_hashfunction);
905 : : }
906 : :
907 : : /* no need to keep these */
908 : 16602 : pfree(outer_hashfuncid);
909 : 16602 : pfree(inner_hashfuncid);
910 : 16602 : pfree(hash_strict);
911 : : }
912 : :
913 : : /*
914 : : * initialize child expressions
915 : : */
2759 andres@anarazel.de 916 : 16602 : hjstate->js.ps.qual =
917 : 16602 : ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
918 : 16602 : hjstate->js.joinqual =
919 : 16602 : ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
920 : 16602 : hjstate->hashclauses =
921 : 16602 : ExecInitQual(node->hashclauses, (PlanState *) hjstate);
922 : :
923 : : /*
924 : : * initialize hash-specific info
925 : : */
7913 neilc@samurai.com 926 : 16602 : hjstate->hj_HashTable = NULL;
7286 tgl@sss.pgh.pa.us 927 : 16602 : hjstate->hj_FirstOuterTupleSlot = NULL;
928 : :
7489 929 : 16602 : hjstate->hj_CurHashValue = 0;
9608 930 : 16602 : hjstate->hj_CurBucketNo = 0;
6013 931 : 16602 : hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
7913 neilc@samurai.com 932 : 16602 : hjstate->hj_CurTuple = NULL;
933 : :
5364 tgl@sss.pgh.pa.us 934 : 16602 : hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
9125 935 : 16602 : hjstate->hj_MatchedOuter = false;
7222 936 : 16602 : hjstate->hj_OuterNotEmpty = false;
937 : :
8311 938 : 16602 : return hjstate;
939 : : }
940 : :
941 : : /* ----------------------------------------------------------------
942 : : * ExecEndHashJoin
943 : : *
944 : : * clean up routine for HashJoin node
945 : : * ----------------------------------------------------------------
946 : : */
947 : : void
948 : 16544 : ExecEndHashJoin(HashJoinState *node)
949 : : {
950 : : /*
951 : : * Free hash table
952 : : */
953 [ + + ]: 16544 : if (node->hj_HashTable)
954 : : {
955 : 11347 : ExecHashTableDestroy(node->hj_HashTable);
956 : 11347 : node->hj_HashTable = NULL;
957 : : }
958 : :
959 : : /*
960 : : * clean up subtrees
961 : : */
8301 962 : 16544 : ExecEndNode(outerPlanState(node));
963 : 16544 : ExecEndNode(innerPlanState(node));
10651 scrappy@hub.org 964 : 16544 : }
965 : :
966 : : /*
967 : : * ExecHashJoinOuterGetTuple
968 : : *
969 : : * get the next outer tuple for a parallel oblivious hashjoin: either by
970 : : * executing the outer plan node in the first pass, or from the temp
971 : : * files for the hashjoin batches.
972 : : *
973 : : * Returns a null slot if no more outer tuples (within the current batch).
974 : : *
975 : : * On success, the tuple's hash value is stored at *hashvalue --- this is
976 : : * either originally computed, or re-read from the temp file.
977 : : */
978 : : static TupleTableSlot *
7489 tgl@sss.pgh.pa.us 979 : 9582018 : ExecHashJoinOuterGetTuple(PlanState *outerNode,
980 : : HashJoinState *hjstate,
981 : : uint32 *hashvalue)
982 : : {
9601 bruce@momjian.us 983 : 9582018 : HashJoinTable hashtable = hjstate->hj_HashTable;
984 : 9582018 : int curbatch = hashtable->curbatch;
985 : : TupleTableSlot *slot;
986 : :
6796 tgl@sss.pgh.pa.us 987 [ + + ]: 9582018 : if (curbatch == 0) /* if it is the first pass */
988 : : {
989 : : /*
990 : : * Check to see if first outer tuple was already fetched by
991 : : * ExecHashJoin() and not used yet.
992 : : */
7286 993 : 8885025 : slot = hjstate->hj_FirstOuterTupleSlot;
994 [ + + + - ]: 8885025 : if (!TupIsNull(slot))
995 : 7831 : hjstate->hj_FirstOuterTupleSlot = NULL;
996 : : else
997 : 8877194 : slot = ExecProcNode(outerNode);
998 : :
6796 999 [ + + + + ]: 8885444 : while (!TupIsNull(slot))
1000 : : {
1001 : : bool isnull;
1002 : :
1003 : : /*
1004 : : * We have to compute the tuple's hash value.
1005 : : */
7489 1006 : 8873617 : ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1007 : :
1008 : 8873617 : econtext->ecxt_outertuple = slot;
1009 : :
382 drowley@postgresql.o 1010 : 8873617 : ResetExprContext(econtext);
1011 : :
1012 : 8873617 : *hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash,
1013 : : econtext,
1014 : : &isnull));
1015 : :
1016 [ + + ]: 8873617 : if (!isnull)
1017 : : {
1018 : : /* remember outer relation is not empty for possible rescan */
6796 tgl@sss.pgh.pa.us 1019 : 8873198 : hjstate->hj_OuterNotEmpty = true;
1020 : :
1021 : 8873198 : return slot;
1022 : : }
1023 : :
1024 : : /*
1025 : : * That tuple couldn't match because of a NULL, so discard it and
1026 : : * continue with the next one.
1027 : : */
1028 : 419 : slot = ExecProcNode(outerNode);
1029 : : }
1030 : : }
5364 1031 [ + - ]: 696993 : else if (curbatch < hashtable->nbatch)
1032 : : {
1033 : 696993 : BufFile *file = hashtable->outerBatchFile[curbatch];
1034 : :
1035 : : /*
1036 : : * In outer-join cases, we could get here even though the batch file
1037 : : * is empty.
1038 : : */
1039 [ - + ]: 696993 : if (file == NULL)
5364 tgl@sss.pgh.pa.us 1040 :UBC 0 : return NULL;
1041 : :
9608 tgl@sss.pgh.pa.us 1042 :CBC 696993 : slot = ExecHashJoinGetSavedTuple(hjstate,
1043 : : file,
1044 : : hashvalue,
1045 : : hjstate->hj_OuterTupleSlot);
9601 bruce@momjian.us 1046 [ + + + - ]: 696993 : if (!TupIsNull(slot))
9608 tgl@sss.pgh.pa.us 1047 : 696516 : return slot;
1048 : : }
1049 : :
1050 : : /* End of this batch */
1051 : 12304 : return NULL;
1052 : : }
1053 : :
1054 : : /*
1055 : : * ExecHashJoinOuterGetTuple variant for the parallel case.
1056 : : */
1057 : : static TupleTableSlot *
2817 andres@anarazel.de 1058 : 1080428 : ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
1059 : : HashJoinState *hjstate,
1060 : : uint32 *hashvalue)
1061 : : {
1062 : 1080428 : HashJoinTable hashtable = hjstate->hj_HashTable;
1063 : 1080428 : int curbatch = hashtable->curbatch;
1064 : : TupleTableSlot *slot;
1065 : :
1066 : : /*
1067 : : * In the Parallel Hash case we only run the outer plan directly for
1068 : : * single-batch hash joins. Otherwise we have to go to batch files, even
1069 : : * for batch 0.
1070 : : */
1071 [ + + + + ]: 1080428 : if (curbatch == 0 && hashtable->nbatch == 1)
1072 : : {
1073 : 480085 : slot = ExecProcNode(outerNode);
1074 : :
1075 [ + + + - ]: 480085 : while (!TupIsNull(slot))
1076 : : {
1077 : : bool isnull;
1078 : :
1079 : 480003 : ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1080 : :
1081 : 480003 : econtext->ecxt_outertuple = slot;
1082 : :
382 drowley@postgresql.o 1083 : 480003 : ResetExprContext(econtext);
1084 : :
1085 : 480003 : *hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash,
1086 : : econtext,
1087 : : &isnull));
1088 : :
1089 [ + - ]: 480003 : if (!isnull)
2817 andres@anarazel.de 1090 : 480003 : return slot;
1091 : :
1092 : : /*
1093 : : * That tuple couldn't match because of a NULL, so discard it and
1094 : : * continue with the next one.
1095 : : */
2817 andres@anarazel.de 1096 :UBC 0 : slot = ExecProcNode(outerNode);
1097 : : }
1098 : : }
2817 andres@anarazel.de 1099 [ + - ]:CBC 600343 : else if (curbatch < hashtable->nbatch)
1100 : : {
1101 : : MinimalTuple tuple;
1102 : :
1103 : 600343 : tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
1104 : : hashvalue);
1105 [ + + ]: 600343 : if (tuple != NULL)
1106 : : {
2486 1107 : 600012 : ExecForceStoreMinimalTuple(tuple,
1108 : : hjstate->hj_OuterTupleSlot,
1109 : : false);
1110 : 600012 : slot = hjstate->hj_OuterTupleSlot;
2817 1111 : 600012 : return slot;
1112 : : }
1113 : : else
1114 : 331 : ExecClearTuple(hjstate->hj_OuterTupleSlot);
1115 : : }
1116 : :
1117 : : /* End of this batch */
890 tmunro@postgresql.or 1118 : 413 : hashtable->batches[curbatch].outer_eof = true;
1119 : :
2817 andres@anarazel.de 1120 : 413 : return NULL;
1121 : : }
1122 : :
1123 : : /*
1124 : : * ExecHashJoinNewBatch
1125 : : * switch to a new hashjoin batch
1126 : : *
1127 : : * Returns true if successful, false if there are no more batches.
1128 : : */
1129 : : static bool
10225 bruce@momjian.us 1130 : 12298 : ExecHashJoinNewBatch(HashJoinState *hjstate)
1131 : : {
9608 tgl@sss.pgh.pa.us 1132 : 12298 : HashJoinTable hashtable = hjstate->hj_HashTable;
1133 : : int nbatch;
1134 : : int curbatch;
1135 : : BufFile *innerFile;
1136 : : TupleTableSlot *slot;
1137 : : uint32 hashvalue;
1138 : :
7489 1139 : 12298 : nbatch = hashtable->nbatch;
1140 : 12298 : curbatch = hashtable->curbatch;
1141 : :
1142 [ + + ]: 12298 : if (curbatch > 0)
1143 : : {
1144 : : /*
1145 : : * We no longer need the previous outer batch file; close it right
1146 : : * away to free disk space.
1147 : : */
1148 [ + - ]: 477 : if (hashtable->outerBatchFile[curbatch])
1149 : 477 : BufFileClose(hashtable->outerBatchFile[curbatch]);
1150 : 477 : hashtable->outerBatchFile[curbatch] = NULL;
1151 : : }
1152 : : else /* we just finished the first batch */
1153 : : {
1154 : : /*
1155 : : * Reset some of the skew optimization state variables, since we no
1156 : : * longer need to consider skew tuples after the first batch. The
1157 : : * memory context reset we are about to do will release the skew
1158 : : * hashtable itself.
1159 : : */
6013 1160 : 11821 : hashtable->skewEnabled = false;
1161 : 11821 : hashtable->skewBucket = NULL;
1162 : 11821 : hashtable->skewBucketNums = NULL;
5364 1163 : 11821 : hashtable->nSkewBuckets = 0;
6013 1164 : 11821 : hashtable->spaceUsedSkew = 0;
1165 : : }
1166 : :
1167 : : /*
1168 : : * We can always skip over any batches that are completely empty on both
1169 : : * sides. We can sometimes skip over batches that are empty on only one
1170 : : * side, but there are exceptions:
1171 : : *
1172 : : * 1. In a left/full outer join, we have to process outer batches even if
1173 : : * the inner batch is empty. Similarly, in a right/right-anti/full outer
1174 : : * join, we have to process inner batches even if the outer batch is
1175 : : * empty.
1176 : : *
1177 : : * 2. If we have increased nbatch since the initial estimate, we have to
1178 : : * scan inner batches since they might contain tuples that need to be
1179 : : * reassigned to later inner batches.
1180 : : *
1181 : : * 3. Similarly, if we have increased nbatch since starting the outer
1182 : : * scan, we have to rescan outer batches in case they contain tuples that
1183 : : * need to be reassigned.
1184 : : */
7489 1185 : 12298 : curbatch++;
1186 [ + + ]: 12298 : while (curbatch < nbatch &&
1187 [ - + ]: 477 : (hashtable->outerBatchFile[curbatch] == NULL ||
1188 [ - + ]: 477 : hashtable->innerBatchFile[curbatch] == NULL))
1189 : : {
7489 tgl@sss.pgh.pa.us 1190 [ # # ]:UBC 0 : if (hashtable->outerBatchFile[curbatch] &&
5364 1191 [ # # ]: 0 : HJ_FILL_OUTER(hjstate))
1192 : 0 : break; /* must process due to rule 1 */
1193 [ # # ]: 0 : if (hashtable->innerBatchFile[curbatch] &&
1194 [ # # ]: 0 : HJ_FILL_INNER(hjstate))
7489 1195 : 0 : break; /* must process due to rule 1 */
1196 [ # # ]: 0 : if (hashtable->innerBatchFile[curbatch] &&
1197 [ # # ]: 0 : nbatch != hashtable->nbatch_original)
1198 : 0 : break; /* must process due to rule 2 */
1199 [ # # ]: 0 : if (hashtable->outerBatchFile[curbatch] &&
1200 [ # # ]: 0 : nbatch != hashtable->nbatch_outstart)
1201 : 0 : break; /* must process due to rule 3 */
1202 : : /* We can ignore this batch. */
1203 : : /* Release associated temp files right away. */
1204 [ # # ]: 0 : if (hashtable->innerBatchFile[curbatch])
1205 : 0 : BufFileClose(hashtable->innerBatchFile[curbatch]);
1206 : 0 : hashtable->innerBatchFile[curbatch] = NULL;
1207 [ # # ]: 0 : if (hashtable->outerBatchFile[curbatch])
1208 : 0 : BufFileClose(hashtable->outerBatchFile[curbatch]);
1209 : 0 : hashtable->outerBatchFile[curbatch] = NULL;
1210 : 0 : curbatch++;
1211 : : }
1212 : :
7489 tgl@sss.pgh.pa.us 1213 [ + + ]:CBC 12298 : if (curbatch >= nbatch)
5263 bruce@momjian.us 1214 : 11821 : return false; /* no more batches */
1215 : :
7489 tgl@sss.pgh.pa.us 1216 : 477 : hashtable->curbatch = curbatch;
1217 : :
1218 : : /*
1219 : : * Reload the hash table with the new inner batch (which could be empty)
1220 : : */
1221 : 477 : ExecHashTableReset(hashtable);
1222 : :
1223 : 477 : innerFile = hashtable->innerBatchFile[curbatch];
1224 : :
1225 [ + - ]: 477 : if (innerFile != NULL)
1226 : : {
892 peter@eisentraut.org 1227 [ - + ]: 477 : if (BufFileSeek(innerFile, 0, 0, SEEK_SET))
7489 tgl@sss.pgh.pa.us 1228 [ # # ]:UBC 0 : ereport(ERROR,
1229 : : (errcode_for_file_access(),
1230 : : errmsg("could not rewind hash-join temporary file")));
1231 : :
7489 tgl@sss.pgh.pa.us 1232 [ + + ]:CBC 1732812 : while ((slot = ExecHashJoinGetSavedTuple(hjstate,
1233 : : innerFile,
1234 : : &hashvalue,
1235 : : hjstate->hj_HashTupleSlot)))
1236 : : {
1237 : : /*
1238 : : * NOTE: some tuples may be sent to future batches. Also, it is
1239 : : * possible for hashtable->nbatch to be increased here!
1240 : : */
7011 1241 : 1732335 : ExecHashTableInsert(hashtable, slot, hashvalue);
1242 : : }
1243 : :
1244 : : /*
1245 : : * after we build the hash table, the inner batch file is no longer
1246 : : * needed
1247 : : */
7489 1248 : 477 : BufFileClose(innerFile);
1249 : 477 : hashtable->innerBatchFile[curbatch] = NULL;
1250 : : }
1251 : :
1252 : : /*
1253 : : * Rewind outer batch file (if present), so that we can start reading it.
1254 : : */
5364 1255 [ + - ]: 477 : if (hashtable->outerBatchFile[curbatch] != NULL)
1256 : : {
892 peter@eisentraut.org 1257 [ - + ]: 477 : if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
5364 tgl@sss.pgh.pa.us 1258 [ # # ]:UBC 0 : ereport(ERROR,
1259 : : (errcode_for_file_access(),
1260 : : errmsg("could not rewind hash-join temporary file")));
1261 : : }
1262 : :
5364 tgl@sss.pgh.pa.us 1263 :CBC 477 : return true;
1264 : : }
1265 : :
1266 : : /*
1267 : : * Choose a batch to work on, and attach to it. Returns true if successful,
1268 : : * false if there are no more batches.
1269 : : */
1270 : : static bool
2817 andres@anarazel.de 1271 : 611 : ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
1272 : : {
1273 : 611 : HashJoinTable hashtable = hjstate->hj_HashTable;
1274 : : int start_batchno;
1275 : : int batchno;
1276 : :
1277 : : /*
1278 : : * If we were already attached to a batch, remember not to bother checking
1279 : : * it again, and detach from it (possibly freeing the hash table if we are
1280 : : * last to detach).
1281 : : */
1282 [ + + ]: 611 : if (hashtable->curbatch >= 0)
1283 : : {
1284 : 396 : hashtable->batches[hashtable->curbatch].done = true;
1285 : 396 : ExecHashTableDetachBatch(hashtable);
1286 : : }
1287 : :
1288 : : /*
1289 : : * Search for a batch that isn't done. We use an atomic counter to start
1290 : : * our search at a different batch in every participant when there are
1291 : : * more batches than participants.
1292 : : */
1293 : 611 : batchno = start_batchno =
1294 : 611 : pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
1295 : 611 : hashtable->nbatch;
1296 : : do
1297 : : {
1298 : : uint32 hashvalue;
1299 : : MinimalTuple tuple;
1300 : : TupleTableSlot *slot;
1301 : :
1302 [ + + ]: 1435 : if (!hashtable->batches[batchno].done)
1303 : : {
1304 : : SharedTuplestoreAccessor *inner_tuples;
1305 : 765 : Barrier *batch_barrier =
841 tgl@sss.pgh.pa.us 1306 : 765 : &hashtable->batches[batchno].shared->batch_barrier;
1307 : :
2817 andres@anarazel.de 1308 [ + - + + : 765 : switch (BarrierAttach(batch_barrier))
+ + - ]
1309 : : {
898 tmunro@postgresql.or 1310 : 231 : case PHJ_BATCH_ELECT:
1311 : :
1312 : : /* One backend allocates the hash table. */
2817 andres@anarazel.de 1313 [ + - ]: 231 : if (BarrierArriveAndWait(batch_barrier,
1314 : : WAIT_EVENT_HASH_BATCH_ELECT))
1315 : 231 : ExecParallelHashTableAlloc(hashtable, batchno);
1316 : : /* Fall through. */
1317 : :
1318 : : case PHJ_BATCH_ALLOCATE:
1319 : : /* Wait for allocation to complete. */
1320 : 231 : BarrierArriveAndWait(batch_barrier,
1321 : : WAIT_EVENT_HASH_BATCH_ALLOCATE);
1322 : : /* Fall through. */
1323 : :
898 tmunro@postgresql.or 1324 : 247 : case PHJ_BATCH_LOAD:
1325 : : /* Start (or join in) loading tuples. */
2817 andres@anarazel.de 1326 : 247 : ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1327 : 247 : inner_tuples = hashtable->batches[batchno].inner_tuples;
1328 : 247 : sts_begin_parallel_scan(inner_tuples);
1329 [ + + ]: 522226 : while ((tuple = sts_parallel_scan_next(inner_tuples,
1330 : : &hashvalue)))
1331 : : {
2486 1332 : 521979 : ExecForceStoreMinimalTuple(tuple,
1333 : : hjstate->hj_HashTupleSlot,
1334 : : false);
1335 : 521979 : slot = hjstate->hj_HashTupleSlot;
2817 1336 : 521979 : ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
1337 : : hashvalue);
1338 : : }
1339 : 247 : sts_end_parallel_scan(inner_tuples);
1340 : 247 : BarrierArriveAndWait(batch_barrier,
1341 : : WAIT_EVENT_HASH_BATCH_LOAD);
1342 : : /* Fall through. */
1343 : :
898 tmunro@postgresql.or 1344 : 413 : case PHJ_BATCH_PROBE:
1345 : :
1346 : : /*
1347 : : * This batch is ready to probe. Return control to
1348 : : * caller. We stay attached to batch_barrier so that the
1349 : : * hash table stays alive until everyone's finished
1350 : : * probing it, but no participant is allowed to wait at
1351 : : * this barrier again (or else a deadlock could occur).
1352 : : * All attached participants must eventually detach from
1353 : : * the barrier and one worker must advance the phase so
1354 : : * that the final phase is reached.
1355 : : */
2817 andres@anarazel.de 1356 : 413 : ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1357 : 413 : sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
1358 : :
1359 : 413 : return true;
890 tmunro@postgresql.or 1360 : 3 : case PHJ_BATCH_SCAN:
1361 : :
1362 : : /*
1363 : : * In principle, we could help scan for unmatched tuples,
1364 : : * since that phase is already underway (the thing we
1365 : : * can't do under current deadlock-avoidance rules is wait
1366 : : * for others to arrive at PHJ_BATCH_SCAN, because
1367 : : * PHJ_BATCH_PROBE emits tuples, but in this case we just
1368 : : * got here without waiting). That is not yet done. For
1369 : : * now, we just detach and go around again. We have to
1370 : : * use ExecHashTableDetachBatch() because there's a small
1371 : : * chance we'll be the last to detach, and then we're
1372 : : * responsible for freeing memory.
1373 : : */
1374 : 3 : ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1375 : 3 : hashtable->batches[batchno].done = true;
1376 : 3 : ExecHashTableDetachBatch(hashtable);
1377 : 3 : break;
1378 : :
898 1379 : 349 : case PHJ_BATCH_FREE:
1380 : :
1381 : : /*
1382 : : * Already done. Detach and go around again (if any
1383 : : * remain).
1384 : : */
2817 andres@anarazel.de 1385 : 349 : BarrierDetach(batch_barrier);
1386 : 349 : hashtable->batches[batchno].done = true;
1387 : 349 : hashtable->curbatch = -1;
1388 : 349 : break;
1389 : :
2817 andres@anarazel.de 1390 :UBC 0 : default:
1391 [ # # ]: 0 : elog(ERROR, "unexpected batch phase %d",
1392 : : BarrierPhase(batch_barrier));
1393 : : }
1394 : : }
2817 andres@anarazel.de 1395 :CBC 1022 : batchno = (batchno + 1) % hashtable->nbatch;
1396 [ + + ]: 1022 : } while (batchno != start_batchno);
1397 : :
1398 : 198 : return false;
1399 : : }
1400 : :
1401 : : /*
1402 : : * ExecHashJoinSaveTuple
1403 : : * save a tuple to a batch file.
1404 : : *
1405 : : * The data recorded in the file for each tuple is its hash value,
1406 : : * then the tuple in MinimalTuple format.
1407 : : *
1408 : : * fileptr points to a batch file in one of the hashtable arrays.
1409 : : *
1410 : : * The batch files (and their buffers) are allocated in the spill context
1411 : : * created for the hashtable.
1412 : : */
1413 : : void
6666 tgl@sss.pgh.pa.us 1414 : 2428851 : ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
1415 : : BufFile **fileptr, HashJoinTable hashtable)
1416 : : {
7266 bruce@momjian.us 1417 : 2428851 : BufFile *file = *fileptr;
1418 : :
1419 : : /*
1420 : : * The batch file is lazily created. If this is the first tuple written to
1421 : : * this batch, the batch file is created and its buffer is allocated in
1422 : : * the spillCxt context, NOT in the batchCxt.
1423 : : *
1424 : : * During the build phase, buffered files are created for inner batches.
1425 : : * Each batch's buffered file is closed (and its buffer freed) after the
1426 : : * batch is loaded into memory during the outer side scan. Therefore, it
1427 : : * is necessary to allocate the batch file buffer in a memory context
1428 : : * which outlives the batch itself.
1429 : : *
1430 : : * Also, we use spillCxt instead of hashCxt for a better accounting of the
1431 : : * spilling memory consumption.
1432 : : */
7489 tgl@sss.pgh.pa.us 1433 [ + + ]: 2428851 : if (file == NULL)
1434 : : {
841 1435 : 954 : MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
1436 : :
6666 1437 : 954 : file = BufFileCreateTemp(false);
7489 1438 : 954 : *fileptr = file;
1439 : :
841 tomas.vondra@postgre 1440 : 954 : MemoryContextSwitchTo(oldctx);
1441 : : }
1442 : :
1003 peter@eisentraut.org 1443 : 2428851 : BufFileWrite(file, &hashvalue, sizeof(uint32));
1444 : 2428851 : BufFileWrite(file, tuple, tuple->t_len);
10651 scrappy@hub.org 1445 : 2428851 : }
1446 : :
1447 : : /*
1448 : : * ExecHashJoinGetSavedTuple
1449 : : * read the next tuple from a batch file. Return NULL if no more.
1450 : : *
1451 : : * On success, *hashvalue is set to the tuple's hash value, and the tuple
1452 : : * itself is stored in the given slot.
1453 : : */
1454 : : static TupleTableSlot *
7489 tgl@sss.pgh.pa.us 1455 : 2429805 : ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
1456 : : BufFile *file,
1457 : : uint32 *hashvalue,
1458 : : TupleTableSlot *tupleSlot)
1459 : : {
1460 : : uint32 header[2];
1461 : : size_t nread;
1462 : : MinimalTuple tuple;
1463 : :
1464 : : /*
1465 : : * We check for interrupts here because this is typically taken as an
1466 : : * alternative code path to an ExecProcNode() call, which would include
1467 : : * such a check.
1468 : : */
3125 1469 [ + + ]: 2429805 : CHECK_FOR_INTERRUPTS();
1470 : :
1471 : : /*
1472 : : * Since both the hash value and the MinimalTuple length word are uint32,
1473 : : * we can read them both in one BufFileRead() call without any type
1474 : : * cheating.
1475 : : */
964 peter@eisentraut.org 1476 : 2429805 : nread = BufFileReadMaybeEOF(file, header, sizeof(header), true);
6912 bruce@momjian.us 1477 [ + + ]: 2429805 : if (nread == 0) /* end of file */
1478 : : {
7011 tgl@sss.pgh.pa.us 1479 : 954 : ExecClearTuple(tupleSlot);
1480 : 954 : return NULL;
1481 : : }
1482 : 2428851 : *hashvalue = header[0];
1483 : 2428851 : tuple = (MinimalTuple) palloc(header[1]);
1484 : 2428851 : tuple->t_len = header[1];
964 peter@eisentraut.org 1485 : 2428851 : BufFileReadExact(file,
1486 : : (char *) tuple + sizeof(uint32),
1487 : 2428851 : header[1] - sizeof(uint32));
2486 andres@anarazel.de 1488 : 2428851 : ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
1489 : 2428851 : return tupleSlot;
1490 : : }
1491 : :
1492 : :
1493 : : void
5535 tgl@sss.pgh.pa.us 1494 : 1922 : ExecReScanHashJoin(HashJoinState *node)
1495 : : {
1157 1496 : 1922 : PlanState *outerPlan = outerPlanState(node);
1497 : 1922 : PlanState *innerPlan = innerPlanState(node);
1498 : :
1499 : : /*
1500 : : * In a multi-batch join, we currently have to do rescans the hard way,
1501 : : * primarily because batch temp files may have already been released. But
1502 : : * if it's a single-batch join, and there is no parameter change for the
1503 : : * inner subnode, then we can just re-use the existing hash table without
1504 : : * rebuilding it.
1505 : : */
7222 1506 [ + + ]: 1922 : if (node->hj_HashTable != NULL)
1507 : : {
1508 [ + - ]: 1676 : if (node->hj_HashTable->nbatch == 1 &&
1157 1509 [ + + ]: 1676 : innerPlan->chgParam == NULL)
1510 : : {
1511 : : /*
1512 : : * Okay to reuse the hash table; needn't rescan inner, either.
1513 : : *
1514 : : * However, if it's a right/right-anti/right-semi/full join, we'd
1515 : : * better reset the inner-tuple match flags contained in the
1516 : : * table.
1517 : : */
271 rguo@postgresql.org 1518 [ + + + + ]: 857 : if (HJ_FILL_INNER(node) || node->js.jointype == JOIN_RIGHT_SEMI)
5364 tgl@sss.pgh.pa.us 1519 : 33 : ExecHashTableResetMatchFlags(node->hj_HashTable);
1520 : :
1521 : : /*
1522 : : * Also, we need to reset our state about the emptiness of the
1523 : : * outer relation, so that the new scan of the outer will update
1524 : : * it correctly if it turns out to be empty this time. (There's no
1525 : : * harm in clearing it now because ExecHashJoin won't need the
1526 : : * info. In the other cases, where the hash table doesn't exist
1527 : : * or we are destroying it, we leave this state alone because
1528 : : * ExecHashJoin will need it the first time through.)
1529 : : */
7222 1530 : 857 : node->hj_OuterNotEmpty = false;
1531 : :
1532 : : /* ExecHashJoin can skip the BUILD_HASHTABLE step */
5364 1533 : 857 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
1534 : : }
1535 : : else
1536 : : {
1537 : : /* must destroy and rebuild hash table */
1157 1538 : 819 : HashState *hashNode = castNode(HashState, innerPlan);
1539 : :
1974 1540 [ - + ]: 819 : Assert(hashNode->hashtable == node->hj_HashTable);
1541 : : /* accumulate stats from old hash table, if wanted */
1542 : : /* (this should match ExecShutdownHash) */
1543 [ - + - - ]: 819 : if (hashNode->ps.instrument && !hashNode->hinstrument)
1974 tgl@sss.pgh.pa.us 1544 :UBC 0 : hashNode->hinstrument = (HashInstrumentation *)
1545 : 0 : palloc0(sizeof(HashInstrumentation));
1974 tgl@sss.pgh.pa.us 1546 [ - + ]:CBC 819 : if (hashNode->hinstrument)
1974 tgl@sss.pgh.pa.us 1547 :UBC 0 : ExecHashAccumInstrumentation(hashNode->hinstrument,
1548 : : hashNode->hashtable);
1549 : : /* for safety, be sure to clear child plan node's pointer too */
1974 tgl@sss.pgh.pa.us 1550 :CBC 819 : hashNode->hashtable = NULL;
1551 : :
7222 1552 : 819 : ExecHashTableDestroy(node->hj_HashTable);
1553 : 819 : node->hj_HashTable = NULL;
5364 1554 : 819 : node->hj_JoinState = HJ_BUILD_HASHTABLE;
1555 : :
1556 : : /*
1557 : : * If chgParam of the subnode is not null, then the plan will be
1558 : : * re-scanned by the first ExecProcNode call, so we need not do it here.
1559 : : */
1157 1560 [ - + ]: 819 : if (innerPlan->chgParam == NULL)
1157 tgl@sss.pgh.pa.us 1561 :UBC 0 : ExecReScan(innerPlan);
1562 : : }
1563 : : }
1564 : :
1565 : : /* Always reset intra-tuple state */
7489 tgl@sss.pgh.pa.us 1566 :CBC 1922 : node->hj_CurHashValue = 0;
8311 1567 : 1922 : node->hj_CurBucketNo = 0;
6013 1568 : 1922 : node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
7913 neilc@samurai.com 1569 : 1922 : node->hj_CurTuple = NULL;
1570 : :
8311 tgl@sss.pgh.pa.us 1571 : 1922 : node->hj_MatchedOuter = false;
7222 1572 : 1922 : node->hj_FirstOuterTupleSlot = NULL;
1573 : :
1574 : : /*
1575 : : * If chgParam of the subnode is not null, then the plan will be
1576 : : * re-scanned by the first ExecProcNode call, so we need not do it here.
1577 : : */
1157 1578 [ + + ]: 1922 : if (outerPlan->chgParam == NULL)
1579 : 1313 : ExecReScan(outerPlan);
10067 vadim4o@yahoo.com 1580 : 1922 : }
1581 : :
1582 : : void
2817 andres@anarazel.de 1583 : 14663 : ExecShutdownHashJoin(HashJoinState *node)
1584 : : {
1585 [ + + ]: 14663 : if (node->hj_HashTable)
1586 : : {
1587 : : /*
1588 : : * Detach from shared state before DSM memory goes away. This makes
1589 : : * sure that we don't have any pointers into DSM memory by the time
1590 : : * ExecEndHashJoin runs.
1591 : : */
1592 : 11336 : ExecHashTableDetachBatch(node->hj_HashTable);
1593 : 11336 : ExecHashTableDetach(node->hj_HashTable);
1594 : : }
1595 : 14663 : }
1596 : :
1597 : : static void
1598 : 93 : ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
1599 : : {
1600 : 93 : PlanState *outerState = outerPlanState(hjstate);
1601 : 93 : ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1602 : 93 : HashJoinTable hashtable = hjstate->hj_HashTable;
1603 : : TupleTableSlot *slot;
1604 : : uint32 hashvalue;
1605 : : int i;
1606 : :
1607 [ - + ]: 93 : Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
1608 : :
1609 : : /* Execute outer plan, writing all tuples to shared tuplestores. */
1610 : : for (;;)
1611 : 600012 : {
1612 : : bool isnull;
1613 : :
1614 : 600105 : slot = ExecProcNode(outerState);
1615 [ + + + + ]: 600105 : if (TupIsNull(slot))
1616 : : break;
1617 : 600012 : econtext->ecxt_outertuple = slot;
1618 : :
382 drowley@postgresql.o 1619 : 600012 : ResetExprContext(econtext);
1620 : :
1621 : 600012 : hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash,
1622 : : econtext,
1623 : : &isnull));
1624 : :
1625 [ + - ]: 600012 : if (!isnull)
1626 : : {
1627 : : int batchno;
1628 : : int bucketno;
1629 : : bool shouldFree;
2487 andres@anarazel.de 1630 : 600012 : MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1631 : :
2817 1632 : 600012 : ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
1633 : : &batchno);
1634 : 600012 : sts_puttuple(hashtable->batches[batchno].outer_tuples,
1635 : : &hashvalue, mintup);
1636 : :
2487 1637 [ + - ]: 600012 : if (shouldFree)
1638 : 600012 : heap_free_minimal_tuple(mintup);
1639 : : }
2817 1640 [ - + ]: 600012 : CHECK_FOR_INTERRUPTS();
1641 : : }
1642 : :
1643 : : /* Make sure all outer partitions are readable by any backend. */
1644 [ + + ]: 669 : for (i = 0; i < hashtable->nbatch; ++i)
1645 : 576 : sts_end_write(hashtable->batches[i].outer_tuples);
1646 : 93 : }
1647 : :
1648 : : void
1649 : 60 : ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
1650 : : {
1651 : 60 : shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
1652 : 60 : shm_toc_estimate_keys(&pcxt->estimator, 1);
1653 : 60 : }
1654 : :
1655 : : void
1656 : 60 : ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
1657 : : {
1658 : 60 : int plan_node_id = state->js.ps.plan->plan_node_id;
1659 : : HashState *hashNode;
1660 : : ParallelHashJoinState *pstate;
1661 : :
1662 : : /*
1663 : : * Disable shared hash table mode if we failed to create a real DSM
1664 : : * segment, because that means that we don't have a DSA area to work with.
1665 : : */
1666 [ - + ]: 60 : if (pcxt->seg == NULL)
2817 andres@anarazel.de 1667 :UBC 0 : return;
1668 : :
2817 andres@anarazel.de 1669 :CBC 60 : ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
1670 : :
1671 : : /*
1672 : : * Set up the state needed to coordinate access to the shared hash
1673 : : * table(s), using the plan node ID as the toc key.
1674 : : */
1675 : 60 : pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
1676 : 60 : shm_toc_insert(pcxt->toc, plan_node_id, pstate);
1677 : :
1678 : : /*
1679 : : * Set up the shared hash join state with no batches initially.
1680 : : * ExecHashTableCreate() will prepare at least one later and set nbatch
1681 : : * and space_allowed.
1682 : : */
1683 : 60 : pstate->nbatch = 0;
1684 : 60 : pstate->space_allowed = 0;
1685 : 60 : pstate->batches = InvalidDsaPointer;
1686 : 60 : pstate->old_batches = InvalidDsaPointer;
1687 : 60 : pstate->nbuckets = 0;
1688 : 60 : pstate->growth = PHJ_GROWTH_OK;
1689 : 60 : pstate->chunk_work_queue = InvalidDsaPointer;
1690 : 60 : pg_atomic_init_u32(&pstate->distributor, 0);
1691 : 60 : pstate->nparticipants = pcxt->nworkers + 1;
1692 : 60 : pstate->total_tuples = 0;
1693 : 60 : LWLockInitialize(&pstate->lock,
1694 : : LWTRANCHE_PARALLEL_HASH_JOIN);
1695 : 60 : BarrierInit(&pstate->build_barrier, 0);
1696 : 60 : BarrierInit(&pstate->grow_batches_barrier, 0);
1697 : 60 : BarrierInit(&pstate->grow_buckets_barrier, 0);
1698 : :
1699 : : /* Set up the space we'll use for shared temporary files. */
1700 : 60 : SharedFileSetInit(&pstate->fileset, pcxt->seg);
1701 : :
1702 : : /* Initialize the shared state in the hash node. */
1703 : 60 : hashNode = (HashState *) innerPlanState(state);
1704 : 60 : hashNode->parallel_state = pstate;
1705 : : }
1706 : :
1707 : : /* ----------------------------------------------------------------
1708 : : * ExecHashJoinReInitializeDSM
1709 : : *
1710 : : * Reset shared state before beginning a fresh scan.
1711 : : * ----------------------------------------------------------------
1712 : : */
1713 : : void
1083 pg@bowt.ie 1714 : 24 : ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
1715 : : {
2817 andres@anarazel.de 1716 : 24 : int plan_node_id = state->js.ps.plan->plan_node_id;
1717 : : ParallelHashJoinState *pstate;
1718 : :
1719 : : /* Nothing to do if we failed to create a DSM segment. */
302 tgl@sss.pgh.pa.us 1720 [ - + ]: 24 : if (pcxt->seg == NULL)
302 tgl@sss.pgh.pa.us 1721 :UBC 0 : return;
1722 : :
302 tgl@sss.pgh.pa.us 1723 :CBC 24 : pstate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
1724 : :
1725 : : /*
1726 : : * It would be possible to reuse the shared hash table in single-batch
1727 : : * cases by resetting and then fast-forwarding build_barrier to
1728 : : * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but
1729 : : * currently shared hash tables are already freed by now (by the last
1730 : : * participant to detach from the batch). We could consider keeping it
1731 : : * around for single-batch joins. We'd also need to adjust
1732 : : * finalize_plan() so that it doesn't record a dummy dependency for
1733 : : * Parallel Hash nodes, preventing the rescan optimization. For now we
1734 : : * don't try.
1735 : : */
1736 : :
1737 : : /* Detach, freeing any remaining shared memory. */
2817 andres@anarazel.de 1738 [ - + ]: 24 : if (state->hj_HashTable != NULL)
1739 : : {
2817 andres@anarazel.de 1740 :UBC 0 : ExecHashTableDetachBatch(state->hj_HashTable);
1741 : 0 : ExecHashTableDetach(state->hj_HashTable);
1742 : : }
1743 : :
1744 : : /* Clear any shared batch files. */
2817 andres@anarazel.de 1745 :CBC 24 : SharedFileSetDeleteAll(&pstate->fileset);
1746 : :
1747 : : /* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
1748 : 24 : BarrierInit(&pstate->build_barrier, 0);
1749 : : }
1750 : :
1751 : : void
1752 : 153 : ExecHashJoinInitializeWorker(HashJoinState *state,
1753 : : ParallelWorkerContext *pwcxt)
1754 : : {
1755 : : HashState *hashNode;
1756 : 153 : int plan_node_id = state->js.ps.plan->plan_node_id;
1757 : : ParallelHashJoinState *pstate =
841 tgl@sss.pgh.pa.us 1758 : 153 : shm_toc_lookup(pwcxt->toc, plan_node_id, false);
1759 : :
1760 : : /* Attach to the space for shared temporary files. */
2817 andres@anarazel.de 1761 : 153 : SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
1762 : :
1763 : : /* Attach to the shared state in the hash node. */
1764 : 153 : hashNode = (HashState *) innerPlanState(state);
1765 : 153 : hashNode->parallel_state = pstate;
1766 : :
1767 : 153 : ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
1768 : 153 : }