Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * vacuumparallel.c
4 : : * Support routines for parallel vacuum execution.
5 : : *
6 : : * This file contains routines that are intended to support setting up, using,
7 : : * and tearing down a ParallelVacuumState.
8 : : *
9 : : * In a parallel vacuum, we perform both index bulk deletion and index cleanup
10 : : * with parallel worker processes. Individual indexes are processed by one
11 : : * vacuum process. ParallelVacuumState contains shared information as well as
12 : : * the memory space for storing dead items allocated in the DSA area. We
13 : : * launch parallel worker processes at the start of parallel index
14 : : * bulk-deletion and index cleanup and once all indexes are processed, the
15 : : * parallel worker processes exit. Each time we process indexes in parallel,
16 : : * the parallel context is re-initialized so that the same DSM can be used for
17 : : * multiple passes of index bulk-deletion and index cleanup.
18 : : *
19 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
20 : : * Portions Copyright (c) 1994, Regents of the University of California
21 : : *
22 : : * IDENTIFICATION
23 : : * src/backend/commands/vacuumparallel.c
24 : : *
25 : : *-------------------------------------------------------------------------
26 : : */
27 : : #include "postgres.h"
28 : :
29 : : #include "access/amapi.h"
30 : : #include "access/table.h"
31 : : #include "access/xact.h"
32 : : #include "commands/progress.h"
33 : : #include "commands/vacuum.h"
34 : : #include "executor/instrument.h"
35 : : #include "optimizer/paths.h"
36 : : #include "pgstat.h"
37 : : #include "storage/bufmgr.h"
38 : : #include "tcop/tcopprot.h"
39 : : #include "utils/lsyscache.h"
40 : : #include "utils/rel.h"
41 : :
42 : : /*
43 : : * DSM keys for parallel vacuum. Unlike other parallel execution code, since
44 : : * we don't need to worry about DSM keys conflicting with plan_node_id we can
45 : : * use small integers.
46 : : */
47 : : #define PARALLEL_VACUUM_KEY_SHARED 1
48 : : #define PARALLEL_VACUUM_KEY_QUERY_TEXT 2
49 : : #define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3
50 : : #define PARALLEL_VACUUM_KEY_WAL_USAGE 4
51 : : #define PARALLEL_VACUUM_KEY_INDEX_STATS 5
52 : :
53 : : /*
54 : : * Shared information among parallel workers. So this is allocated in the DSM
55 : : * segment.
56 : : */
57 : : typedef struct PVShared
58 : : {
59 : : /*
60 : : * Target table relid, log level (for messages about parallel workers
61 : : * launched during VACUUM VERBOSE) and query ID. These fields are not
62 : : * modified during the parallel vacuum.
63 : : */
64 : : Oid relid;
65 : : int elevel;
66 : : int64 queryid;
67 : :
68 : : /*
69 : : * Fields for both index vacuum and cleanup.
70 : : *
71 : : * reltuples is the total number of input heap tuples. We set either old
72 : : * live tuples in the index vacuum case or the new live tuples in the
73 : : * index cleanup case.
74 : : *
75 : : * estimated_count is true if reltuples is an estimated value. (Note that
76 : : * reltuples could be -1 in this case, indicating we have no idea.)
77 : : */
78 : : double reltuples;
79 : : bool estimated_count;
80 : :
81 : : /*
82 : : * In single process vacuum we could consume more memory during index
83 : : * vacuuming or cleanup apart from the memory for heap scanning. In
84 : : * parallel vacuum, since individual vacuum workers can consume memory
85 : : * equal to maintenance_work_mem, the new maintenance_work_mem for each
86 : : * worker is set such that the parallel operation doesn't consume more
87 : : * memory than single process vacuum.
88 : : */
89 : : int maintenance_work_mem_worker;
90 : :
91 : : /*
92 : : * The number of buffers each worker's Buffer Access Strategy ring should
93 : : * contain.
94 : : */
95 : : int ring_nbuffers;
96 : :
97 : : /*
98 : : * Shared vacuum cost balance. During parallel vacuum,
99 : : * VacuumSharedCostBalance points to this value and it accumulates the
100 : : * balance of each parallel vacuum worker.
101 : : */
102 : : pg_atomic_uint32 cost_balance;
103 : :
104 : : /*
105 : : * Number of active parallel workers. This is used for computing the
106 : : * minimum threshold of the vacuum cost balance before a worker sleeps for
107 : : * cost-based delay.
108 : : */
109 : : pg_atomic_uint32 active_nworkers;
110 : :
111 : : /* Counter for vacuuming and cleanup */
112 : : pg_atomic_uint32 idx;
113 : :
114 : : /* DSA handle where the TidStore lives */
115 : : dsa_handle dead_items_dsa_handle;
116 : :
117 : : /* DSA pointer to the shared TidStore */
118 : : dsa_pointer dead_items_handle;
119 : :
120 : : /* Statistics of shared dead items */
121 : : VacDeadItemsInfo dead_items_info;
122 : : } PVShared;
123 : :
124 : : /* Status used during parallel index vacuum or cleanup */
125 : : typedef enum PVIndVacStatus
126 : : {
127 : : PARALLEL_INDVAC_STATUS_INITIAL = 0,
128 : : PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
129 : : PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
130 : : PARALLEL_INDVAC_STATUS_COMPLETED,
131 : : } PVIndVacStatus;
132 : :
133 : : /*
134 : : * Struct for index vacuum statistics of an index that is used for parallel vacuum.
135 : : * This includes the status of parallel index vacuum as well as index statistics.
136 : : */
137 : : typedef struct PVIndStats
138 : : {
139 : : /*
140 : : * The following two fields are set by leader process before executing
141 : : * parallel index vacuum or parallel index cleanup. These fields are not
142 : : * fixed for the entire VACUUM operation. They are only fixed for an
143 : : * individual parallel index vacuum and cleanup.
144 : : *
145 : : * parallel_workers_can_process is true if both leader and worker can
146 : : * process the index, otherwise only leader can process it.
147 : : */
148 : : PVIndVacStatus status;
149 : : bool parallel_workers_can_process;
150 : :
151 : : /*
152 : : * Individual worker or leader stores the result of index vacuum or
153 : : * cleanup.
154 : : */
155 : : bool istat_updated; /* are the stats updated? */
156 : : IndexBulkDeleteResult istat;
157 : : } PVIndStats;
158 : :
159 : : /*
160 : : * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
161 : : */
162 : : struct ParallelVacuumState
163 : : {
164 : : /* NULL for worker processes */
165 : : ParallelContext *pcxt;
166 : :
167 : : /* Parent Heap Relation */
168 : : Relation heaprel;
169 : :
170 : : /* Target indexes */
171 : : Relation *indrels;
172 : : int nindexes;
173 : :
174 : : /* Shared information among parallel vacuum workers */
175 : : PVShared *shared;
176 : :
177 : : /*
178 : : * Shared index statistics among parallel vacuum workers. The array
179 : : * element is allocated for every index, even those indexes where parallel
180 : : * index vacuuming is unsafe or not worthwhile (e.g.,
181 : : * will_parallel_vacuum[] is false). During parallel vacuum,
182 : : * IndexBulkDeleteResult of each index is kept in DSM and is copied into
183 : : * local memory at the end of parallel vacuum.
184 : : */
185 : : PVIndStats *indstats;
186 : :
187 : : /* Shared dead items space among parallel vacuum workers */
188 : : TidStore *dead_items;
189 : :
190 : : /* Points to buffer usage area in DSM */
191 : : BufferUsage *buffer_usage;
192 : :
193 : : /* Points to WAL usage area in DSM */
194 : : WalUsage *wal_usage;
195 : :
196 : : /*
197 : : * False if the index is totally unsuitable target for all parallel
198 : : * processing. For example, the index could be <
199 : : * min_parallel_index_scan_size cutoff.
200 : : */
201 : : bool *will_parallel_vacuum;
202 : :
203 : : /*
204 : : * The number of indexes that support parallel index bulk-deletion and
205 : : * parallel index cleanup respectively.
206 : : */
207 : : int nindexes_parallel_bulkdel;
208 : : int nindexes_parallel_cleanup;
209 : : int nindexes_parallel_condcleanup;
210 : :
211 : : /* Buffer access strategy used by leader process */
212 : : BufferAccessStrategy bstrategy;
213 : :
214 : : /*
215 : : * Error reporting state. The error callback is set only for workers
216 : : * processes during parallel index vacuum.
217 : : */
218 : : char *relnamespace;
219 : : char *relname;
220 : : char *indname;
221 : : PVIndVacStatus status;
222 : : };
223 : :
224 : : static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
225 : : bool *will_parallel_vacuum);
226 : : static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
227 : : bool vacuum);
228 : : static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
229 : : static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
230 : : static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
231 : : PVIndStats *indstats);
232 : : static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
233 : : bool vacuum);
234 : : static void parallel_vacuum_error_callback(void *arg);
235 : :
236 : : /*
237 : : * Try to enter parallel mode and create a parallel context. Then initialize
238 : : * shared memory state.
239 : : *
240 : : * On success, return parallel vacuum state. Otherwise return NULL.
241 : : */
242 : : ParallelVacuumState *
1353 akapila@postgresql.o 243 :CBC 5131 : parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
244 : : int nrequested_workers, int vac_work_mem,
245 : : int elevel, BufferAccessStrategy bstrategy)
246 : : {
247 : : ParallelVacuumState *pvs;
248 : : ParallelContext *pcxt;
249 : : PVShared *shared;
250 : : TidStore *dead_items;
251 : : PVIndStats *indstats;
252 : : BufferUsage *buffer_usage;
253 : : WalUsage *wal_usage;
254 : : bool *will_parallel_vacuum;
255 : : Size est_indstats_len;
256 : : Size est_shared_len;
257 : 5131 : int nindexes_mwm = 0;
258 : 5131 : int parallel_workers = 0;
259 : : int querylen;
260 : :
261 : : /*
262 : : * A parallel vacuum must be requested and there must be indexes on the
263 : : * relation
264 : : */
265 [ - + ]: 5131 : Assert(nrequested_workers >= 0);
266 [ - + ]: 5131 : Assert(nindexes > 0);
267 : :
268 : : /*
269 : : * Compute the number of parallel vacuum workers to launch
270 : : */
271 : 5131 : will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
272 : 5131 : parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
273 : : nrequested_workers,
274 : : will_parallel_vacuum);
275 [ + + ]: 5131 : if (parallel_workers <= 0)
276 : : {
277 : : /* Can't perform vacuum in parallel -- return NULL */
278 : 5114 : pfree(will_parallel_vacuum);
279 : 5114 : return NULL;
280 : : }
281 : :
282 : 17 : pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
283 : 17 : pvs->indrels = indrels;
284 : 17 : pvs->nindexes = nindexes;
285 : 17 : pvs->will_parallel_vacuum = will_parallel_vacuum;
286 : 17 : pvs->bstrategy = bstrategy;
889 andres@anarazel.de 287 : 17 : pvs->heaprel = rel;
288 : :
1353 akapila@postgresql.o 289 : 17 : EnterParallelMode();
290 : 17 : pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
291 : : parallel_workers);
292 [ - + ]: 17 : Assert(pcxt->nworkers > 0);
293 : 17 : pvs->pcxt = pcxt;
294 : :
295 : : /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
296 : 17 : est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
297 : 17 : shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
298 : 17 : shm_toc_estimate_keys(&pcxt->estimator, 1);
299 : :
300 : : /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
301 : 17 : est_shared_len = sizeof(PVShared);
302 : 17 : shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
303 : 17 : shm_toc_estimate_keys(&pcxt->estimator, 1);
304 : :
305 : : /*
306 : : * Estimate space for BufferUsage and WalUsage --
307 : : * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
308 : : *
309 : : * If there are no extensions loaded that care, we could skip this. We
310 : : * have no way of knowing whether anyone's looking at pgBufferUsage or
311 : : * pgWalUsage, so do it unconditionally.
312 : : */
313 : 17 : shm_toc_estimate_chunk(&pcxt->estimator,
314 : : mul_size(sizeof(BufferUsage), pcxt->nworkers));
315 : 17 : shm_toc_estimate_keys(&pcxt->estimator, 1);
316 : 17 : shm_toc_estimate_chunk(&pcxt->estimator,
317 : : mul_size(sizeof(WalUsage), pcxt->nworkers));
318 : 17 : shm_toc_estimate_keys(&pcxt->estimator, 1);
319 : :
320 : : /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
321 [ + - ]: 17 : if (debug_query_string)
322 : : {
323 : 17 : querylen = strlen(debug_query_string);
324 : 17 : shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
325 : 17 : shm_toc_estimate_keys(&pcxt->estimator, 1);
326 : : }
327 : : else
1353 akapila@postgresql.o 328 :UBC 0 : querylen = 0; /* keep compiler quiet */
329 : :
1353 akapila@postgresql.o 330 :CBC 17 : InitializeParallelDSM(pcxt);
331 : :
332 : : /* Prepare index vacuum stats */
333 : 17 : indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
1115 pg@bowt.ie 334 [ + - + - : 383 : MemSet(indstats, 0, est_indstats_len);
+ - + - +
+ ]
1353 akapila@postgresql.o 335 [ + + ]: 78 : for (int i = 0; i < nindexes; i++)
336 : : {
337 : 61 : Relation indrel = indrels[i];
338 : 61 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
339 : :
340 : : /*
341 : : * Cleanup option should be either disabled, always performing in
342 : : * parallel or conditionally performing in parallel.
343 : : */
344 [ + + - + ]: 61 : Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
345 : : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
346 [ - + ]: 61 : Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
347 : :
348 [ + + ]: 61 : if (!will_parallel_vacuum[i])
349 : 3 : continue;
350 : :
351 [ + + ]: 58 : if (indrel->rd_indam->amusemaintenanceworkmem)
352 : 6 : nindexes_mwm++;
353 : :
354 : : /*
355 : : * Remember the number of indexes that support parallel operation for
356 : : * each phase.
357 : : */
358 [ + + ]: 58 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
359 : 52 : pvs->nindexes_parallel_bulkdel++;
360 [ + + ]: 58 : if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
361 : 12 : pvs->nindexes_parallel_cleanup++;
362 [ + + ]: 58 : if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
363 : 40 : pvs->nindexes_parallel_condcleanup++;
364 : : }
365 : 17 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
366 : 17 : pvs->indstats = indstats;
367 : :
368 : : /* Prepare shared information */
369 : 17 : shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
370 [ + - + - : 187 : MemSet(shared, 0, est_shared_len);
+ - + - +
+ ]
371 : 17 : shared->relid = RelationGetRelid(rel);
372 : 17 : shared->elevel = elevel;
341 michael@paquier.xyz 373 : 17 : shared->queryid = pgstat_get_my_query_id();
1353 akapila@postgresql.o 374 : 17 : shared->maintenance_work_mem_worker =
375 : : (nindexes_mwm > 0) ?
376 [ + + ]: 17 : maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
377 : : maintenance_work_mem;
218 tgl@sss.pgh.pa.us 378 : 17 : shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;
379 : :
380 : : /* Prepare DSA space for dead items */
522 msawada@postgresql.o 381 : 17 : dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
382 : : LWTRANCHE_PARALLEL_VACUUM_DSA);
383 : 17 : pvs->dead_items = dead_items;
384 : 17 : shared->dead_items_handle = TidStoreGetHandle(dead_items);
385 : 17 : shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
386 : :
387 : : /* Use the same buffer size for all workers */
883 drowley@postgresql.o 388 : 17 : shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
389 : :
1353 akapila@postgresql.o 390 : 17 : pg_atomic_init_u32(&(shared->cost_balance), 0);
391 : 17 : pg_atomic_init_u32(&(shared->active_nworkers), 0);
392 : 17 : pg_atomic_init_u32(&(shared->idx), 0);
393 : :
394 : 17 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
395 : 17 : pvs->shared = shared;
396 : :
397 : : /*
398 : : * Allocate space for each worker's BufferUsage and WalUsage; no need to
399 : : * initialize
400 : : */
401 : 17 : buffer_usage = shm_toc_allocate(pcxt->toc,
402 : 17 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
403 : 17 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
404 : 17 : pvs->buffer_usage = buffer_usage;
405 : 17 : wal_usage = shm_toc_allocate(pcxt->toc,
406 : 17 : mul_size(sizeof(WalUsage), pcxt->nworkers));
407 : 17 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
408 : 17 : pvs->wal_usage = wal_usage;
409 : :
410 : : /* Store query string for workers */
411 [ + - ]: 17 : if (debug_query_string)
412 : : {
413 : : char *sharedquery;
414 : :
415 : 17 : sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
416 : 17 : memcpy(sharedquery, debug_query_string, querylen + 1);
417 : 17 : sharedquery[querylen] = '\0';
418 : 17 : shm_toc_insert(pcxt->toc,
419 : : PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
420 : : }
421 : :
422 : : /* Success -- return parallel vacuum state */
423 : 17 : return pvs;
424 : : }
425 : :
426 : : /*
427 : : * Destroy the parallel context, and end parallel mode.
428 : : *
429 : : * Since writes are not allowed during parallel mode, copy the
430 : : * updated index statistics from DSM into local memory and then later use that
431 : : * to update the index statistics. One might think that we can exit from
432 : : * parallel mode, update the index statistics and then destroy parallel
433 : : * context, but that won't be safe (see ExitParallelMode).
434 : : */
435 : : void
436 : 17 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
437 : : {
438 [ - + ]: 17 : Assert(!IsParallelWorker());
439 : :
440 : : /* Copy the updated statistics */
441 [ + + ]: 78 : for (int i = 0; i < pvs->nindexes; i++)
442 : : {
443 : 61 : PVIndStats *indstats = &(pvs->indstats[i]);
444 : :
445 [ + + ]: 61 : if (indstats->istat_updated)
446 : : {
447 : 41 : istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
448 : 41 : memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
449 : : }
450 : : else
451 : 20 : istats[i] = NULL;
452 : : }
453 : :
522 msawada@postgresql.o 454 : 17 : TidStoreDestroy(pvs->dead_items);
455 : :
1353 akapila@postgresql.o 456 : 17 : DestroyParallelContext(pvs->pcxt);
457 : 17 : ExitParallelMode();
458 : :
459 : 17 : pfree(pvs->will_parallel_vacuum);
460 : 17 : pfree(pvs);
461 : 17 : }
462 : :
463 : : /*
464 : : * Returns the dead items space and dead items information.
465 : : */
466 : : TidStore *
522 msawada@postgresql.o 467 : 17 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
468 : : {
469 : 17 : *dead_items_info_p = &(pvs->shared->dead_items_info);
1353 akapila@postgresql.o 470 : 17 : return pvs->dead_items;
471 : : }
472 : :
473 : : /* Forget all items in dead_items */
474 : : void
522 msawada@postgresql.o 475 : 15 : parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
476 : : {
477 : 15 : VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
478 : :
479 : : /*
480 : : * Free the current tidstore and return allocated DSA segments to the
481 : : * operating system. Then we recreate the tidstore with the same max_bytes
482 : : * limitation we just used.
483 : : */
276 john.naylor@postgres 484 : 15 : TidStoreDestroy(pvs->dead_items);
522 msawada@postgresql.o 485 : 15 : pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
486 : : LWTRANCHE_PARALLEL_VACUUM_DSA);
487 : :
488 : : /* Update the DSA pointer for dead_items to the new one */
276 john.naylor@postgres 489 : 15 : pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(pvs->dead_items));
490 : 15 : pvs->shared->dead_items_handle = TidStoreGetHandle(pvs->dead_items);
491 : :
492 : : /* Reset the counter */
522 msawada@postgresql.o 493 : 15 : dead_items_info->num_items = 0;
494 : 15 : }
495 : :
496 : : /*
497 : : * Do parallel index bulk-deletion with parallel workers.
498 : : */
499 : : void
1353 akapila@postgresql.o 500 : 15 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
501 : : int num_index_scans)
502 : : {
503 [ - + ]: 15 : Assert(!IsParallelWorker());
504 : :
505 : : /*
506 : : * We can only provide an approximate value of num_heap_tuples, at least
507 : : * for now.
508 : : */
509 : 15 : pvs->shared->reltuples = num_table_tuples;
510 : 15 : pvs->shared->estimated_count = true;
511 : :
512 : 15 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
513 : 15 : }
514 : :
515 : : /*
516 : : * Do parallel index cleanup with parallel workers.
517 : : */
518 : : void
519 : 17 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
520 : : int num_index_scans, bool estimated_count)
521 : : {
522 [ - + ]: 17 : Assert(!IsParallelWorker());
523 : :
524 : : /*
525 : : * We can provide a better estimate of total number of surviving tuples
526 : : * (we assume indexes are more interested in that than in the number of
527 : : * nominally live tuples).
528 : : */
529 : 17 : pvs->shared->reltuples = num_table_tuples;
530 : 17 : pvs->shared->estimated_count = estimated_count;
531 : :
532 : 17 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
533 : 17 : }
534 : :
535 : : /*
536 : : * Compute the number of parallel worker processes to request. Both index
537 : : * vacuum and index cleanup can be executed with parallel workers.
538 : : * The index is eligible for parallel vacuum iff its size is greater than
539 : : * min_parallel_index_scan_size as invoking workers for very small indexes
540 : : * can hurt performance.
541 : : *
542 : : * nrequested is the number of parallel workers that user requested. If
543 : : * nrequested is 0, we compute the parallel degree based on nindexes, that is
544 : : * the number of indexes that support parallel vacuum. This function also
545 : : * sets will_parallel_vacuum to remember indexes that participate in parallel
546 : : * vacuum.
547 : : */
548 : : static int
549 : 5131 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
550 : : bool *will_parallel_vacuum)
551 : : {
552 : 5131 : int nindexes_parallel = 0;
553 : 5131 : int nindexes_parallel_bulkdel = 0;
554 : 5131 : int nindexes_parallel_cleanup = 0;
555 : : int parallel_workers;
556 : :
557 : : /*
558 : : * We don't allow performing parallel operation in standalone backend or
559 : : * when parallelism is disabled.
560 : : */
561 [ + + - + ]: 5131 : if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
562 : 2352 : return 0;
563 : :
564 : : /*
565 : : * Compute the number of indexes that can participate in parallel vacuum.
566 : : */
567 [ + + ]: 9058 : for (int i = 0; i < nindexes; i++)
568 : : {
569 : 6279 : Relation indrel = indrels[i];
570 : 6279 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
571 : :
572 : : /* Skip index that is not a suitable target for parallel index vacuum */
573 [ + - ]: 6279 : if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
574 [ + + ]: 6279 : RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
575 : 6212 : continue;
576 : :
577 : 67 : will_parallel_vacuum[i] = true;
578 : :
579 [ + + ]: 67 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
580 : 61 : nindexes_parallel_bulkdel++;
581 [ + + ]: 67 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
582 [ + + ]: 55 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
583 : 61 : nindexes_parallel_cleanup++;
584 : : }
585 : :
586 : 2779 : nindexes_parallel = Max(nindexes_parallel_bulkdel,
587 : : nindexes_parallel_cleanup);
588 : :
589 : : /* The leader process takes one index */
590 : 2779 : nindexes_parallel--;
591 : :
592 : : /* No index supports parallel vacuum */
593 [ + + ]: 2779 : if (nindexes_parallel <= 0)
594 : 2762 : return 0;
595 : :
596 : : /* Compute the parallel degree */
597 : 17 : parallel_workers = (nrequested > 0) ?
598 [ + + ]: 17 : Min(nrequested, nindexes_parallel) : nindexes_parallel;
599 : :
600 : : /* Cap by max_parallel_maintenance_workers */
601 : 17 : parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
602 : :
603 : 17 : return parallel_workers;
604 : : }
605 : :
606 : : /*
607 : : * Perform index vacuum or index cleanup with parallel workers. This function
608 : : * must be used by the parallel vacuum leader process.
609 : : */
610 : : static void
611 : 32 : parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
612 : : bool vacuum)
613 : : {
614 : : int nworkers;
615 : : PVIndVacStatus new_status;
616 : :
617 [ - + ]: 32 : Assert(!IsParallelWorker());
618 : :
619 [ + + ]: 32 : if (vacuum)
620 : : {
621 : 15 : new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
622 : :
623 : : /* Determine the number of parallel workers to launch */
624 : 15 : nworkers = pvs->nindexes_parallel_bulkdel;
625 : : }
626 : : else
627 : : {
628 : 17 : new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
629 : :
630 : : /* Determine the number of parallel workers to launch */
631 : 17 : nworkers = pvs->nindexes_parallel_cleanup;
632 : :
633 : : /* Add conditionally parallel-aware indexes if in the first time call */
634 [ + + ]: 17 : if (num_index_scans == 0)
635 : 10 : nworkers += pvs->nindexes_parallel_condcleanup;
636 : : }
637 : :
638 : : /* The leader process will participate */
639 : 32 : nworkers--;
640 : :
641 : : /*
642 : : * It is possible that parallel context is initialized with fewer workers
643 : : * than the number of indexes that need a separate worker in the current
644 : : * phase, so we need to consider it. See
645 : : * parallel_vacuum_compute_workers().
646 : : */
647 : 32 : nworkers = Min(nworkers, pvs->pcxt->nworkers);
648 : :
649 : : /*
650 : : * Set index vacuum status and mark whether parallel vacuum worker can
651 : : * process it.
652 : : */
653 [ + + ]: 126 : for (int i = 0; i < pvs->nindexes; i++)
654 : : {
655 : 94 : PVIndStats *indstats = &(pvs->indstats[i]);
656 : :
657 [ - + ]: 94 : Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
658 : 94 : indstats->status = new_status;
659 : 94 : indstats->parallel_workers_can_process =
1111 660 [ + + + + ]: 182 : (pvs->will_parallel_vacuum[i] &&
1353 661 : 88 : parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
662 : : num_index_scans,
663 : : vacuum));
664 : : }
665 : :
666 : : /* Reset the parallel index processing and progress counters */
667 : 32 : pg_atomic_write_u32(&(pvs->shared->idx), 0);
668 : :
669 : : /* Setup the shared cost-based vacuum delay and launch workers */
670 [ + + ]: 32 : if (nworkers > 0)
671 : : {
672 : : /* Reinitialize parallel context to relaunch parallel workers */
673 [ + + ]: 25 : if (num_index_scans > 0)
674 : 8 : ReinitializeParallelDSM(pvs->pcxt);
675 : :
676 : : /*
677 : : * Set up shared cost balance and the number of active workers for
678 : : * vacuum delay. We need to do this before launching workers as
679 : : * otherwise, they might not see the updated values for these
680 : : * parameters.
681 : : */
682 : 25 : pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
683 : 25 : pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
684 : :
685 : : /*
686 : : * The number of workers can vary between bulkdelete and cleanup
687 : : * phase.
688 : : */
689 : 25 : ReinitializeParallelWorkers(pvs->pcxt, nworkers);
690 : :
691 : 25 : LaunchParallelWorkers(pvs->pcxt);
692 : :
693 [ + - ]: 25 : if (pvs->pcxt->nworkers_launched > 0)
694 : : {
695 : : /*
696 : : * Reset the local cost values for leader backend as we have
697 : : * already accumulated the remaining balance of heap.
698 : : */
699 : 25 : VacuumCostBalance = 0;
700 : 25 : VacuumCostBalanceLocal = 0;
701 : :
702 : : /* Enable shared cost balance for leader backend */
703 : 25 : VacuumSharedCostBalance = &(pvs->shared->cost_balance);
704 : 25 : VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
705 : : }
706 : :
707 [ + + ]: 25 : if (vacuum)
708 [ - + ]: 15 : ereport(pvs->shared->elevel,
709 : : (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
710 : : "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
711 : : pvs->pcxt->nworkers_launched),
712 : : pvs->pcxt->nworkers_launched, nworkers)));
713 : : else
714 [ - + ]: 10 : ereport(pvs->shared->elevel,
715 : : (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
716 : : "launched %d parallel vacuum workers for index cleanup (planned: %d)",
717 : : pvs->pcxt->nworkers_launched),
718 : : pvs->pcxt->nworkers_launched, nworkers)));
719 : : }
720 : :
721 : : /* Vacuum the indexes that can be processed by only leader process */
722 : 32 : parallel_vacuum_process_unsafe_indexes(pvs);
723 : :
724 : : /*
725 : : * Join as a parallel worker. The leader vacuums alone processes all
726 : : * parallel-safe indexes in the case where no workers are launched.
727 : : */
728 : 32 : parallel_vacuum_process_safe_indexes(pvs);
729 : :
730 : : /*
731 : : * Next, accumulate buffer and WAL usage. (This must wait for the workers
732 : : * to finish, or we might get incomplete data.)
733 : : */
734 [ + + ]: 32 : if (nworkers > 0)
735 : : {
736 : : /* Wait for all vacuum workers to finish */
737 : 25 : WaitForParallelWorkersToFinish(pvs->pcxt);
738 : :
739 [ + + ]: 56 : for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
740 : 31 : InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
741 : : }
742 : :
743 : : /*
744 : : * Reset all index status back to initial (while checking that we have
745 : : * vacuumed all indexes).
746 : : */
747 [ + + ]: 126 : for (int i = 0; i < pvs->nindexes; i++)
748 : : {
749 : 94 : PVIndStats *indstats = &(pvs->indstats[i]);
750 : :
751 [ - + ]: 94 : if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
1353 akapila@postgresql.o 752 [ # # ]:UBC 0 : elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
753 : : RelationGetRelationName(pvs->indrels[i]));
754 : :
1353 akapila@postgresql.o 755 :CBC 94 : indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
756 : : }
757 : :
758 : : /*
759 : : * Carry the shared balance value to heap scan and disable shared costing
760 : : */
761 [ + + ]: 32 : if (VacuumSharedCostBalance)
762 : : {
763 : 25 : VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
764 : 25 : VacuumSharedCostBalance = NULL;
765 : 25 : VacuumActiveNWorkers = NULL;
766 : : }
767 : 32 : }
768 : :
769 : : /*
770 : : * Index vacuum/cleanup routine used by the leader process and parallel
771 : : * vacuum worker processes to vacuum the indexes in parallel.
772 : : */
773 : : static void
774 : 63 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
775 : : {
776 : : /*
777 : : * Increment the active worker count if we are able to launch any worker.
778 : : */
779 [ + + ]: 63 : if (VacuumActiveNWorkers)
780 : 56 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
781 : :
782 : : /* Loop until all indexes are vacuumed */
783 : : for (;;)
784 : 94 : {
785 : : int idx;
786 : : PVIndStats *indstats;
787 : :
788 : : /* Get an index number to process */
789 : 157 : idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
790 : :
791 : : /* Done for all indexes? */
792 [ + + ]: 157 : if (idx >= pvs->nindexes)
793 : 63 : break;
794 : :
795 : 94 : indstats = &(pvs->indstats[idx]);
796 : :
797 : : /*
798 : : * Skip vacuuming index that is unsafe for workers or has an
799 : : * unsuitable target for parallel index vacuum (this is vacuumed in
800 : : * parallel_vacuum_process_unsafe_indexes() by the leader).
801 : : */
802 [ + + ]: 94 : if (!indstats->parallel_workers_can_process)
803 : 26 : continue;
804 : :
805 : : /* Do vacuum or cleanup of the index */
806 : 68 : parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
807 : : }
808 : :
809 : : /*
810 : : * We have completed the index vacuum so decrement the active worker
811 : : * count.
812 : : */
813 [ + + ]: 63 : if (VacuumActiveNWorkers)
814 : 56 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
815 : 63 : }
816 : :
817 : : /*
818 : : * Perform parallel vacuuming of indexes in leader process.
819 : : *
820 : : * Handles index vacuuming (or index cleanup) for indexes that are not
821 : : * parallel safe. It's possible that this will vary for a given index, based
822 : : * on details like whether we're performing index cleanup right now.
823 : : *
824 : : * Also performs vacuuming of smaller indexes that fell under the size cutoff
825 : : * enforced by parallel_vacuum_compute_workers().
826 : : */
827 : : static void
828 : 32 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
829 : : {
830 [ - + ]: 32 : Assert(!IsParallelWorker());
831 : :
832 : : /*
833 : : * Increment the active worker count if we are able to launch any worker.
834 : : */
835 [ + + ]: 32 : if (VacuumActiveNWorkers)
836 : 25 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
837 : :
838 [ + + ]: 126 : for (int i = 0; i < pvs->nindexes; i++)
839 : : {
840 : 94 : PVIndStats *indstats = &(pvs->indstats[i]);
841 : :
842 : : /* Skip, indexes that are safe for workers */
843 [ + + ]: 94 : if (indstats->parallel_workers_can_process)
844 : 68 : continue;
845 : :
846 : : /* Do vacuum or cleanup of the index */
847 : 26 : parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
848 : : }
849 : :
850 : : /*
851 : : * We have completed the index vacuum so decrement the active worker
852 : : * count.
853 : : */
854 [ + + ]: 32 : if (VacuumActiveNWorkers)
855 : 25 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
856 : 32 : }
857 : :
858 : : /*
859 : : * Vacuum or cleanup index either by leader process or by one of the worker
860 : : * process. After vacuuming the index this function copies the index
861 : : * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
862 : : * segment.
863 : : */
864 : : static void
865 : 94 : parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
866 : : PVIndStats *indstats)
867 : : {
868 : 94 : IndexBulkDeleteResult *istat = NULL;
869 : : IndexBulkDeleteResult *istat_res;
870 : : IndexVacuumInfo ivinfo;
871 : :
872 : : /*
873 : : * Update the pointer to the corresponding bulk-deletion result if someone
874 : : * has already updated it
875 : : */
876 [ + + ]: 94 : if (indstats->istat_updated)
877 : 33 : istat = &(indstats->istat);
878 : :
879 : 94 : ivinfo.index = indrel;
887 pg@bowt.ie 880 : 94 : ivinfo.heaprel = pvs->heaprel;
1353 akapila@postgresql.o 881 : 94 : ivinfo.analyze_only = false;
882 : 94 : ivinfo.report_progress = false;
1331 pg@bowt.ie 883 : 94 : ivinfo.message_level = DEBUG2;
1353 akapila@postgresql.o 884 : 94 : ivinfo.estimated_count = pvs->shared->estimated_count;
885 : 94 : ivinfo.num_heap_tuples = pvs->shared->reltuples;
886 : 94 : ivinfo.strategy = pvs->bstrategy;
887 : :
888 : : /* Update error traceback information */
889 : 94 : pvs->indname = pstrdup(RelationGetRelationName(indrel));
890 : 94 : pvs->status = indstats->status;
891 : :
892 [ + + - ]: 94 : switch (indstats->status)
893 : : {
894 : 33 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
522 msawada@postgresql.o 895 : 33 : istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
896 : 33 : &pvs->shared->dead_items_info);
1353 akapila@postgresql.o 897 : 33 : break;
898 : 61 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
899 : 61 : istat_res = vac_cleanup_one_index(&ivinfo, istat);
900 : 61 : break;
1353 akapila@postgresql.o 901 :UBC 0 : default:
902 [ # # ]: 0 : elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
903 : : indstats->status,
904 : : RelationGetRelationName(indrel));
905 : : }
906 : :
907 : : /*
908 : : * Copy the index bulk-deletion result returned from ambulkdelete and
909 : : * amvacuumcleanup to the DSM segment if it's the first cycle because they
910 : : * allocate locally and it's possible that an index will be vacuumed by a
911 : : * different vacuum process the next cycle. Copying the result normally
912 : : * happens only the first time an index is vacuumed. For any additional
913 : : * vacuum pass, we directly point to the result on the DSM segment and
914 : : * pass it to vacuum index APIs so that workers can update it directly.
915 : : *
916 : : * Since all vacuum workers write the bulk-deletion result at different
917 : : * slots we can write them without locking.
918 : : */
1353 akapila@postgresql.o 919 [ + + + + ]:CBC 94 : if (!indstats->istat_updated && istat_res != NULL)
920 : : {
921 : 41 : memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
922 : 41 : indstats->istat_updated = true;
923 : :
924 : : /* Free the locally-allocated bulk-deletion result */
925 : 41 : pfree(istat_res);
926 : : }
927 : :
928 : : /*
929 : : * Update the status to completed. No need to lock here since each worker
930 : : * touches different indexes.
931 : : */
932 : 94 : indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
933 : :
934 : : /* Reset error traceback information */
935 : 94 : pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
936 : 94 : pfree(pvs->indname);
937 : 94 : pvs->indname = NULL;
938 : :
939 : : /*
940 : : * Call the parallel variant of pgstat_progress_incr_param so workers can
941 : : * report progress of index vacuum to the leader.
942 : : */
788 msawada@postgresql.o 943 : 94 : pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
1353 akapila@postgresql.o 944 : 94 : }
945 : :
946 : : /*
947 : : * Returns false, if the given index can't participate in the next execution of
948 : : * parallel index vacuum or parallel index cleanup.
949 : : */
950 : : static bool
951 : 88 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
952 : : bool vacuum)
953 : : {
954 : : uint8 vacoptions;
955 : :
956 : 88 : vacoptions = indrel->rd_indam->amparallelvacuumoptions;
957 : :
958 : : /* In parallel vacuum case, check if it supports parallel bulk-deletion */
959 [ + + ]: 88 : if (vacuum)
960 : 30 : return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
961 : :
962 : : /* Not safe, if the index does not support parallel cleanup */
963 [ + + ]: 58 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
964 [ + + ]: 46 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
965 : 6 : return false;
966 : :
967 : : /*
968 : : * Not safe, if the index supports parallel cleanup conditionally, but we
969 : : * have already processed the index (for bulkdelete). We do this to avoid
970 : : * the need to invoke workers when parallel index cleanup doesn't need to
971 : : * scan the index. See the comments for option
972 : : * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
973 : : * parallel cleanup conditionally.
974 : : */
975 [ + + ]: 52 : if (num_index_scans > 0 &&
976 [ + - ]: 14 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
977 : 14 : return false;
978 : :
979 : 38 : return true;
980 : : }
981 : :
982 : : /*
983 : : * Perform work within a launched parallel process.
984 : : *
985 : : * Since parallel vacuum workers perform only index vacuum or index cleanup,
986 : : * we don't need to report progress information.
987 : : */
988 : : void
989 : 31 : parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
990 : : {
991 : : ParallelVacuumState pvs;
992 : : Relation rel;
993 : : Relation *indrels;
994 : : PVIndStats *indstats;
995 : : PVShared *shared;
996 : : TidStore *dead_items;
997 : : BufferUsage *buffer_usage;
998 : : WalUsage *wal_usage;
999 : : int nindexes;
1000 : : char *sharedquery;
1001 : : ErrorContextCallback errcallback;
1002 : :
1003 : : /*
1004 : : * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
1005 : : * don't support parallel vacuum for autovacuum as of now.
1006 : : */
1007 [ - + ]: 31 : Assert(MyProc->statusFlags == PROC_IN_VACUUM);
1008 : :
1009 [ - + ]: 31 : elog(DEBUG1, "starting parallel vacuum worker");
1010 : :
1011 : 31 : shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
1012 : :
1013 : : /* Set debug_query_string for individual workers */
1014 : 31 : sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
1015 : 31 : debug_query_string = sharedquery;
1016 : 31 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1017 : :
1018 : : /* Track query ID */
341 michael@paquier.xyz 1019 : 31 : pgstat_report_query_id(shared->queryid, false);
1020 : :
1021 : : /*
1022 : : * Open table. The lock mode is the same as the leader process. It's
1023 : : * okay because the lock mode does not conflict among the parallel
1024 : : * workers.
1025 : : */
1353 akapila@postgresql.o 1026 : 31 : rel = table_open(shared->relid, ShareUpdateExclusiveLock);
1027 : :
1028 : : /*
1029 : : * Open all indexes. indrels are sorted in order by OID, which should be
1030 : : * matched to the leader's one.
1031 : : */
1032 : 31 : vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
1033 [ - + ]: 31 : Assert(nindexes > 0);
1034 : :
1035 : : /*
1036 : : * Apply the desired value of maintenance_work_mem within this process.
1037 : : * Really we should use SetConfigOption() to change a GUC, but since we're
1038 : : * already in parallel mode guc.c would complain about that. Fortunately,
1039 : : * by the same token guc.c will not let any user-defined code change it.
1040 : : * So just avert your eyes while we do this:
1041 : : */
1042 [ + - ]: 31 : if (shared->maintenance_work_mem_worker > 0)
1043 : 31 : maintenance_work_mem = shared->maintenance_work_mem_worker;
1044 : :
1045 : : /* Set index statistics */
1046 : 31 : indstats = (PVIndStats *) shm_toc_lookup(toc,
1047 : : PARALLEL_VACUUM_KEY_INDEX_STATS,
1048 : : false);
1049 : :
1050 : : /* Find dead_items in shared memory */
522 msawada@postgresql.o 1051 : 31 : dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
1052 : : shared->dead_items_handle);
1053 : :
1054 : : /* Set cost-based vacuum delay */
883 dgustafsson@postgres 1055 : 31 : VacuumUpdateCosts();
1353 akapila@postgresql.o 1056 : 31 : VacuumCostBalance = 0;
1057 : 31 : VacuumCostBalanceLocal = 0;
1058 : 31 : VacuumSharedCostBalance = &(shared->cost_balance);
1059 : 31 : VacuumActiveNWorkers = &(shared->active_nworkers);
1060 : :
1061 : : /* Set parallel vacuum state */
1062 : 31 : pvs.indrels = indrels;
1063 : 31 : pvs.nindexes = nindexes;
1064 : 31 : pvs.indstats = indstats;
1065 : 31 : pvs.shared = shared;
1066 : 31 : pvs.dead_items = dead_items;
1067 : 31 : pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
1068 : 31 : pvs.relname = pstrdup(RelationGetRelationName(rel));
889 andres@anarazel.de 1069 : 31 : pvs.heaprel = rel;
1070 : :
1071 : : /* These fields will be filled during index vacuum or cleanup */
1353 akapila@postgresql.o 1072 : 31 : pvs.indname = NULL;
1073 : 31 : pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
1074 : :
1075 : : /* Each parallel VACUUM worker gets its own access strategy. */
883 drowley@postgresql.o 1076 : 62 : pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
1077 : 31 : shared->ring_nbuffers * (BLCKSZ / 1024));
1078 : :
1079 : : /* Setup error traceback support for ereport() */
1353 akapila@postgresql.o 1080 : 31 : errcallback.callback = parallel_vacuum_error_callback;
1081 : 31 : errcallback.arg = &pvs;
1082 : 31 : errcallback.previous = error_context_stack;
1083 : 31 : error_context_stack = &errcallback;
1084 : :
1085 : : /* Prepare to track buffer usage during parallel execution */
1086 : 31 : InstrStartParallelQuery();
1087 : :
1088 : : /* Process indexes to perform vacuum/cleanup */
1089 : 31 : parallel_vacuum_process_safe_indexes(&pvs);
1090 : :
1091 : : /* Report buffer/WAL usage during parallel execution */
1092 : 31 : buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
1093 : 31 : wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
1094 : 31 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1095 : 31 : &wal_usage[ParallelWorkerNumber]);
1096 : :
1097 : : /* Report any remaining cost-based vacuum delay time */
207 nathan@postgresql.or 1098 [ - + ]: 31 : if (track_cost_delay_timing)
207 nathan@postgresql.or 1099 :UBC 0 : pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_DELAY_TIME,
1100 : : parallel_vacuum_worker_delay_ns);
1101 : :
522 msawada@postgresql.o 1102 :CBC 31 : TidStoreDetach(dead_items);
1103 : :
1104 : : /* Pop the error context stack */
1353 akapila@postgresql.o 1105 : 31 : error_context_stack = errcallback.previous;
1106 : :
1107 : 31 : vac_close_indexes(nindexes, indrels, RowExclusiveLock);
1108 : 31 : table_close(rel, ShareUpdateExclusiveLock);
1109 : 31 : FreeAccessStrategy(pvs.bstrategy);
1110 : 31 : }
1111 : :
1112 : : /*
1113 : : * Error context callback for errors occurring during parallel index vacuum.
1114 : : * The error context messages should match the messages set in the lazy vacuum
1115 : : * error context. If you change this function, change vacuum_error_callback()
1116 : : * as well.
1117 : : */
1118 : : static void
1353 akapila@postgresql.o 1119 :UBC 0 : parallel_vacuum_error_callback(void *arg)
1120 : : {
1121 : 0 : ParallelVacuumState *errinfo = arg;
1122 : :
1123 [ # # # ]: 0 : switch (errinfo->status)
1124 : : {
1125 : 0 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
1126 : 0 : errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1127 : : errinfo->indname,
1128 : : errinfo->relnamespace,
1129 : : errinfo->relname);
1130 : 0 : break;
1131 : 0 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
1132 : 0 : errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1133 : : errinfo->indname,
1134 : : errinfo->relnamespace,
1135 : : errinfo->relname);
1136 : 0 : break;
1137 : 0 : case PARALLEL_INDVAC_STATUS_INITIAL:
1138 : : case PARALLEL_INDVAC_STATUS_COMPLETED:
1139 : : default:
1140 : 0 : return;
1141 : : }
1142 : : }
|