Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * vacuumparallel.c
4 : : * Support routines for parallel vacuum and autovacuum execution. In the
5 : : * comments below, the word "vacuum" will refer to both vacuum and
6 : : * autovacuum.
7 : : *
8 : : * This file contains routines that are intended to support setting up, using,
9 : : * and tearing down a ParallelVacuumState.
10 : : *
11 : : * In a parallel vacuum, we perform both index bulk deletion and index cleanup
12 : : * with parallel worker processes. Individual indexes are processed by one
13 : : * vacuum process. ParallelVacuumState contains shared information as well as
14 : : * the memory space for storing dead items allocated in the DSA area. We
15 : : * launch parallel worker processes at the start of parallel index
16 : : * bulk-deletion and index cleanup and once all indexes are processed, the
17 : : * parallel worker processes exit. Each time we process indexes in parallel,
18 : : * the parallel context is re-initialized so that the same DSM can be used for
19 : : * multiple passes of index bulk-deletion and index cleanup.
20 : : *
21 : : * For parallel autovacuum, we need to propagate cost-based vacuum delay
22 : : * parameters from the leader to its workers, as the leader's parameters can
23 : : * change even while processing a table (e.g., due to a config reload).
24 : : * The PVSharedCostParams struct manages these parameters using a
25 : : * generation counter. Each parallel worker polls this shared state and
26 : : * refreshes its local delay parameters whenever a change is detected.
27 : : *
28 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
29 : : * Portions Copyright (c) 1994, Regents of the University of California
30 : : *
31 : : * IDENTIFICATION
32 : : * src/backend/commands/vacuumparallel.c
33 : : *
34 : : *-------------------------------------------------------------------------
35 : : */
36 : : #include "postgres.h"
37 : :
38 : : #include "access/amapi.h"
39 : : #include "access/table.h"
40 : : #include "access/xact.h"
41 : : #include "commands/progress.h"
42 : : #include "commands/vacuum.h"
43 : : #include "executor/instrument.h"
44 : : #include "optimizer/paths.h"
45 : : #include "pgstat.h"
46 : : #include "storage/bufmgr.h"
47 : : #include "storage/proc.h"
48 : : #include "tcop/tcopprot.h"
49 : : #include "utils/lsyscache.h"
50 : : #include "utils/rel.h"
51 : :
/*
 * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
 * we don't need to worry about DSM keys conflicting with plan_node_id we can
 * use small integers.
 */
#define PARALLEL_VACUUM_KEY_SHARED			1	/* PVShared */
#define PARALLEL_VACUUM_KEY_QUERY_TEXT		2	/* copy of debug_query_string */
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	3	/* per-worker BufferUsage array */
#define PARALLEL_VACUUM_KEY_WAL_USAGE		4	/* per-worker WalUsage array */
#define PARALLEL_VACUUM_KEY_INDEX_STATS		5	/* PVIndStats array, one per index */
62 : :
/*
 * Struct for cost-based vacuum delay related parameters to share among an
 * autovacuum worker and its parallel vacuum workers.
 *
 * This struct is embedded in PVShared, which lives in the DSM segment, so
 * its layout must stay in sync between leader and workers.
 */
typedef struct PVSharedCostParams
{
	/*
	 * The generation counter is incremented by the leader process each time
	 * it updates the shared cost-based vacuum delay parameters.  Parallel
	 * vacuum workers compare it with their local generation,
	 * shared_params_generation_local, to detect whether they need to refresh
	 * their local parameters.  The generation starts from 1 so that a freshly
	 * started worker (whose local copy is 0) will always load the initial
	 * parameters on its first check.
	 */
	pg_atomic_uint32 generation;

	slock_t		mutex;			/* protects all fields below */

	/* Parameters to share with parallel workers */
	double		cost_delay;		/* snapshot of vacuum_cost_delay */
	int			cost_limit;		/* snapshot of vacuum_cost_limit */
	int			cost_page_dirty;	/* snapshot of VacuumCostPageDirty */
	int			cost_page_hit;	/* snapshot of VacuumCostPageHit */
	int			cost_page_miss; /* snapshot of VacuumCostPageMiss */
} PVSharedCostParams;
89 : :
/*
 * Shared information among parallel workers.  So this is allocated in the DSM
 * segment.
 */
typedef struct PVShared
{
	/*
	 * Target table relid, log level (for messages about parallel workers
	 * launched during VACUUM VERBOSE) and query ID.  These fields are not
	 * modified during the parallel vacuum.
	 */
	Oid			relid;
	int			elevel;
	int64		queryid;

	/*
	 * Fields for both index vacuum and cleanup.
	 *
	 * reltuples is the total number of input heap tuples.  We set either old
	 * live tuples in the index vacuum case or the new live tuples in the
	 * index cleanup case.
	 *
	 * estimated_count is true if reltuples is an estimated value.  (Note that
	 * reltuples could be -1 in this case, indicating we have no idea.)
	 */
	double		reltuples;
	bool		estimated_count;

	/*
	 * In single process vacuum we could consume more memory during index
	 * vacuuming or cleanup apart from the memory for heap scanning.  In
	 * parallel vacuum, since individual vacuum workers can consume memory
	 * equal to maintenance_work_mem, the new maintenance_work_mem for each
	 * worker is set such that the parallel operation doesn't consume more
	 * memory than single process vacuum.
	 */
	int			maintenance_work_mem_worker;

	/*
	 * The number of buffers each worker's Buffer Access Strategy ring should
	 * contain.
	 */
	int			ring_nbuffers;

	/*
	 * Shared vacuum cost balance.  During parallel vacuum,
	 * VacuumSharedCostBalance points to this value and it accumulates the
	 * balance of each parallel vacuum worker.
	 */
	pg_atomic_uint32 cost_balance;

	/*
	 * Number of active parallel workers.  This is used for computing the
	 * minimum threshold of the vacuum cost balance before a worker sleeps for
	 * cost-based delay.
	 */
	pg_atomic_uint32 active_nworkers;

	/*
	 * Counter for vacuuming and cleanup: the next index (by array position)
	 * that a participant should claim.
	 */
	pg_atomic_uint32 idx;

	/* DSA handle where the TidStore lives */
	dsa_handle	dead_items_dsa_handle;

	/* DSA pointer to the shared TidStore */
	dsa_pointer dead_items_handle;

	/* Statistics of shared dead items */
	VacDeadItemsInfo dead_items_info;

	/*
	 * If 'true' then we are running parallel autovacuum.  Otherwise, we are
	 * running parallel maintenance VACUUM.
	 */
	bool		is_autovacuum;

	/*
	 * Cost-based vacuum delay parameters shared between the autovacuum leader
	 * and its parallel workers.  Only initialized when is_autovacuum is true
	 * (see parallel_vacuum_init()).
	 */
	PVSharedCostParams cost_params;
} PVShared;
172 : :
/* Status used during parallel index vacuum or cleanup */
typedef enum PVIndVacStatus
{
	PARALLEL_INDVAC_STATUS_INITIAL = 0, /* no work assigned yet */
	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, /* index bulk-deletion pending */
	PARALLEL_INDVAC_STATUS_NEED_CLEANUP,	/* index cleanup pending */
	PARALLEL_INDVAC_STATUS_COMPLETED,	/* processing finished */
} PVIndVacStatus;
181 : :
/*
 * Struct for index vacuum statistics of an index that is used for parallel vacuum.
 * This includes the status of parallel index vacuum as well as index statistics.
 * One element per index is allocated in the DSM segment (see
 * PARALLEL_VACUUM_KEY_INDEX_STATS).
 */
