Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * vacuumparallel.c
4 : : * Support routines for parallel vacuum execution.
5 : : *
6 : : * This file contains routines that are intended to support setting up, using,
7 : : * and tearing down a ParallelVacuumState.
8 : : *
9 : : * In a parallel vacuum, we perform both index bulk deletion and index cleanup
10 : : * with parallel worker processes. Individual indexes are processed by one
11 : : * vacuum process. ParallelVacuumState contains shared information as well as
12 : : * the memory space for storing dead items allocated in the DSA area. We
13 : : * launch parallel worker processes at the start of parallel index
14 : : * bulk-deletion and index cleanup and once all indexes are processed, the
15 : : * parallel worker processes exit. Each time we process indexes in parallel,
16 : : * the parallel context is re-initialized so that the same DSM can be used for
17 : : * multiple passes of index bulk-deletion and index cleanup.
18 : : *
19 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
20 : : * Portions Copyright (c) 1994, Regents of the University of California
21 : : *
22 : : * IDENTIFICATION
23 : : * src/backend/commands/vacuumparallel.c
24 : : *
25 : : *-------------------------------------------------------------------------
26 : : */
27 : : #include "postgres.h"
28 : :
29 : : #include "access/amapi.h"
30 : : #include "access/table.h"
31 : : #include "access/xact.h"
32 : : #include "commands/progress.h"
33 : : #include "commands/vacuum.h"
34 : : #include "executor/instrument.h"
35 : : #include "optimizer/paths.h"
36 : : #include "pgstat.h"
37 : : #include "storage/bufmgr.h"
38 : : #include "storage/proc.h"
39 : : #include "tcop/tcopprot.h"
40 : : #include "utils/lsyscache.h"
41 : : #include "utils/rel.h"
42 : :
43 : : /*
44 : : * DSM keys for parallel vacuum. Unlike other parallel execution code, since
45 : : * we don't need to worry about DSM keys conflicting with plan_node_id we can
46 : : * use small integers.
47 : : */
48 : : #define PARALLEL_VACUUM_KEY_SHARED 1
49 : : #define PARALLEL_VACUUM_KEY_QUERY_TEXT 2
50 : : #define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3
51 : : #define PARALLEL_VACUUM_KEY_WAL_USAGE 4
52 : : #define PARALLEL_VACUUM_KEY_INDEX_STATS 5
53 : :
54 : : /*
55 : : * Shared information among parallel workers. So this is allocated in the DSM
56 : : * segment.
57 : : */
58 : : typedef struct PVShared
59 : : {
60 : : /*
61 : : * Target table relid, log level (for messages about parallel workers
62 : : * launched during VACUUM VERBOSE) and query ID. These fields are not
63 : : * modified during the parallel vacuum.
64 : : */
65 : : Oid relid;
66 : : int elevel;
67 : : int64 queryid;
68 : :
69 : : /*
70 : : * Fields for both index vacuum and cleanup.
71 : : *
72 : : * reltuples is the total number of input heap tuples. We set either old
73 : : * live tuples in the index vacuum case or the new live tuples in the
74 : : * index cleanup case.
75 : : *
76 : : * estimated_count is true if reltuples is an estimated value. (Note that
77 : : * reltuples could be -1 in this case, indicating we have no idea.)
78 : : */
79 : : double reltuples;
80 : : bool estimated_count;
81 : :
82 : : /*
83 : : * In single process vacuum we could consume more memory during index
84 : : * vacuuming or cleanup apart from the memory for heap scanning. In
85 : : * parallel vacuum, since individual vacuum workers can consume memory
86 : : * equal to maintenance_work_mem, the new maintenance_work_mem for each
87 : : * worker is set such that the parallel operation doesn't consume more
88 : : * memory than single process vacuum.
89 : : */
90 : : int maintenance_work_mem_worker;
91 : :
92 : : /*
93 : : * The number of buffers each worker's Buffer Access Strategy ring should
94 : : * contain.
95 : : */
96 : : int ring_nbuffers;
97 : :
98 : : /*
99 : : * Shared vacuum cost balance. During parallel vacuum,
100 : : * VacuumSharedCostBalance points to this value and it accumulates the
101 : : * balance of each parallel vacuum worker.
102 : : */
103 : : pg_atomic_uint32 cost_balance;
104 : :
105 : : /*
106 : : * Number of active parallel workers. This is used for computing the
107 : : * minimum threshold of the vacuum cost balance before a worker sleeps for
108 : : * cost-based delay.
109 : : */
110 : : pg_atomic_uint32 active_nworkers;
111 : :
112 : : /* Counter for vacuuming and cleanup */
113 : : pg_atomic_uint32 idx;
114 : :
115 : : /* DSA handle where the TidStore lives */
116 : : dsa_handle dead_items_dsa_handle;
117 : :
118 : : /* DSA pointer to the shared TidStore */
119 : : dsa_pointer dead_items_handle;
120 : :
121 : : /* Statistics of shared dead items */
122 : : VacDeadItemsInfo dead_items_info;
123 : : } PVShared;
124 : :
125 : : /* Status used during parallel index vacuum or cleanup */
126 : : typedef enum PVIndVacStatus
127 : : {
128 : : PARALLEL_INDVAC_STATUS_INITIAL = 0,
129 : : PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
130 : : PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
131 : : PARALLEL_INDVAC_STATUS_COMPLETED,
132 : : } PVIndVacStatus;
133 : :
134 : : /*
135 : : * Struct for index vacuum statistics of an index that is used for parallel vacuum.
136 : : * This includes the status of parallel index vacuum as well as index statistics.
137 : : */
138 : : typedef struct PVIndStats
139 : : {
140 : : /*
141 : : * The following two fields are set by leader process before executing
142 : : * parallel index vacuum or parallel index cleanup. These fields are not
143 : : * fixed for the entire VACUUM operation. They are only fixed for an
144 : : * individual parallel index vacuum and cleanup.
145 : : *
146 : : * parallel_workers_can_process is true if both leader and worker can
147 : : * process the index, otherwise only leader can process it.
148 : : */
149 : : PVIndVacStatus status;
150 : : bool parallel_workers_can_process;
151 : :
152 : : /*
153 : : * Individual worker or leader stores the result of index vacuum or
154 : : * cleanup.
155 : : */
156 : : bool istat_updated; /* are the stats updated? */
157 : : IndexBulkDeleteResult istat;
158 : : } PVIndStats;
159 : :
160 : : /*
161 : : * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
162 : : */
163 : : struct ParallelVacuumState
164 : : {
165 : : /* NULL for worker processes */
166 : : ParallelContext *pcxt;
167 : :
168 : : /* Parent Heap Relation */
169 : : Relation heaprel;
170 : :
171 : : /* Target indexes */
172 : : Relation *indrels;
173 : : int nindexes;
174 : :
175 : : /* Shared information among parallel vacuum workers */
176 : : PVShared *shared;
177 : :
178 : : /*
179 : : * Shared index statistics among parallel vacuum workers. The array
180 : : * element is allocated for every index, even those indexes where parallel
181 : : * index vacuuming is unsafe or not worthwhile (e.g.,
182 : : * will_parallel_vacuum[] is false). During parallel vacuum,
183 : : * IndexBulkDeleteResult of each index is kept in DSM and is copied into
184 : : * local memory at the end of parallel vacuum.
185 : : */
186 : : PVIndStats *indstats;
187 : :
188 : : /* Shared dead items space among parallel vacuum workers */
189 : : TidStore *dead_items;
190 : :
191 : : /* Points to buffer usage area in DSM */
192 : : BufferUsage *buffer_usage;
193 : :
194 : : /* Points to WAL usage area in DSM */
195 : : WalUsage *wal_usage;
196 : :
197 : : /*
198 : : * False if the index is totally unsuitable target for all parallel
199 : : * processing. For example, the index could be <
200 : : * min_parallel_index_scan_size cutoff.
201 : : */
202 : : bool *will_parallel_vacuum;
203 : :
204 : : /*
205 : : * The number of indexes that support parallel index bulk-deletion and
206 : : * parallel index cleanup respectively.
207 : : */
208 : : int nindexes_parallel_bulkdel;
209 : : int nindexes_parallel_cleanup;
210 : : int nindexes_parallel_condcleanup;
211 : :
212 : : /* Buffer access strategy used by leader process */
213 : : BufferAccessStrategy bstrategy;
214 : :
215 : : /*
216 : : * Error reporting state. The error callback is set only for workers
217 : : * processes during parallel index vacuum.
218 : : */
219 : : char *relnamespace;
220 : : char *relname;
221 : : char *indname;
222 : : PVIndVacStatus status;
223 : : };
224 : :
225 : : static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
226 : : bool *will_parallel_vacuum);
227 : : static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
228 : : bool vacuum);
229 : : static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
230 : : static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
231 : : static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
232 : : PVIndStats *indstats);
233 : : static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
234 : : bool vacuum);
235 : : static void parallel_vacuum_error_callback(void *arg);
236 : :
237 : : /*
238 : : * Try to enter parallel mode and create a parallel context. Then initialize
239 : : * shared memory state.
240 : : *
241 : : * On success, return parallel vacuum state. Otherwise return NULL.
242 : : */
243 : : ParallelVacuumState *
1543 akapila@postgresql.o 244 :CBC 5438 : parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
245 : : int nrequested_workers, int vac_work_mem,
246 : : int elevel, BufferAccessStrategy bstrategy)
247 : : {
248 : : ParallelVacuumState *pvs;
249 : : ParallelContext *pcxt;
250 : : PVShared *shared;
251 : : TidStore *dead_items;
252 : : PVIndStats *indstats;
253 : : BufferUsage *buffer_usage;
254 : : WalUsage *wal_usage;
255 : : bool *will_parallel_vacuum;
256 : : Size est_indstats_len;
257 : : Size est_shared_len;
258 : 5438 : int nindexes_mwm = 0;
259 : 5438 : int parallel_workers = 0;
260 : : int querylen;
261 : :
262 : : /*
263 : : * A parallel vacuum must be requested and there must be indexes on the
264 : : * relation
265 : : */
266 [ - + ]: 5438 : Assert(nrequested_workers >= 0);
267 [ - + ]: 5438 : Assert(nindexes > 0);
268 : :
269 : : /*
270 : : * Compute the number of parallel vacuum workers to launch
271 : : */
95 michael@paquier.xyz 272 :GNC 5438 : will_parallel_vacuum = palloc0_array(bool, nindexes);
1543 akapila@postgresql.o 273 :CBC 5438 : parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
274 : : nrequested_workers,
275 : : will_parallel_vacuum);
276 [ + + ]: 5438 : if (parallel_workers <= 0)
277 : : {
278 : : /* Can't perform vacuum in parallel -- return NULL */
279 : 5420 : pfree(will_parallel_vacuum);
280 : 5420 : return NULL;
281 : : }
282 : :
95 michael@paquier.xyz 283 :GNC 18 : pvs = palloc0_object(ParallelVacuumState);
1543 akapila@postgresql.o 284 :CBC 18 : pvs->indrels = indrels;
285 : 18 : pvs->nindexes = nindexes;
286 : 18 : pvs->will_parallel_vacuum = will_parallel_vacuum;
287 : 18 : pvs->bstrategy = bstrategy;
1079 andres@anarazel.de 288 : 18 : pvs->heaprel = rel;
289 : :
1543 akapila@postgresql.o 290 : 18 : EnterParallelMode();
291 : 18 : pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
292 : : parallel_workers);
293 [ - + ]: 18 : Assert(pcxt->nworkers > 0);
294 : 18 : pvs->pcxt = pcxt;
295 : :
296 : : /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
297 : 18 : est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
298 : 18 : shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
299 : 18 : shm_toc_estimate_keys(&pcxt->estimator, 1);
300 : :
301 : : /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
302 : 18 : est_shared_len = sizeof(PVShared);
303 : 18 : shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
304 : 18 : shm_toc_estimate_keys(&pcxt->estimator, 1);
305 : :
306 : : /*
307 : : * Estimate space for BufferUsage and WalUsage --
308 : : * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
309 : : *
310 : : * If there are no extensions loaded that care, we could skip this. We
311 : : * have no way of knowing whether anyone's looking at pgBufferUsage or
312 : : * pgWalUsage, so do it unconditionally.
313 : : */
314 : 18 : shm_toc_estimate_chunk(&pcxt->estimator,
315 : : mul_size(sizeof(BufferUsage), pcxt->nworkers));
316 : 18 : shm_toc_estimate_keys(&pcxt->estimator, 1);
317 : 18 : shm_toc_estimate_chunk(&pcxt->estimator,
318 : : mul_size(sizeof(WalUsage), pcxt->nworkers));
319 : 18 : shm_toc_estimate_keys(&pcxt->estimator, 1);
320 : :
321 : : /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
322 [ + - ]: 18 : if (debug_query_string)
323 : : {
324 : 18 : querylen = strlen(debug_query_string);
325 : 18 : shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
326 : 18 : shm_toc_estimate_keys(&pcxt->estimator, 1);
327 : : }
328 : : else
1543 akapila@postgresql.o 329 :UBC 0 : querylen = 0; /* keep compiler quiet */
330 : :
1543 akapila@postgresql.o 331 :CBC 18 : InitializeParallelDSM(pcxt);
332 : :
333 : : /* Prepare index vacuum stats */
334 : 18 : indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
1305 pg@bowt.ie 335 [ + - + - : 396 : MemSet(indstats, 0, est_indstats_len);
+ - + - +
+ ]
1543 akapila@postgresql.o 336 [ + + ]: 81 : for (int i = 0; i < nindexes; i++)
337 : : {
338 : 63 : Relation indrel = indrels[i];
339 : 63 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
340 : :
341 : : /*
342 : : * Cleanup option should be either disabled, always performing in
343 : : * parallel or conditionally performing in parallel.
344 : : */
345 [ + + - + ]: 63 : Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
346 : : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
347 [ - + ]: 63 : Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
348 : :
349 [ + + ]: 63 : if (!will_parallel_vacuum[i])
350 : 3 : continue;
351 : :
352 [ + + ]: 60 : if (indrel->rd_indam->amusemaintenanceworkmem)
353 : 6 : nindexes_mwm++;
354 : :
355 : : /*
356 : : * Remember the number of indexes that support parallel operation for
357 : : * each phase.
358 : : */
359 [ + + ]: 60 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
360 : 54 : pvs->nindexes_parallel_bulkdel++;
361 [ + + ]: 60 : if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
362 : 12 : pvs->nindexes_parallel_cleanup++;
363 [ + + ]: 60 : if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
364 : 42 : pvs->nindexes_parallel_condcleanup++;
365 : : }
366 : 18 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
367 : 18 : pvs->indstats = indstats;
368 : :
369 : : /* Prepare shared information */
370 : 18 : shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
371 [ + - + - : 198 : MemSet(shared, 0, est_shared_len);
+ - + - +
+ ]
372 : 18 : shared->relid = RelationGetRelid(rel);
373 : 18 : shared->elevel = elevel;
531 michael@paquier.xyz 374 : 18 : shared->queryid = pgstat_get_my_query_id();
1543 akapila@postgresql.o 375 : 18 : shared->maintenance_work_mem_worker =
376 : : (nindexes_mwm > 0) ?
377 [ + + ]: 18 : maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
378 : : maintenance_work_mem;
408 tgl@sss.pgh.pa.us 379 : 18 : shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;
380 : :
381 : : /* Prepare DSA space for dead items */
712 msawada@postgresql.o 382 : 18 : dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
383 : : LWTRANCHE_PARALLEL_VACUUM_DSA);
384 : 18 : pvs->dead_items = dead_items;
385 : 18 : shared->dead_items_handle = TidStoreGetHandle(dead_items);
386 : 18 : shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
387 : :
388 : : /* Use the same buffer size for all workers */
1073 drowley@postgresql.o 389 : 18 : shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
390 : :
1543 akapila@postgresql.o 391 : 18 : pg_atomic_init_u32(&(shared->cost_balance), 0);
392 : 18 : pg_atomic_init_u32(&(shared->active_nworkers), 0);
393 : 18 : pg_atomic_init_u32(&(shared->idx), 0);
394 : :
395 : 18 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
396 : 18 : pvs->shared = shared;
397 : :
398 : : /*
399 : : * Allocate space for each worker's BufferUsage and WalUsage; no need to
400 : : * initialize
401 : : */
402 : 18 : buffer_usage = shm_toc_allocate(pcxt->toc,
403 : 18 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
404 : 18 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
405 : 18 : pvs->buffer_usage = buffer_usage;
406 : 18 : wal_usage = shm_toc_allocate(pcxt->toc,
407 : 18 : mul_size(sizeof(WalUsage), pcxt->nworkers));
408 : 18 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
409 : 18 : pvs->wal_usage = wal_usage;
410 : :
411 : : /* Store query string for workers */
412 [ + - ]: 18 : if (debug_query_string)
413 : : {
414 : : char *sharedquery;
415 : :
416 : 18 : sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
417 : 18 : memcpy(sharedquery, debug_query_string, querylen + 1);
418 : 18 : sharedquery[querylen] = '\0';
419 : 18 : shm_toc_insert(pcxt->toc,
420 : : PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
421 : : }
422 : :
423 : : /* Success -- return parallel vacuum state */
424 : 18 : return pvs;
425 : : }
426 : :
427 : : /*
428 : : * Destroy the parallel context, and end parallel mode.
429 : : *
430 : : * Since writes are not allowed during parallel mode, copy the
431 : : * updated index statistics from DSM into local memory and then later use that
432 : : * to update the index statistics. One might think that we can exit from
433 : : * parallel mode, update the index statistics and then destroy parallel
434 : : * context, but that won't be safe (see ExitParallelMode).
435 : : */
436 : : void
437 : 18 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
438 : : {
439 [ - + ]: 18 : Assert(!IsParallelWorker());
440 : :
441 : : /* Copy the updated statistics */
442 [ + + ]: 81 : for (int i = 0; i < pvs->nindexes; i++)
443 : : {
444 : 63 : PVIndStats *indstats = &(pvs->indstats[i]);
445 : :
446 [ + + ]: 63 : if (indstats->istat_updated)
447 : : {
95 michael@paquier.xyz 448 :GNC 38 : istats[i] = palloc0_object(IndexBulkDeleteResult);
1543 akapila@postgresql.o 449 :CBC 38 : memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
450 : : }
451 : : else
452 : 25 : istats[i] = NULL;
453 : : }
454 : :
712 msawada@postgresql.o 455 : 18 : TidStoreDestroy(pvs->dead_items);
456 : :
1543 akapila@postgresql.o 457 : 18 : DestroyParallelContext(pvs->pcxt);
458 : 18 : ExitParallelMode();
459 : :
460 : 18 : pfree(pvs->will_parallel_vacuum);
461 : 18 : pfree(pvs);
462 : 18 : }
463 : :
464 : : /*
465 : : * Returns the dead items space and dead items information.
466 : : */
467 : : TidStore *
712 msawada@postgresql.o 468 : 28 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
469 : : {
470 : 28 : *dead_items_info_p = &(pvs->shared->dead_items_info);
1543 akapila@postgresql.o 471 : 28 : return pvs->dead_items;
472 : : }
473 : :
474 : : /* Forget all items in dead_items */
475 : : void
712 msawada@postgresql.o 476 : 10 : parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
477 : : {
478 : 10 : VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
479 : :
480 : : /*
481 : : * Free the current tidstore and return allocated DSA segments to the
482 : : * operating system. Then we recreate the tidstore with the same max_bytes
483 : : * limitation we just used.
484 : : */
466 john.naylor@postgres 485 : 10 : TidStoreDestroy(pvs->dead_items);
712 msawada@postgresql.o 486 : 10 : pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
487 : : LWTRANCHE_PARALLEL_VACUUM_DSA);
488 : :
489 : : /* Update the DSA pointer for dead_items to the new one */
466 john.naylor@postgres 490 : 10 : pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(pvs->dead_items));
491 : 10 : pvs->shared->dead_items_handle = TidStoreGetHandle(pvs->dead_items);
492 : :
493 : : /* Reset the counter */
712 msawada@postgresql.o 494 : 10 : dead_items_info->num_items = 0;
495 : 10 : }
496 : :
497 : : /*
498 : : * Do parallel index bulk-deletion with parallel workers.
499 : : */
500 : : void
1543 akapila@postgresql.o 501 : 10 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
502 : : int num_index_scans)
503 : : {
504 [ - + ]: 10 : Assert(!IsParallelWorker());
505 : :
506 : : /*
507 : : * We can only provide an approximate value of num_heap_tuples, at least
508 : : * for now.
509 : : */
510 : 10 : pvs->shared->reltuples = num_table_tuples;
511 : 10 : pvs->shared->estimated_count = true;
512 : :
513 : 10 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
514 : 10 : }
515 : :
516 : : /*
517 : : * Do parallel index cleanup with parallel workers.
518 : : */
519 : : void
520 : 18 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
521 : : int num_index_scans, bool estimated_count)
522 : : {
523 [ - + ]: 18 : Assert(!IsParallelWorker());
524 : :
525 : : /*
526 : : * We can provide a better estimate of total number of surviving tuples
527 : : * (we assume indexes are more interested in that than in the number of
528 : : * nominally live tuples).
529 : : */
530 : 18 : pvs->shared->reltuples = num_table_tuples;
531 : 18 : pvs->shared->estimated_count = estimated_count;
532 : :
533 : 18 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
534 : 18 : }
535 : :
536 : : /*
537 : : * Compute the number of parallel worker processes to request. Both index
538 : : * vacuum and index cleanup can be executed with parallel workers.
539 : : * The index is eligible for parallel vacuum iff its size is greater than
540 : : * min_parallel_index_scan_size as invoking workers for very small indexes
541 : : * can hurt performance.
542 : : *
543 : : * nrequested is the number of parallel workers that user requested. If
544 : : * nrequested is 0, we compute the parallel degree based on nindexes, that is
545 : : * the number of indexes that support parallel vacuum. This function also
546 : : * sets will_parallel_vacuum to remember indexes that participate in parallel
547 : : * vacuum.
548 : : */
549 : : static int
550 : 5438 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
551 : : bool *will_parallel_vacuum)
552 : : {
553 : 5438 : int nindexes_parallel = 0;
554 : 5438 : int nindexes_parallel_bulkdel = 0;
555 : 5438 : int nindexes_parallel_cleanup = 0;
556 : : int parallel_workers;
557 : :
558 : : /*
559 : : * We don't allow performing parallel operation in standalone backend or
560 : : * when parallelism is disabled.
561 : : */
562 [ + + - + ]: 5438 : if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
563 : 2401 : return 0;
564 : :
565 : : /*
566 : : * Compute the number of indexes that can participate in parallel vacuum.
567 : : */
568 [ + + ]: 9905 : for (int i = 0; i < nindexes; i++)
569 : : {
570 : 6868 : Relation indrel = indrels[i];
571 : 6868 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
572 : :
573 : : /* Skip index that is not a suitable target for parallel index vacuum */
574 [ + - ]: 6868 : if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
575 [ + + ]: 6868 : RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
576 : 6798 : continue;
577 : :
578 : 70 : will_parallel_vacuum[i] = true;
579 : :
580 [ + + ]: 70 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
581 : 64 : nindexes_parallel_bulkdel++;
582 [ + + ]: 70 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
583 [ + + ]: 58 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
584 : 64 : nindexes_parallel_cleanup++;
585 : : }
586 : :
587 : 3037 : nindexes_parallel = Max(nindexes_parallel_bulkdel,
588 : : nindexes_parallel_cleanup);
589 : :
590 : : /* The leader process takes one index */
591 : 3037 : nindexes_parallel--;
592 : :
593 : : /* No index supports parallel vacuum */
594 [ + + ]: 3037 : if (nindexes_parallel <= 0)
595 : 3019 : return 0;
596 : :
597 : : /* Compute the parallel degree */
598 : 18 : parallel_workers = (nrequested > 0) ?
599 [ + + ]: 18 : Min(nrequested, nindexes_parallel) : nindexes_parallel;
600 : :
601 : : /* Cap by max_parallel_maintenance_workers */
602 : 18 : parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
603 : :
604 : 18 : return parallel_workers;
605 : : }
606 : :
607 : : /*
608 : : * Perform index vacuum or index cleanup with parallel workers. This function
609 : : * must be used by the parallel vacuum leader process.
610 : : */
611 : : static void
612 : 28 : parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
613 : : bool vacuum)
614 : : {
615 : : int nworkers;
616 : : PVIndVacStatus new_status;
617 : :
618 [ - + ]: 28 : Assert(!IsParallelWorker());
619 : :
620 [ + + ]: 28 : if (vacuum)
621 : : {
622 : 10 : new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
623 : :
624 : : /* Determine the number of parallel workers to launch */
625 : 10 : nworkers = pvs->nindexes_parallel_bulkdel;
626 : : }
627 : : else
628 : : {
629 : 18 : new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
630 : :
631 : : /* Determine the number of parallel workers to launch */
632 : 18 : nworkers = pvs->nindexes_parallel_cleanup;
633 : :
634 : : /* Add conditionally parallel-aware indexes if in the first time call */
635 [ + + ]: 18 : if (num_index_scans == 0)
636 : 12 : nworkers += pvs->nindexes_parallel_condcleanup;
637 : : }
638 : :
639 : : /* The leader process will participate */
640 : 28 : nworkers--;
641 : :
642 : : /*
643 : : * It is possible that parallel context is initialized with fewer workers
644 : : * than the number of indexes that need a separate worker in the current
645 : : * phase, so we need to consider it. See
646 : : * parallel_vacuum_compute_workers().
647 : : */
648 : 28 : nworkers = Min(nworkers, pvs->pcxt->nworkers);
649 : :
650 : : /*
651 : : * Set index vacuum status and mark whether parallel vacuum worker can
652 : : * process it.
653 : : */
654 [ + + ]: 117 : for (int i = 0; i < pvs->nindexes; i++)
655 : : {
656 : 89 : PVIndStats *indstats = &(pvs->indstats[i]);
657 : :
658 [ - + ]: 89 : Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
659 : 89 : indstats->status = new_status;
660 : 89 : indstats->parallel_workers_can_process =
1301 661 [ + + + + ]: 173 : (pvs->will_parallel_vacuum[i] &&
1543 662 : 84 : parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
663 : : num_index_scans,
664 : : vacuum));
665 : : }
666 : :
667 : : /* Reset the parallel index processing and progress counters */
668 : 28 : pg_atomic_write_u32(&(pvs->shared->idx), 0);
669 : :
670 : : /* Setup the shared cost-based vacuum delay and launch workers */
671 [ + + ]: 28 : if (nworkers > 0)
672 : : {
673 : : /* Reinitialize parallel context to relaunch parallel workers */
674 [ + + ]: 23 : if (num_index_scans > 0)
675 : 5 : ReinitializeParallelDSM(pvs->pcxt);
676 : :
677 : : /*
678 : : * Set up shared cost balance and the number of active workers for
679 : : * vacuum delay. We need to do this before launching workers as
680 : : * otherwise, they might not see the updated values for these
681 : : * parameters.
682 : : */
683 : 23 : pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
684 : 23 : pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
685 : :
686 : : /*
687 : : * The number of workers can vary between bulkdelete and cleanup
688 : : * phase.
689 : : */
690 : 23 : ReinitializeParallelWorkers(pvs->pcxt, nworkers);
691 : :
692 : 23 : LaunchParallelWorkers(pvs->pcxt);
693 : :
694 [ + - ]: 23 : if (pvs->pcxt->nworkers_launched > 0)
695 : : {
696 : : /*
697 : : * Reset the local cost values for leader backend as we have
698 : : * already accumulated the remaining balance of heap.
699 : : */
700 : 23 : VacuumCostBalance = 0;
701 : 23 : VacuumCostBalanceLocal = 0;
702 : :
703 : : /* Enable shared cost balance for leader backend */
704 : 23 : VacuumSharedCostBalance = &(pvs->shared->cost_balance);
705 : 23 : VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
706 : : }
707 : :
708 [ + + ]: 23 : if (vacuum)
709 [ - + ]: 10 : ereport(pvs->shared->elevel,
710 : : (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
711 : : "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
712 : : pvs->pcxt->nworkers_launched),
713 : : pvs->pcxt->nworkers_launched, nworkers)));
714 : : else
715 [ - + ]: 13 : ereport(pvs->shared->elevel,
716 : : (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
717 : : "launched %d parallel vacuum workers for index cleanup (planned: %d)",
718 : : pvs->pcxt->nworkers_launched),
719 : : pvs->pcxt->nworkers_launched, nworkers)));
720 : : }
721 : :
722 : : /* Vacuum the indexes that can be processed by only leader process */
723 : 28 : parallel_vacuum_process_unsafe_indexes(pvs);
724 : :
725 : : /*
726 : : * Join as a parallel worker. The leader vacuums alone processes all
727 : : * parallel-safe indexes in the case where no workers are launched.
728 : : */
729 : 28 : parallel_vacuum_process_safe_indexes(pvs);
730 : :
731 : : /*
732 : : * Next, accumulate buffer and WAL usage. (This must wait for the workers
733 : : * to finish, or we might get incomplete data.)
734 : : */
735 [ + + ]: 28 : if (nworkers > 0)
736 : : {
737 : : /* Wait for all vacuum workers to finish */
738 : 23 : WaitForParallelWorkersToFinish(pvs->pcxt);
739 : :
740 [ + + ]: 52 : for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
741 : 29 : InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
742 : : }
743 : :
744 : : /*
745 : : * Reset all index status back to initial (while checking that we have
746 : : * vacuumed all indexes).
747 : : */
748 [ + + ]: 117 : for (int i = 0; i < pvs->nindexes; i++)
749 : : {
750 : 89 : PVIndStats *indstats = &(pvs->indstats[i]);
751 : :
752 [ - + ]: 89 : if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
1543 akapila@postgresql.o 753 [ # # ]:UBC 0 : elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
754 : : RelationGetRelationName(pvs->indrels[i]));
755 : :
1543 akapila@postgresql.o 756 :CBC 89 : indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
757 : : }
758 : :
759 : : /*
760 : : * Carry the shared balance value to heap scan and disable shared costing
761 : : */
762 [ + + ]: 28 : if (VacuumSharedCostBalance)
763 : : {
764 : 23 : VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
765 : 23 : VacuumSharedCostBalance = NULL;
766 : 23 : VacuumActiveNWorkers = NULL;
767 : : }
768 : 28 : }
769 : :
770 : : /*
771 : : * Index vacuum/cleanup routine used by the leader process and parallel
772 : : * vacuum worker processes to vacuum the indexes in parallel.
773 : : */
774 : : static void
775 : 57 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
776 : : {
777 : : /*
778 : : * Increment the active worker count if we are able to launch any worker.
779 : : */
780 [ + + ]: 57 : if (VacuumActiveNWorkers)
781 : 52 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
782 : :
783 : : /* Loop until all indexes are vacuumed */
784 : : for (;;)
785 : 89 : {
786 : : int idx;
787 : : PVIndStats *indstats;
788 : :
789 : : /* Get an index number to process */
790 : 146 : idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
791 : :
792 : : /* Done for all indexes? */
793 [ + + ]: 146 : if (idx >= pvs->nindexes)
794 : 57 : break;
795 : :
796 : 89 : indstats = &(pvs->indstats[idx]);
797 : :
798 : : /*
799 : : * Skip vacuuming index that is unsafe for workers or has an
800 : : * unsuitable target for parallel index vacuum (this is vacuumed in
801 : : * parallel_vacuum_process_unsafe_indexes() by the leader).
802 : : */
803 [ + + ]: 89 : if (!indstats->parallel_workers_can_process)
804 : 25 : continue;
805 : :
806 : : /* Do vacuum or cleanup of the index */
807 : 64 : parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
808 : : }
809 : :
810 : : /*
811 : : * We have completed the index vacuum so decrement the active worker
812 : : * count.
813 : : */
814 [ + + ]: 57 : if (VacuumActiveNWorkers)
815 : 52 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
816 : 57 : }
817 : :
818 : : /*
819 : : * Perform parallel vacuuming of indexes in leader process.
820 : : *
821 : : * Handles index vacuuming (or index cleanup) for indexes that are not
822 : : * parallel safe. It's possible that this will vary for a given index, based
823 : : * on details like whether we're performing index cleanup right now.
824 : : *
825 : : * Also performs vacuuming of smaller indexes that fell under the size cutoff
826 : : * enforced by parallel_vacuum_compute_workers().
827 : : */
828 : : static void
829 : 28 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
830 : : {
831 [ - + ]: 28 : Assert(!IsParallelWorker());
832 : :
833 : : /*
834 : : * Increment the active worker count if we are able to launch any worker.
835 : : */
836 [ + + ]: 28 : if (VacuumActiveNWorkers)
837 : 23 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
838 : :
839 [ + + ]: 117 : for (int i = 0; i < pvs->nindexes; i++)
840 : : {
841 : 89 : PVIndStats *indstats = &(pvs->indstats[i]);
842 : :
843 : : /* Skip, indexes that are safe for workers */
844 [ + + ]: 89 : if (indstats->parallel_workers_can_process)
845 : 64 : continue;
846 : :
847 : : /* Do vacuum or cleanup of the index */
848 : 25 : parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
849 : : }
850 : :
851 : : /*
852 : : * We have completed the index vacuum so decrement the active worker
853 : : * count.
854 : : */
855 [ + + ]: 28 : if (VacuumActiveNWorkers)
856 : 23 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
857 : 28 : }
858 : :
859 : : /*
860 : : * Vacuum or cleanup index either by leader process or by one of the worker
861 : : * process. After vacuuming the index this function copies the index
862 : : * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
863 : : * segment.
864 : : */
865 : : static void
866 : 89 : parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
867 : : PVIndStats *indstats)
868 : : {
869 : 89 : IndexBulkDeleteResult *istat = NULL;
870 : : IndexBulkDeleteResult *istat_res;
871 : : IndexVacuumInfo ivinfo;
872 : :
873 : : /*
874 : : * Update the pointer to the corresponding bulk-deletion result if someone
875 : : * has already updated it
876 : : */
877 [ + + ]: 89 : if (indstats->istat_updated)
878 : 26 : istat = &(indstats->istat);
879 : :
880 : 89 : ivinfo.index = indrel;
1077 pg@bowt.ie 881 : 89 : ivinfo.heaprel = pvs->heaprel;
1543 akapila@postgresql.o 882 : 89 : ivinfo.analyze_only = false;
883 : 89 : ivinfo.report_progress = false;
1521 pg@bowt.ie 884 : 89 : ivinfo.message_level = DEBUG2;
1543 akapila@postgresql.o 885 : 89 : ivinfo.estimated_count = pvs->shared->estimated_count;
886 : 89 : ivinfo.num_heap_tuples = pvs->shared->reltuples;
887 : 89 : ivinfo.strategy = pvs->bstrategy;
888 : :
889 : : /* Update error traceback information */
890 : 89 : pvs->indname = pstrdup(RelationGetRelationName(indrel));
891 : 89 : pvs->status = indstats->status;
892 : :
893 [ + + - ]: 89 : switch (indstats->status)
894 : : {
895 : 26 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
712 msawada@postgresql.o 896 : 26 : istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
897 : 26 : &pvs->shared->dead_items_info);
1543 akapila@postgresql.o 898 : 26 : break;
899 : 63 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
900 : 63 : istat_res = vac_cleanup_one_index(&ivinfo, istat);
901 : 63 : break;
1543 akapila@postgresql.o 902 :UBC 0 : default:
903 [ # # ]: 0 : elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
904 : : indstats->status,
905 : : RelationGetRelationName(indrel));
906 : : }
907 : :
908 : : /*
909 : : * Copy the index bulk-deletion result returned from ambulkdelete and
910 : : * amvacuumcleanup to the DSM segment if it's the first cycle because they
911 : : * allocate locally and it's possible that an index will be vacuumed by a
912 : : * different vacuum process the next cycle. Copying the result normally
913 : : * happens only the first time an index is vacuumed. For any additional
914 : : * vacuum pass, we directly point to the result on the DSM segment and
915 : : * pass it to vacuum index APIs so that workers can update it directly.
916 : : *
917 : : * Since all vacuum workers write the bulk-deletion result at different
918 : : * slots we can write them without locking.
919 : : */
1543 akapila@postgresql.o 920 [ + + + + ]:CBC 89 : if (!indstats->istat_updated && istat_res != NULL)
921 : : {
922 : 38 : memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
923 : 38 : indstats->istat_updated = true;
924 : :
925 : : /* Free the locally-allocated bulk-deletion result */
926 : 38 : pfree(istat_res);
927 : : }
928 : :
929 : : /*
930 : : * Update the status to completed. No need to lock here since each worker
931 : : * touches different indexes.
932 : : */
933 : 89 : indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
934 : :
935 : : /* Reset error traceback information */
936 : 89 : pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
937 : 89 : pfree(pvs->indname);
938 : 89 : pvs->indname = NULL;
939 : :
940 : : /*
941 : : * Call the parallel variant of pgstat_progress_incr_param so workers can
942 : : * report progress of index vacuum to the leader.
943 : : */
978 msawada@postgresql.o 944 : 89 : pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
1543 akapila@postgresql.o 945 : 89 : }
946 : :
947 : : /*
948 : : * Returns false, if the given index can't participate in the next execution of
949 : : * parallel index vacuum or parallel index cleanup.
950 : : */
951 : : static bool
952 : 84 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
953 : : bool vacuum)
954 : : {
955 : : uint8 vacoptions;
956 : :
957 : 84 : vacoptions = indrel->rd_indam->amparallelvacuumoptions;
958 : :
959 : : /* In parallel vacuum case, check if it supports parallel bulk-deletion */
960 [ + + ]: 84 : if (vacuum)
961 : 24 : return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
962 : :
963 : : /* Not safe, if the index does not support parallel cleanup */
964 [ + + ]: 60 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
965 [ + + ]: 48 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
966 : 6 : return false;
967 : :
968 : : /*
969 : : * Not safe, if the index supports parallel cleanup conditionally, but we
970 : : * have already processed the index (for bulkdelete). We do this to avoid
971 : : * the need to invoke workers when parallel index cleanup doesn't need to
972 : : * scan the index. See the comments for option
973 : : * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
974 : : * parallel cleanup conditionally.
975 : : */
976 [ + + ]: 54 : if (num_index_scans > 0 &&
977 [ + + ]: 15 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
978 : 13 : return false;
979 : :
980 : 41 : return true;
981 : : }
982 : :
983 : : /*
984 : : * Perform work within a launched parallel process.
985 : : *
986 : : * Since parallel vacuum workers perform only index vacuum or index cleanup,
987 : : * we don't need to report progress information.
988 : : */
989 : : void
990 : 29 : parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
991 : : {
992 : : ParallelVacuumState pvs;
993 : : Relation rel;
994 : : Relation *indrels;
995 : : PVIndStats *indstats;
996 : : PVShared *shared;
997 : : TidStore *dead_items;
998 : : BufferUsage *buffer_usage;
999 : : WalUsage *wal_usage;
1000 : : int nindexes;
1001 : : char *sharedquery;
1002 : : ErrorContextCallback errcallback;
1003 : :
1004 : : /*
1005 : : * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
1006 : : * don't support parallel vacuum for autovacuum as of now.
1007 : : */
1008 [ - + ]: 29 : Assert(MyProc->statusFlags == PROC_IN_VACUUM);
1009 : :
1010 [ - + ]: 29 : elog(DEBUG1, "starting parallel vacuum worker");
1011 : :
1012 : 29 : shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
1013 : :
1014 : : /* Set debug_query_string for individual workers */
1015 : 29 : sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
1016 : 29 : debug_query_string = sharedquery;
1017 : 29 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1018 : :
1019 : : /* Track query ID */
531 michael@paquier.xyz 1020 : 29 : pgstat_report_query_id(shared->queryid, false);
1021 : :
1022 : : /*
1023 : : * Open table. The lock mode is the same as the leader process. It's
1024 : : * okay because the lock mode does not conflict among the parallel
1025 : : * workers.
1026 : : */
1543 akapila@postgresql.o 1027 : 29 : rel = table_open(shared->relid, ShareUpdateExclusiveLock);
1028 : :
1029 : : /*
1030 : : * Open all indexes. indrels are sorted in order by OID, which should be
1031 : : * matched to the leader's one.
1032 : : */
1033 : 29 : vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
1034 [ - + ]: 29 : Assert(nindexes > 0);
1035 : :
1036 : : /*
1037 : : * Apply the desired value of maintenance_work_mem within this process.
1038 : : * Really we should use SetConfigOption() to change a GUC, but since we're
1039 : : * already in parallel mode guc.c would complain about that. Fortunately,
1040 : : * by the same token guc.c will not let any user-defined code change it.
1041 : : * So just avert your eyes while we do this:
1042 : : */
1043 [ + - ]: 29 : if (shared->maintenance_work_mem_worker > 0)
1044 : 29 : maintenance_work_mem = shared->maintenance_work_mem_worker;
1045 : :
1046 : : /* Set index statistics */
1047 : 29 : indstats = (PVIndStats *) shm_toc_lookup(toc,
1048 : : PARALLEL_VACUUM_KEY_INDEX_STATS,
1049 : : false);
1050 : :
1051 : : /* Find dead_items in shared memory */
712 msawada@postgresql.o 1052 : 29 : dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
1053 : : shared->dead_items_handle);
1054 : :
1055 : : /* Set cost-based vacuum delay */
1073 dgustafsson@postgres 1056 : 29 : VacuumUpdateCosts();
1543 akapila@postgresql.o 1057 : 29 : VacuumCostBalance = 0;
1058 : 29 : VacuumCostBalanceLocal = 0;
1059 : 29 : VacuumSharedCostBalance = &(shared->cost_balance);
1060 : 29 : VacuumActiveNWorkers = &(shared->active_nworkers);
1061 : :
1062 : : /* Set parallel vacuum state */
1063 : 29 : pvs.indrels = indrels;
1064 : 29 : pvs.nindexes = nindexes;
1065 : 29 : pvs.indstats = indstats;
1066 : 29 : pvs.shared = shared;
1067 : 29 : pvs.dead_items = dead_items;
1068 : 29 : pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
1069 : 29 : pvs.relname = pstrdup(RelationGetRelationName(rel));
1079 andres@anarazel.de 1070 : 29 : pvs.heaprel = rel;
1071 : :
1072 : : /* These fields will be filled during index vacuum or cleanup */
1543 akapila@postgresql.o 1073 : 29 : pvs.indname = NULL;
1074 : 29 : pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
1075 : :
1076 : : /* Each parallel VACUUM worker gets its own access strategy. */
1073 drowley@postgresql.o 1077 : 58 : pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
1078 : 29 : shared->ring_nbuffers * (BLCKSZ / 1024));
1079 : :
1080 : : /* Setup error traceback support for ereport() */
1543 akapila@postgresql.o 1081 : 29 : errcallback.callback = parallel_vacuum_error_callback;
1082 : 29 : errcallback.arg = &pvs;
1083 : 29 : errcallback.previous = error_context_stack;
1084 : 29 : error_context_stack = &errcallback;
1085 : :
1086 : : /* Prepare to track buffer usage during parallel execution */
1087 : 29 : InstrStartParallelQuery();
1088 : :
1089 : : /* Process indexes to perform vacuum/cleanup */
1090 : 29 : parallel_vacuum_process_safe_indexes(&pvs);
1091 : :
1092 : : /* Report buffer/WAL usage during parallel execution */
1093 : 29 : buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
1094 : 29 : wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
1095 : 29 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1096 : 29 : &wal_usage[ParallelWorkerNumber]);
1097 : :
1098 : : /* Report any remaining cost-based vacuum delay time */
397 nathan@postgresql.or 1099 [ - + ]: 29 : if (track_cost_delay_timing)
397 nathan@postgresql.or 1100 :UBC 0 : pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_DELAY_TIME,
1101 : : parallel_vacuum_worker_delay_ns);
1102 : :
712 msawada@postgresql.o 1103 :CBC 29 : TidStoreDetach(dead_items);
1104 : :
1105 : : /* Pop the error context stack */
1543 akapila@postgresql.o 1106 : 29 : error_context_stack = errcallback.previous;
1107 : :
1108 : 29 : vac_close_indexes(nindexes, indrels, RowExclusiveLock);
1109 : 29 : table_close(rel, ShareUpdateExclusiveLock);
1110 : 29 : FreeAccessStrategy(pvs.bstrategy);
1111 : 29 : }
1112 : :
1113 : : /*
1114 : : * Error context callback for errors occurring during parallel index vacuum.
1115 : : * The error context messages should match the messages set in the lazy vacuum
1116 : : * error context. If you change this function, change vacuum_error_callback()
1117 : : * as well.
1118 : : */
1119 : : static void
1543 akapila@postgresql.o 1120 :UBC 0 : parallel_vacuum_error_callback(void *arg)
1121 : : {
1122 : 0 : ParallelVacuumState *errinfo = arg;
1123 : :
1124 [ # # # ]: 0 : switch (errinfo->status)
1125 : : {
1126 : 0 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
1127 : 0 : errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1128 : : errinfo->indname,
1129 : : errinfo->relnamespace,
1130 : : errinfo->relname);
1131 : 0 : break;
1132 : 0 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
1133 : 0 : errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1134 : : errinfo->indname,
1135 : : errinfo->relnamespace,
1136 : : errinfo->relname);
1137 : 0 : break;
1138 : 0 : case PARALLEL_INDVAC_STATUS_INITIAL:
1139 : : case PARALLEL_INDVAC_STATUS_COMPLETED:
1140 : : default:
1141 : 0 : return;
1142 : : }
1143 : : }
|