typedef struct PVIndStats
{
	/*
	 * The following two fields are set by leader process before executing
	 * parallel index vacuum or parallel index cleanup.  These fields are not
	 * fixed for the entire VACUUM operation.  They are only fixed for an
	 * individual parallel index vacuum and cleanup.
	 *
	 * parallel_workers_can_process is true if both leader and worker can
	 * process the index, otherwise only leader can process it.
	 */
	PVIndVacStatus status;
	bool		parallel_workers_can_process;

	/*
	 * Individual worker or leader stores the result of index vacuum or
	 * cleanup.
	 */
	bool		istat_updated;	/* are the stats updated? */
	IndexBulkDeleteResult istat;
} PVIndStats;
207 : :
/*
 * Struct for maintaining a parallel vacuum state.  typedef appears in vacuum.h.
 *
 * This is the leader-side (and worker-side) bookkeeping; pointers below point
 * into the DSM segment where noted.
 */
struct ParallelVacuumState
{
	/* NULL for worker processes */
	ParallelContext *pcxt;

	/* Parent Heap Relation */
	Relation	heaprel;

	/* Target indexes */
	Relation   *indrels;
	int			nindexes;

	/* Shared information among parallel vacuum workers (in DSM) */
	PVShared   *shared;

	/*
	 * Shared index statistics among parallel vacuum workers.  The array
	 * element is allocated for every index, even those indexes where parallel
	 * index vacuuming is unsafe or not worthwhile (e.g.,
	 * will_parallel_vacuum[] is false).  During parallel vacuum,
	 * IndexBulkDeleteResult of each index is kept in DSM and is copied into
	 * local memory at the end of parallel vacuum.
	 */
	PVIndStats *indstats;

	/* Shared dead items space among parallel vacuum workers */
	TidStore   *dead_items;

	/* Points to buffer usage area in DSM */
	BufferUsage *buffer_usage;

	/* Points to WAL usage area in DSM */
	WalUsage   *wal_usage;

	/*
	 * False if the index is totally unsuitable target for all parallel
	 * processing.  For example, the index could be <
	 * min_parallel_index_scan_size cutoff.
	 */
	bool	   *will_parallel_vacuum;

	/*
	 * The number of indexes that support parallel index bulk-deletion and
	 * parallel index cleanup respectively.
	 */
	int			nindexes_parallel_bulkdel;
	int			nindexes_parallel_cleanup;
	int			nindexes_parallel_condcleanup;

	/* Buffer access strategy used by leader process */
	BufferAccessStrategy bstrategy;

	/*
	 * Error reporting state.  The error callback is set only for workers
	 * processes during parallel index vacuum.
	 */
	char	   *relnamespace;
	char	   *relname;
	char	   *indname;
	PVIndVacStatus status;
};
272 : :
/*
 * Points to the PVShared.cost_params living in the DSM segment while a
 * parallel autovacuum is in progress; NULL otherwise.  The leader sets it in
 * parallel_vacuum_init() and clears it in parallel_vacuum_end() or via the
 * DSM detach callback.  NOTE(review): workers presumably set it when they
 * attach to the DSM segment (in parallel_vacuum_main, not visible in this
 * chunk) -- confirm against that code.
 */
static PVSharedCostParams *pv_shared_cost_params = NULL;

/*
 * Worker-local copy of the last cost-parameter generation this worker has
 * applied.  Initialized to 0; since the leader initializes the shared
 * generation counter to 1, the first call to
 * parallel_vacuum_update_shared_delay_params() will always detect a
 * mismatch and read the initial parameters from shared memory.
 */
static uint32 shared_params_generation_local = 0;
283 : :
/* Forward declarations of routines local to this file */
static int	parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
											bool *will_parallel_vacuum);
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
												bool vacuum, PVWorkerStats *wstats);
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
											  PVIndStats *indstats);
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
												   bool vacuum);
static void parallel_vacuum_error_callback(void *arg);
static inline void parallel_vacuum_set_cost_parameters(PVSharedCostParams *params);
static void parallel_vacuum_dsm_detach(dsm_segment *seg, Datum arg);
297 : :
/*
 * Try to enter parallel mode and create a parallel context.  Then initialize
 * shared memory state.
 *
 * rel is the target heap relation; indrels/nindexes its indexes.
 * nrequested_workers is the user-requested parallel degree (0 means "let us
 * decide").  vac_work_mem is the memory budget in kilobytes; elevel is the
 * log level for worker-launch messages; bstrategy is the leader's buffer
 * access strategy whose ring size workers will copy.
 *
 * On success, return parallel vacuum state.  Otherwise return NULL.
 */
ParallelVacuumState *
parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
					 int nrequested_workers, int vac_work_mem,
					 int elevel, BufferAccessStrategy bstrategy)
{
	ParallelVacuumState *pvs;
	ParallelContext *pcxt;
	PVShared   *shared;
	TidStore   *dead_items;
	PVIndStats *indstats;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	bool	   *will_parallel_vacuum;
	Size		est_indstats_len;
	Size		est_shared_len;
	int			nindexes_mwm = 0;
	int			parallel_workers = 0;
	int			querylen;

	/*
	 * A parallel vacuum must be requested and there must be indexes on the
	 * relation
	 */
	Assert(nrequested_workers >= 0);
	Assert(nindexes > 0);

	/*
	 * Compute the number of parallel vacuum workers to launch
	 */
	will_parallel_vacuum = palloc0_array(bool, nindexes);
	parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
													   nrequested_workers,
													   will_parallel_vacuum);
	if (parallel_workers <= 0)
	{
		/* Can't perform vacuum in parallel -- return NULL */
		pfree(will_parallel_vacuum);
		return NULL;
	}

	pvs = palloc0_object(ParallelVacuumState);
	pvs->indrels = indrels;
	pvs->nindexes = nindexes;
	pvs->will_parallel_vacuum = will_parallel_vacuum;
	pvs->bstrategy = bstrategy;
	pvs->heaprel = rel;

	EnterParallelMode();
	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
								 parallel_workers);
	Assert(pcxt->nworkers > 0);
	pvs->pcxt = pcxt;

	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
	est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
	shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
	est_shared_len = sizeof(PVShared);
	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Estimate space for BufferUsage and WalUsage --
	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
	 *
	 * If there are no extensions loaded that care, we could skip this.  We
	 * have no way of knowing whether anyone's looking at pgBufferUsage or
	 * pgWalUsage, so do it unconditionally.
	 */
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
	if (debug_query_string)
	{
		querylen = strlen(debug_query_string);
		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}
	else
		querylen = 0;			/* keep compiler quiet */

	InitializeParallelDSM(pcxt);

	/* Prepare index vacuum stats */
	indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
	MemSet(indstats, 0, est_indstats_len);
	for (int i = 0; i < nindexes; i++)
	{
		Relation	indrel = indrels[i];
		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;

		/*
		 * Cleanup option should be either disabled, always performing in
		 * parallel or conditionally performing in parallel.
		 */
		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);

		/* Skip indexes that were judged unsuitable for parallel vacuum */
		if (!will_parallel_vacuum[i])
			continue;

		if (indrel->rd_indam->amusemaintenanceworkmem)
			nindexes_mwm++;

		/*
		 * Remember the number of indexes that support parallel operation for
		 * each phase.
		 */
		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
			pvs->nindexes_parallel_bulkdel++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
			pvs->nindexes_parallel_cleanup++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
			pvs->nindexes_parallel_condcleanup++;
	}
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
	pvs->indstats = indstats;

	/* Prepare shared information */
	shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
	MemSet(shared, 0, est_shared_len);
	shared->relid = RelationGetRelid(rel);
	shared->elevel = elevel;
	shared->queryid = pgstat_get_my_query_id();

	/*
	 * Divide the memory budget among the workers that actually use
	 * maintenance_work_mem, so the parallel operation as a whole does not
	 * exceed the single-process budget.
	 */
	shared->maintenance_work_mem_worker =
		(nindexes_mwm > 0) ?
		vac_work_mem / Min(parallel_workers, nindexes_mwm) :
		vac_work_mem;

	/* vac_work_mem is in kB; max_bytes is in bytes */
	shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;

	/* Prepare DSA space for dead items */
	dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
									  LWTRANCHE_PARALLEL_VACUUM_DSA);
	pvs->dead_items = dead_items;
	shared->dead_items_handle = TidStoreGetHandle(dead_items);
	shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));

	/* Use the same buffer size for all workers */
	shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);

	pg_atomic_init_u32(&(shared->cost_balance), 0);
	pg_atomic_init_u32(&(shared->active_nworkers), 0);
	pg_atomic_init_u32(&(shared->idx), 0);

	shared->is_autovacuum = AmAutoVacuumWorkerProcess();

	/*
	 * Initialize shared cost-based vacuum delay parameters if it's for
	 * autovacuum.  The generation starts at 1 so that workers (whose local
	 * generation is 0) load the initial values on their first check.  The
	 * detach callback guarantees pv_shared_cost_params is reset even on
	 * error exit.
	 */
	if (shared->is_autovacuum)
	{
		parallel_vacuum_set_cost_parameters(&shared->cost_params);
		pg_atomic_init_u32(&shared->cost_params.generation, 1);
		SpinLockInit(&shared->cost_params.mutex);

		pv_shared_cost_params = &(shared->cost_params);
		on_dsm_detach(pcxt->seg, parallel_vacuum_dsm_detach, (Datum) 0);
	}

	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
	pvs->shared = shared;

	/*
	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
	 * initialize
	 */
	buffer_usage = shm_toc_allocate(pcxt->toc,
									mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
	pvs->buffer_usage = buffer_usage;
	wal_usage = shm_toc_allocate(pcxt->toc,
								 mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
	pvs->wal_usage = wal_usage;

	/* Store query string for workers */
	if (debug_query_string)
	{
		char	   *sharedquery;

		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
		memcpy(sharedquery, debug_query_string, querylen + 1);
		sharedquery[querylen] = '\0';
		shm_toc_insert(pcxt->toc,
					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
	}

	/* Success -- return parallel vacuum state */
	return pvs;
}
504 : :
505 : : /*
506 : : * Destroy the parallel context, and end parallel mode.
507 : : *
508 : : * Since writes are not allowed during parallel mode, copy the
509 : : * updated index statistics from DSM into local memory and then later use that
510 : : * to update the index statistics. One might think that we can exit from
511 : : * parallel mode, update the index statistics and then destroy parallel
512 : : * context, but that won't be safe (see ExitParallelMode).
513 : : */
514 : : void
515 : 24 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
516 : : {
517 [ - + ]: 24 : Assert(!IsParallelWorker());
518 : :
519 : : /* Copy the updated statistics */
520 [ + + ]: 110 : for (int i = 0; i < pvs->nindexes; i++)
521 : : {
522 : 86 : PVIndStats *indstats = &(pvs->indstats[i]);
523 : :
524 [ + + ]: 86 : if (indstats->istat_updated)
525 : : {
146 michael@paquier.xyz 526 :GNC 56 : istats[i] = palloc0_object(IndexBulkDeleteResult);
1594 akapila@postgresql.o 527 :CBC 56 : memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
528 : : }
529 : : else
530 : 30 : istats[i] = NULL;
531 : : }
532 : :
763 msawada@postgresql.o 533 : 24 : TidStoreDestroy(pvs->dead_items);
534 : :
1594 akapila@postgresql.o 535 : 24 : DestroyParallelContext(pvs->pcxt);
536 : 24 : ExitParallelMode();
537 : :
29 msawada@postgresql.o 538 [ + + ]:GNC 24 : if (AmAutoVacuumWorkerProcess())
539 : 1 : pv_shared_cost_params = NULL;
540 : :
1594 akapila@postgresql.o 541 :CBC 24 : pfree(pvs->will_parallel_vacuum);
542 : 24 : pfree(pvs);
543 : 24 : }
544 : :
/*
 * DSM detach callback.  This is invoked when an autovacuum worker detaches
 * from the DSM segment holding PVShared.  It ensures to reset the local pointer
 * to the shared state even if parallel vacuum raises an error and doesn't
 * call parallel_vacuum_end().
 *
 * 'seg' and 'arg' are required by the on_dsm_detach callback signature but
 * are unused here.
 */
static void
parallel_vacuum_dsm_detach(dsm_segment *seg, Datum arg)
{
	Assert(AmAutoVacuumWorkerProcess());
	pv_shared_cost_params = NULL;
}
557 : :
/*
 * Returns the dead items space and dead items information.
 *
 * *dead_items_info_p is set to point at the shared VacDeadItemsInfo in DSM;
 * the returned TidStore is the shared dead-items store.
 */
TidStore *
parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
{
	*dead_items_info_p = &(pvs->shared->dead_items_info);
	return pvs->dead_items;
}
567 : :
/*
 * Forget all items in dead_items.
 *
 * Rather than just emptying the store, we destroy and recreate it so that
 * the DSA memory backing it is returned to the operating system.
 */
void
parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
{
	VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);

	/*
	 * Free the current tidstore and return allocated DSA segments to the
	 * operating system.  Then we recreate the tidstore with the same max_bytes
	 * limitation we just used.
	 */
	TidStoreDestroy(pvs->dead_items);
	pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
										   LWTRANCHE_PARALLEL_VACUUM_DSA);

	/* Update the DSA pointer for dead_items to the new one */
	pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(pvs->dead_items));
	pvs->shared->dead_items_handle = TidStoreGetHandle(pvs->dead_items);

	/* Reset the counter */
	dead_items_info->num_items = 0;
}
590 : :
/*
 * Do parallel index bulk-deletion with parallel workers.
 *
 * num_table_tuples is the (approximate) heap tuple count published to index
 * AMs via shared->reltuples.  wstats, if not NULL, receives parallel worker
 * statistics (see parallel_vacuum_process_all_indexes()).
 */
void
parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
									int num_index_scans, PVWorkerStats *wstats)
{
	Assert(!IsParallelWorker());

	/*
	 * We can only provide an approximate value of num_heap_tuples, at least
	 * for now.
	 */
	pvs->shared->reltuples = num_table_tuples;
	pvs->shared->estimated_count = true;

	parallel_vacuum_process_all_indexes(pvs, num_index_scans, true, wstats);
}
609 : :
/*
 * Do parallel index cleanup with parallel workers.
 *
 * Unlike the bulk-deletion case, the caller tells us via estimated_count
 * whether num_table_tuples is exact.  wstats, if not NULL, receives parallel
 * worker statistics.
 */
void
parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
									int num_index_scans, bool estimated_count,
									PVWorkerStats *wstats)
{
	Assert(!IsParallelWorker());

	/*
	 * We can provide a better estimate of total number of surviving tuples
	 * (we assume indexes are more interested in that than in the number of
	 * nominally live tuples).
	 */
	pvs->shared->reltuples = num_table_tuples;
	pvs->shared->estimated_count = estimated_count;

	parallel_vacuum_process_all_indexes(pvs, num_index_scans, false, wstats);
}
630 : :
631 : : /*
632 : : * Fill in the given structure with cost-based vacuum delay parameter values.
633 : : */
634 : : static inline void
29 msawada@postgresql.o 635 : 3 : parallel_vacuum_set_cost_parameters(PVSharedCostParams *params)
636 : : {
637 : 3 : params->cost_delay = vacuum_cost_delay;
638 : 3 : params->cost_limit = vacuum_cost_limit;
639 : 3 : params->cost_page_dirty = VacuumCostPageDirty;
640 : 3 : params->cost_page_hit = VacuumCostPageHit;
641 : 3 : params->cost_page_miss = VacuumCostPageMiss;
642 : 3 : }
643 : :
/*
 * Updates the cost-based vacuum delay parameters for parallel autovacuum
 * workers.
 *
 * A worker compares its local generation with the shared one; on mismatch it
 * copies the shared parameters (under the spinlock) into its local globals
 * and calls VacuumUpdateCosts() to recompute derived settings.
 *
 * For non-autovacuum parallel workers, this function will have no effect.
 */
void
parallel_vacuum_update_shared_delay_params(void)
{
	uint32		params_generation;

	Assert(IsParallelWorker());

	/* Quick return if the worker is not running for the autovacuum */
	if (pv_shared_cost_params == NULL)
		return;

	params_generation = pg_atomic_read_u32(&pv_shared_cost_params->generation);
	Assert(shared_params_generation_local <= params_generation);

	/* Return if parameters had not changed in the leader */
	if (params_generation == shared_params_generation_local)
		return;

	/* Copy a consistent snapshot of the shared parameters */
	SpinLockAcquire(&pv_shared_cost_params->mutex);
	VacuumCostDelay = pv_shared_cost_params->cost_delay;
	VacuumCostLimit = pv_shared_cost_params->cost_limit;
	VacuumCostPageDirty = pv_shared_cost_params->cost_page_dirty;
	VacuumCostPageHit = pv_shared_cost_params->cost_page_hit;
	VacuumCostPageMiss = pv_shared_cost_params->cost_page_miss;
	SpinLockRelease(&pv_shared_cost_params->mutex);

	VacuumUpdateCosts();

	/* Remember which generation we have now applied */
	shared_params_generation_local = params_generation;

	elog(DEBUG2,
		 "parallel autovacuum worker updated cost params: cost_limit=%d, cost_delay=%g, cost_page_miss=%d, cost_page_dirty=%d, cost_page_hit=%d",
		 vacuum_cost_limit,
		 vacuum_cost_delay,
		 VacuumCostPageMiss,
		 VacuumCostPageDirty,
		 VacuumCostPageHit);
}
688 : :
/*
 * Store the cost-based vacuum delay parameters in the shared memory so that
 * parallel vacuum workers can consume them (see
 * parallel_vacuum_update_shared_delay_params()).
 *
 * Called by the autovacuum leader.  The generation counter is bumped only
 * after the new values are published under the spinlock, so a worker that
 * observes the new generation reads consistent values (it takes the same
 * spinlock before reading).
 */
void
parallel_vacuum_propagate_shared_delay_params(void)
{
	Assert(AmAutoVacuumWorkerProcess());

	/*
	 * Quick return if the leader process is not sharing the delay parameters.
	 */
	if (pv_shared_cost_params == NULL)
		return;

	/*
	 * Check if any delay parameters have changed.  We can read them without
	 * locks as only the leader can modify them.
	 */
	if (vacuum_cost_delay == pv_shared_cost_params->cost_delay &&
		vacuum_cost_limit == pv_shared_cost_params->cost_limit &&
		VacuumCostPageDirty == pv_shared_cost_params->cost_page_dirty &&
		VacuumCostPageHit == pv_shared_cost_params->cost_page_hit &&
		VacuumCostPageMiss == pv_shared_cost_params->cost_page_miss)
		return;

	/* Update the shared delay parameters */
	SpinLockAcquire(&pv_shared_cost_params->mutex);
	parallel_vacuum_set_cost_parameters(pv_shared_cost_params);
	SpinLockRelease(&pv_shared_cost_params->mutex);

	/*
	 * Increment the generation of the parameters, i.e. let parallel workers
	 * know that they should re-read shared cost params.
	 */
	pg_atomic_fetch_add_u32(&pv_shared_cost_params->generation, 1);
}
727 : :
728 : : /*
729 : : * Compute the number of parallel worker processes to request. Both index
730 : : * vacuum and index cleanup can be executed with parallel workers.
731 : : * The index is eligible for parallel vacuum iff its size is greater than
732 : : * min_parallel_index_scan_size as invoking workers for very small indexes
733 : : * can hurt performance.
734 : : *
735 : : * nrequested is the number of parallel workers that user requested. If
736 : : * nrequested is 0, we compute the parallel degree based on nindexes, that is
737 : : * the number of indexes that support parallel vacuum. This function also
738 : : * sets will_parallel_vacuum to remember indexes that participate in parallel
739 : : * vacuum.
740 : : */
741 : : static int
1594 akapila@postgresql.o 742 :CBC 6723 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
743 : : bool *will_parallel_vacuum)
744 : : {
745 : 6723 : int nindexes_parallel = 0;
746 : 6723 : int nindexes_parallel_bulkdel = 0;
747 : 6723 : int nindexes_parallel_cleanup = 0;
748 : : int parallel_workers;
749 : : int max_workers;
750 : :
29 msawada@postgresql.o 751 :GNC 13446 : max_workers = AmAutoVacuumWorkerProcess() ?
752 [ + + ]: 6723 : autovacuum_max_parallel_workers :
753 : : max_parallel_maintenance_workers;
754 : :
755 : : /*
756 : : * We don't allow performing parallel operation in standalone backend or
757 : : * when parallelism is disabled.
758 : : */
759 [ + + + + ]: 6723 : if (!IsUnderPostmaster || max_workers == 0)
1594 akapila@postgresql.o 760 :CBC 3141 : return 0;
761 : :
762 : : /*
763 : : * Compute the number of indexes that can participate in parallel vacuum.
764 : : */
765 [ + + ]: 11672 : for (int i = 0; i < nindexes; i++)
766 : : {
767 : 8090 : Relation indrel = indrels[i];
768 : 8090 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
769 : :
770 : : /* Skip index that is not a suitable target for parallel index vacuum */
771 [ + - ]: 8090 : if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
772 [ + + ]: 8090 : RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
773 : 7992 : continue;
774 : :
775 : 98 : will_parallel_vacuum[i] = true;
776 : :
777 [ + + ]: 98 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
778 : 90 : nindexes_parallel_bulkdel++;
779 [ + + ]: 98 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
780 [ + + ]: 82 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
781 : 90 : nindexes_parallel_cleanup++;
782 : : }
783 : :
784 : 3582 : nindexes_parallel = Max(nindexes_parallel_bulkdel,
785 : : nindexes_parallel_cleanup);
786 : :
787 : : /* The leader process takes one index */
788 : 3582 : nindexes_parallel--;
789 : :
790 : : /* No index supports parallel vacuum */
791 [ + + ]: 3582 : if (nindexes_parallel <= 0)
792 : 3557 : return 0;
793 : :
794 : : /* Compute the parallel degree */
795 : 25 : parallel_workers = (nrequested > 0) ?
796 [ + + ]: 25 : Min(nrequested, nindexes_parallel) : nindexes_parallel;
797 : :
798 : : /* Cap by GUC variable */
29 msawada@postgresql.o 799 :GNC 25 : parallel_workers = Min(parallel_workers, max_workers);
800 : :
1594 akapila@postgresql.o 801 :CBC 25 : return parallel_workers;
802 : : }
803 : :
/*
 * Perform index vacuum or index cleanup with parallel workers.  This function
 * must be used by the parallel vacuum leader process.
 *
 * num_index_scans is the number of index scans completed so far in this
 * vacuum operation; "vacuum" selects bulk-deletion (true) vs. cleanup
 * (false).
 *
 * If wstats is not NULL, the parallel worker statistics are updated.
 */
static void
parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
									bool vacuum, PVWorkerStats *wstats)
{
	int			nworkers;
	PVIndVacStatus new_status;

	/* Only the leader may call this; workers enter via parallel_vacuum_main() */
	Assert(!IsParallelWorker());

	if (vacuum)
	{
		new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;

		/* Determine the number of parallel workers to launch */
		nworkers = pvs->nindexes_parallel_bulkdel;
	}
	else
	{
		new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;

		/* Determine the number of parallel workers to launch */
		nworkers = pvs->nindexes_parallel_cleanup;

		/*
		 * Add conditionally parallel-aware indexes only on the first call;
		 * for later calls such indexes are handled by the leader (see
		 * parallel_vacuum_index_is_parallel_safe()).
		 */
		if (num_index_scans == 0)
			nworkers += pvs->nindexes_parallel_condcleanup;
	}

	/* The leader process will participate */
	nworkers--;

	/*
	 * It is possible that parallel context is initialized with fewer workers
	 * than the number of indexes that need a separate worker in the current
	 * phase, so we need to consider it. See
	 * parallel_vacuum_compute_workers().
	 */
	nworkers = Min(nworkers, pvs->pcxt->nworkers);

	/* Update the statistics, if we were asked to */
	if (wstats != NULL && nworkers > 0)
		wstats->nplanned += nworkers;

	/*
	 * Set index vacuum status and mark whether a parallel vacuum worker can
	 * process it.
	 */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
		indstats->status = new_status;
		indstats->parallel_workers_can_process =
			(pvs->will_parallel_vacuum[i] &&
			 parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
													num_index_scans,
													vacuum));
	}

	/* Reset the parallel index processing and progress counters */
	pg_atomic_write_u32(&(pvs->shared->idx), 0);

	/* Setup the shared cost-based vacuum delay and launch workers */
	if (nworkers > 0)
	{
		/* Reinitialize parallel context to relaunch parallel workers */
		if (num_index_scans > 0)
			ReinitializeParallelDSM(pvs->pcxt);

		/*
		 * Set up shared cost balance and the number of active workers for
		 * vacuum delay.  We need to do this before launching workers as
		 * otherwise, they might not see the updated values for these
		 * parameters.
		 */
		pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
		pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);

		/*
		 * The number of workers can vary between the bulkdelete and cleanup
		 * phases.
		 */
		ReinitializeParallelWorkers(pvs->pcxt, nworkers);

		LaunchParallelWorkers(pvs->pcxt);

		if (pvs->pcxt->nworkers_launched > 0)
		{
			/*
			 * Reset the local cost values for leader backend as we have
			 * already accumulated the remaining balance of heap.
			 */
			VacuumCostBalance = 0;
			VacuumCostBalanceLocal = 0;

			/* Enable shared cost balance for leader backend */
			VacuumSharedCostBalance = &(pvs->shared->cost_balance);
			VacuumActiveNWorkers = &(pvs->shared->active_nworkers);

			/* Update the statistics, if we were asked to */
			if (wstats != NULL)
				wstats->nlaunched += pvs->pcxt->nworkers_launched;
		}

		if (vacuum)
			ereport(pvs->shared->elevel,
					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
									 pvs->pcxt->nworkers_launched),
							pvs->pcxt->nworkers_launched, nworkers)));
		else
			ereport(pvs->shared->elevel,
					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
									 pvs->pcxt->nworkers_launched),
							pvs->pcxt->nworkers_launched, nworkers)));
	}

	/* Vacuum the indexes that can be processed by only the leader process */
	parallel_vacuum_process_unsafe_indexes(pvs);

	/*
	 * Join as a parallel worker.  The leader alone processes all
	 * parallel-safe indexes in the case where no workers are launched.
	 */
	parallel_vacuum_process_safe_indexes(pvs);

	/*
	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
	 * to finish, or we might get incomplete data.)
	 */
	if (nworkers > 0)
	{
		/* Wait for all vacuum workers to finish */
		WaitForParallelWorkersToFinish(pvs->pcxt);

		for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
			InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
	}

	/*
	 * Reset all index status back to initial (while checking that we have
	 * vacuumed all indexes).
	 */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
			elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
				 RelationGetRelationName(pvs->indrels[i]));

		indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
	}

	/*
	 * Carry the shared balance value to heap scan and disable shared costing
	 */
	if (VacuumSharedCostBalance)
	{
		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
		VacuumSharedCostBalance = NULL;
		VacuumActiveNWorkers = NULL;
	}
}
976 : :
977 : : /*
978 : : * Index vacuum/cleanup routine used by the leader process and parallel
979 : : * vacuum worker processes to vacuum the indexes in parallel.
980 : : */
981 : : static void
982 : 77 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
983 : : {
984 : : /*
985 : : * Increment the active worker count if we are able to launch any worker.
986 : : */
987 [ + + ]: 77 : if (VacuumActiveNWorkers)
988 : 69 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
989 : :
990 : : /* Loop until all indexes are vacuumed */
991 : : for (;;)
992 : 125 : {
993 : : int idx;
994 : : PVIndStats *indstats;
995 : :
996 : : /* Get an index number to process */
997 : 202 : idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
998 : :
999 : : /* Done for all indexes? */
1000 [ + + ]: 202 : if (idx >= pvs->nindexes)
1001 : 76 : break;
1002 : :
1003 : 126 : indstats = &(pvs->indstats[idx]);
1004 : :
1005 : : /*
1006 : : * Skip vacuuming index that is unsafe for workers or has an
1007 : : * unsuitable target for parallel index vacuum (this is vacuumed in
1008 : : * parallel_vacuum_process_unsafe_indexes() by the leader).
1009 : : */
1010 [ + + ]: 126 : if (!indstats->parallel_workers_can_process)
1011 : 38 : continue;
1012 : :
1013 : : /* Do vacuum or cleanup of the index */
1014 : 88 : parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
1015 : : }
1016 : :
1017 : : /*
1018 : : * We have completed the index vacuum so decrement the active worker
1019 : : * count.
1020 : : */
1021 [ + + ]: 76 : if (VacuumActiveNWorkers)
1022 : 68 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
1023 : 76 : }
1024 : :
1025 : : /*
1026 : : * Perform parallel vacuuming of indexes in leader process.
1027 : : *
1028 : : * Handles index vacuuming (or index cleanup) for indexes that are not
1029 : : * parallel safe. It's possible that this will vary for a given index, based
1030 : : * on details like whether we're performing index cleanup right now.
1031 : : *
1032 : : * Also performs vacuuming of smaller indexes that fell under the size cutoff
1033 : : * enforced by parallel_vacuum_compute_workers().
1034 : : */
1035 : : static void
1036 : 38 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
1037 : : {
1038 [ - + ]: 38 : Assert(!IsParallelWorker());
1039 : :
1040 : : /*
1041 : : * Increment the active worker count if we are able to launch any worker.
1042 : : */
1043 [ + + ]: 38 : if (VacuumActiveNWorkers)
1044 : 30 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
1045 : :
1046 [ + + ]: 164 : for (int i = 0; i < pvs->nindexes; i++)
1047 : : {
1048 : 126 : PVIndStats *indstats = &(pvs->indstats[i]);
1049 : :
1050 : : /* Skip, indexes that are safe for workers */
1051 [ + + ]: 126 : if (indstats->parallel_workers_can_process)
1052 : 88 : continue;
1053 : :
1054 : : /* Do vacuum or cleanup of the index */
1055 : 38 : parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
1056 : : }
1057 : :
1058 : : /*
1059 : : * We have completed the index vacuum so decrement the active worker
1060 : : * count.
1061 : : */
1062 [ + + ]: 38 : if (VacuumActiveNWorkers)
1063 : 30 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
1064 : 38 : }
1065 : :
/*
 * Vacuum or cleanup one index, either in the leader process or in one of the
 * worker processes.  After vacuuming the index this function copies the index
 * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
 * segment.
 */
static void
parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
								  PVIndStats *indstats)
{
	IndexBulkDeleteResult *istat = NULL;
	IndexBulkDeleteResult *istat_res;
	IndexVacuumInfo ivinfo;

	/*
	 * Update the pointer to the corresponding bulk-deletion result if someone
	 * has already updated it (i.e., this index was vacuumed in an earlier
	 * pass, so its stats already live in the DSM segment).
	 */
	if (indstats->istat_updated)
		istat = &(indstats->istat);

	/* Fill in the information that the index AM callbacks need */
	ivinfo.index = indrel;
	ivinfo.heaprel = pvs->heaprel;
	ivinfo.analyze_only = false;
	ivinfo.report_progress = false;
	ivinfo.message_level = DEBUG2;
	ivinfo.estimated_count = pvs->shared->estimated_count;
	ivinfo.num_heap_tuples = pvs->shared->reltuples;
	ivinfo.strategy = pvs->bstrategy;

	/* Update error traceback information for parallel_vacuum_error_callback() */
	pvs->indname = pstrdup(RelationGetRelationName(indrel));
	pvs->status = indstats->status;

	switch (indstats->status)
	{
		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
			istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
											  &pvs->shared->dead_items_info);
			break;
		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
			istat_res = vac_cleanup_one_index(&ivinfo, istat);
			break;
		default:
			elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
				 indstats->status,
				 RelationGetRelationName(indrel));
	}

	/*
	 * Copy the index bulk-deletion result returned from ambulkdelete and
	 * amvacuumcleanup to the DSM segment if it's the first cycle because they
	 * allocate locally and it's possible that an index will be vacuumed by a
	 * different vacuum process the next cycle.  Copying the result normally
	 * happens only the first time an index is vacuumed.  For any additional
	 * vacuum pass, we directly point to the result on the DSM segment and
	 * pass it to vacuum index APIs so that workers can update it directly.
	 *
	 * Since all vacuum workers write the bulk-deletion result at different
	 * slots we can write them without locking.
	 */
	if (!indstats->istat_updated && istat_res != NULL)
	{
		memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
		indstats->istat_updated = true;

		/* Free the locally-allocated bulk-deletion result */
		pfree(istat_res);
	}

	/*
	 * Update the status to completed.  No need to lock here since each worker
	 * touches different indexes.
	 */
	indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;

	/* Reset error traceback information */
	pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
	pfree(pvs->indname);
	pvs->indname = NULL;

	/*
	 * Call the parallel variant of pgstat_progress_incr_param so workers can
	 * report progress of index vacuum to the leader.
	 */
	pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
}
1153 : :
1154 : : /*
1155 : : * Returns false, if the given index can't participate in the next execution of
1156 : : * parallel index vacuum or parallel index cleanup.
1157 : : */
1158 : : static bool
1159 : 118 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
1160 : : bool vacuum)
1161 : : {
1162 : : uint8 vacoptions;
1163 : :
1164 : 118 : vacoptions = indrel->rd_indam->amparallelvacuumoptions;
1165 : :
1166 : : /* In parallel vacuum case, check if it supports parallel bulk-deletion */
1167 [ + + ]: 118 : if (vacuum)
1168 : 36 : return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
1169 : :
1170 : : /* Not safe, if the index does not support parallel cleanup */
1171 [ + + ]: 82 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
1172 [ + + ]: 66 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
1173 : 8 : return false;
1174 : :
1175 : : /*
1176 : : * Not safe, if the index supports parallel cleanup conditionally, but we
1177 : : * have already processed the index (for bulkdelete). We do this to avoid
1178 : : * the need to invoke workers when parallel index cleanup doesn't need to
1179 : : * scan the index. See the comments for option
1180 : : * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
1181 : : * parallel cleanup conditionally.
1182 : : */
1183 [ + + ]: 74 : if (num_index_scans > 0 &&
1184 [ + + ]: 23 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
1185 : 21 : return false;
1186 : :
1187 : 53 : return true;
1188 : : }
1189 : :
/*
 * Perform work within a launched parallel process.
 *
 * This is the entry point for parallel vacuum worker processes.  Since
 * parallel vacuum workers perform only index vacuum or index cleanup, we
 * don't need to report progress information.
 */
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
{
	ParallelVacuumState pvs;
	Relation	rel;
	Relation   *indrels;
	PVIndStats *indstats;
	PVShared   *shared;
	TidStore   *dead_items;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	int			nindexes;
	char	   *sharedquery;
	ErrorContextCallback errcallback;

	/*
	 * A parallel vacuum worker must have only the PROC_IN_VACUUM flag set.
	 */
	Assert(MyProc->statusFlags == PROC_IN_VACUUM);

	elog(DEBUG1, "starting parallel vacuum worker");

	shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);

	/* Set debug_query_string for individual workers */
	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
	debug_query_string = sharedquery;
	pgstat_report_activity(STATE_RUNNING, debug_query_string);

	/* Track query ID */
	pgstat_report_query_id(shared->queryid, false);

	/*
	 * Open table.  The lock mode is the same as the leader process.  It's
	 * okay because the lock mode does not conflict among the parallel
	 * workers.
	 */
	rel = table_open(shared->relid, ShareUpdateExclusiveLock);

	/*
	 * Open all indexes.  indrels are sorted in order by OID, which should be
	 * matched to the leader's one.
	 */
	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
	Assert(nindexes > 0);

	/*
	 * Apply the desired value of maintenance_work_mem within this process.
	 * Really we should use SetConfigOption() to change a GUC, but since we're
	 * already in parallel mode guc.c would complain about that.  Fortunately,
	 * by the same token guc.c will not let any user-defined code change it.
	 * So just avert your eyes while we do this:
	 */
	if (shared->maintenance_work_mem_worker > 0)
		maintenance_work_mem = shared->maintenance_work_mem_worker;

	/* Set index statistics */
	indstats = (PVIndStats *) shm_toc_lookup(toc,
											 PARALLEL_VACUUM_KEY_INDEX_STATS,
											 false);

	/* Find dead_items in shared memory */
	dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
								shared->dead_items_handle);

	/* Set cost-based vacuum delay */
	if (shared->is_autovacuum)
	{
		/*
		 * Parallel autovacuum workers initialize cost-based delay parameters
		 * from the leader's shared state rather than GUC defaults, because
		 * the leader may have applied per-table or autovacuum-specific
		 * overrides.  pv_shared_cost_params must be set before calling
		 * parallel_vacuum_update_shared_delay_params().
		 */
		pv_shared_cost_params = &(shared->cost_params);
		parallel_vacuum_update_shared_delay_params();
	}
	else
		VacuumUpdateCosts();

	/* Enable shared cost accounting for this worker */
	VacuumCostBalance = 0;
	VacuumCostBalanceLocal = 0;
	VacuumSharedCostBalance = &(shared->cost_balance);
	VacuumActiveNWorkers = &(shared->active_nworkers);

	/* Set parallel vacuum state */
	pvs.indrels = indrels;
	pvs.nindexes = nindexes;
	pvs.indstats = indstats;
	pvs.shared = shared;
	pvs.dead_items = dead_items;
	pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
	pvs.relname = pstrdup(RelationGetRelationName(rel));
	pvs.heaprel = rel;

	/* These fields will be filled during index vacuum or cleanup */
	pvs.indname = NULL;
	pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;

	/* Each parallel VACUUM worker gets its own access strategy. */
	pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
											  shared->ring_nbuffers * (BLCKSZ / 1024));

	/* Setup error traceback support for ereport() */
	errcallback.callback = parallel_vacuum_error_callback;
	errcallback.arg = &pvs;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Prepare to track buffer usage during parallel execution */
	InstrStartParallelQuery();

	/* Process indexes to perform vacuum/cleanup */
	parallel_vacuum_process_safe_indexes(&pvs);

	/* Report buffer/WAL usage during parallel execution */
	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
						  &wal_usage[ParallelWorkerNumber]);

	/* Report any remaining cost-based vacuum delay time */
	if (track_cost_delay_timing)
		pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_DELAY_TIME,
											parallel_vacuum_worker_delay_ns);

	TidStoreDetach(dead_items);

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	/* Release the relation locks taken above and the access strategy */
	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
	table_close(rel, ShareUpdateExclusiveLock);
	FreeAccessStrategy(pvs.bstrategy);

	/* Clear the shared cost-parameter pointer set for autovacuum above */
	if (shared->is_autovacuum)
		pv_shared_cost_params = NULL;
}
1336 : :
1337 : : /*
1338 : : * Error context callback for errors occurring during parallel index vacuum.
1339 : : * The error context messages should match the messages set in the lazy vacuum
1340 : : * error context. If you change this function, change vacuum_error_callback()
1341 : : * as well.
1342 : : */
1343 : : static void
1594 akapila@postgresql.o 1344 :GBC 5 : parallel_vacuum_error_callback(void *arg)
1345 : : {
1346 : 5 : ParallelVacuumState *errinfo = arg;
1347 : :
1348 [ + - - ]: 5 : switch (errinfo->status)
1349 : : {
1350 : 5 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
1351 : 5 : errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1352 : : errinfo->indname,
1353 : : errinfo->relnamespace,
1354 : : errinfo->relname);
1355 : 5 : break;
1594 akapila@postgresql.o 1356 :UBC 0 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
1357 : 0 : errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1358 : : errinfo->indname,
1359 : : errinfo->relnamespace,
1360 : : errinfo->relname);
1361 : 0 : break;
1362 : 0 : case PARALLEL_INDVAC_STATUS_INITIAL:
1363 : : case PARALLEL_INDVAC_STATUS_COMPLETED:
1364 : : default:
1365 : 0 : return;
1366 : : }
1367 : : }
|