Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * method_worker.c
4 : : * AIO - perform AIO using worker processes
5 : : *
6 : : * IO workers consume IOs from a shared memory submission queue, run
7 : : * traditional synchronous system calls, and perform the shared completion
8 : : * handling immediately. Client code submits most requests by pushing IOs
9 : : * into the submission queue, and waits (if necessary) using condition
10 : : * variables. Some IOs cannot be performed in another process due to lack of
11 : : * infrastructure for reopening the file, and must processed synchronously by
12 : : * the client code when submitted.
13 : : *
14 : : * The pool of workers tries to stabilize at a size that can handle recently
15 : : * seen variation in demand, within the configured limits.
16 : : *
17 : : * This method of AIO is available in all builds on all operating systems, and
18 : : * is the default.
19 : : *
20 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
21 : : * Portions Copyright (c) 1994, Regents of the University of California
22 : : *
23 : : * IDENTIFICATION
24 : : * src/backend/storage/aio/method_worker.c
25 : : *
26 : : *-------------------------------------------------------------------------
27 : : */
28 : :
29 : : #include "postgres.h"
30 : :
31 : : #include <limits.h>
32 : :
33 : : #include "libpq/pqsignal.h"
34 : : #include "miscadmin.h"
35 : : #include "port/pg_bitutils.h"
36 : : #include "postmaster/auxprocess.h"
37 : : #include "postmaster/interrupt.h"
38 : : #include "storage/aio.h"
39 : : #include "storage/aio_internal.h"
40 : : #include "storage/aio_subsys.h"
41 : : #include "storage/io_worker.h"
42 : : #include "storage/ipc.h"
43 : : #include "storage/latch.h"
44 : : #include "storage/lwlock.h"
45 : : #include "storage/pmsignal.h"
46 : : #include "storage/proc.h"
47 : : #include "storage/shmem.h"
48 : : #include "tcop/tcopprot.h"
49 : : #include "utils/injection_point.h"
50 : : #include "utils/memdebug.h"
51 : : #include "utils/ps_status.h"
52 : : #include "utils/wait_event.h"
53 : :
54 : : /*
55 : : * Saturation for counters used to estimate wakeup:IO ratio.
56 : : *
57 : : * We maintain hist_wakeups for wakeups received and hist_ios for IOs
58 : : * processed by each worker. When either counter reaches this saturation
59 : : * value, we divide both by two. The result is an exponentially decaying
60 : : * ratio of wakeups to IOs, with a very short memory.
61 : : *
62 : : * If a worker is itself experiencing useless wakeups, it assumes that
63 : : * higher-numbered workers would experience even more, so it should end the
64 : : * chain.
65 : : */
66 : : #define PGAIO_WORKER_WAKEUP_RATIO_SATURATE 4
67 : :
68 : : /* Debugging support: show current IO and wakeups:ios statistics in ps. */
69 : : /* #define PGAIO_WORKER_SHOW_PS_INFO */
70 : :
/*
 * Ring buffer of IO handle IDs awaiting execution by a worker.  Lives in
 * shared memory; all access is protected by AioWorkerSubmissionQueueLock.
 * "size" is rounded up to a power of two (see pgaio_worker_shmem_request())
 * so head/tail arithmetic can wrap with a simple mask.
 */
typedef struct PgAioWorkerSubmissionQueue
{
	uint32		size;			/* number of sqes[] entries, power of two */
	uint32		head;			/* next slot to insert into */
	uint32		tail;			/* next slot to consume from */
	int			sqes[FLEXIBLE_ARRAY_MEMBER];	/* queued IO handle IDs */
} PgAioWorkerSubmissionQueue;
78 : :
/*
 * Per-worker slot in PgAioWorkerControl->workers[].
 */
typedef struct PgAioWorkerSlot
{
	/* The worker's PGPROC number, or INVALID_PROC_NUMBER while unused. */
	ProcNumber	proc_number;
} PgAioWorkerSlot;
83 : :
84 : : /*
85 : : * Sets of worker IDs are held in a simple bitmap, accessed through functions
86 : : * that provide a more readable abstraction. If we wanted to support more
87 : : * workers than that, the contention on the single queue would surely get too
88 : : * high, so we might want to consider multiple pools instead of widening this.
89 : : */
90 : : typedef uint64 PgAioWorkerSet;
91 : :
92 : : #define PGAIO_WORKERSET_BITS (sizeof(PgAioWorkerSet) * CHAR_BIT)
93 : :
94 : : static_assert(PGAIO_WORKERSET_BITS >= MAX_IO_WORKERS, "too small");
95 : :
/*
 * Shared control state for the worker pool.  Note the mixed locking regimes
 * per field group, and that "grow"/"grow_signal_sent" are read by the
 * postmaster without any lock (using memory barriers only).
 */
typedef struct PgAioWorkerControl
{
	/* Seen by postmaster */
	bool		grow;			/* a new worker has been requested */
	bool		grow_signal_sent;	/* postmaster signal already sent */

	/* Protected by AioWorkerSubmissionQueueLock. */
	PgAioWorkerSet idle_workerset;	/* workers waiting for work */

	/* Protected by AioWorkerControlLock. */
	PgAioWorkerSet workerset;	/* all currently registered workers */
	int			nworkers;		/* population count of workerset */

	/* Protected by AioWorkerControlLock. */
	PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
} PgAioWorkerControl;
112 : :
113 : :
114 : : static void pgaio_worker_shmem_request(void *arg);
115 : : static void pgaio_worker_shmem_init(void *arg);
116 : :
117 : : static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
118 : : static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
119 : :
120 : :
/* Method table hooking worker-based AIO into the generic AIO machinery. */
const IoMethodOps pgaio_worker_ops = {
	.shmem_callbacks.request_fn = pgaio_worker_shmem_request,
	.shmem_callbacks.init_fn = pgaio_worker_shmem_init,

	.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
	.submit = pgaio_worker_submit,
};
128 : :
129 : :
/* GUCs */
int			io_min_workers = 2; /* lower bound on pool size */
int			io_max_workers = 8; /* upper bound on pool size */
int			io_worker_idle_timeout = 60000; /* how long a surplus worker may
											 * stay idle before exiting;
											 * presumably milliseconds — see
											 * GUC definition */
int			io_worker_launch_interval = 100;	/* postmaster throttle between
												 * worker launches */


static int	io_worker_queue_size = 64;	/* requested submission queue entries */
static int	MyIoWorkerId = -1;	/* this process's worker ID, -1 if unset */
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
static PgAioWorkerControl *io_worker_control;
141 : :
142 : :
143 : : static void
27 tmunro@postgresql.or 144 :GNC 2454 : pgaio_workerset_initialize(PgAioWorkerSet *set)
145 : : {
146 : 2454 : *set = 0;
147 : 2454 : }
148 : :
149 : : static bool
150 : 1365038 : pgaio_workerset_is_empty(PgAioWorkerSet *set)
151 : : {
152 : 1365038 : return *set == 0;
153 : : }
154 : :
155 : : static PgAioWorkerSet
156 : 1733991 : pgaio_workerset_singleton(int worker)
157 : : {
158 [ + - - + ]: 1733991 : Assert(worker >= 0 && worker < MAX_IO_WORKERS);
159 : 1733991 : return UINT64_C(1) << worker;
160 : : }
161 : :
162 : : static void
163 : 1312 : pgaio_workerset_all(PgAioWorkerSet *set)
164 : : {
165 : 1312 : *set = UINT64_MAX >> (PGAIO_WORKERSET_BITS - MAX_IO_WORKERS);
27 tmunro@postgresql.or 166 :GIC 1312 : }
167 : :
168 : : static void
27 tmunro@postgresql.or 169 :GNC 1312 : pgaio_workerset_subtract(PgAioWorkerSet *set1, const PgAioWorkerSet *set2)
170 : : {
171 : 1312 : *set1 &= ~*set2;
172 : 1312 : }
173 : :
174 : : static void
175 : 531368 : pgaio_workerset_insert(PgAioWorkerSet *set, int worker)
176 : : {
177 [ + - - + ]: 531368 : Assert(worker >= 0 && worker < MAX_IO_WORKERS);
178 : 531368 : *set |= pgaio_workerset_singleton(worker);
179 : 531368 : }
180 : :
181 : : static void
182 : 1199999 : pgaio_workerset_remove(PgAioWorkerSet *set, int worker)
183 : : {
184 [ + - - + ]: 1199999 : Assert(worker >= 0 && worker < MAX_IO_WORKERS);
185 : 1199999 : *set &= ~pgaio_workerset_singleton(worker);
186 : 1199999 : }
187 : :
188 : : static void
189 : 13241 : pgaio_workerset_remove_lte(PgAioWorkerSet *set, int worker)
190 : : {
191 [ + - - + ]: 13241 : Assert(worker >= 0 && worker < MAX_IO_WORKERS);
192 : 13241 : *set &= (~(PgAioWorkerSet) 0) << (worker + 1);
193 : 13241 : }
194 : :
195 : : static int
196 : 12031 : pgaio_workerset_get_highest(PgAioWorkerSet *set)
197 : : {
198 [ - + ]: 12031 : Assert(!pgaio_workerset_is_empty(set));
199 : 12031 : return pg_leftmost_one_pos64(*set);
200 : : }
201 : :
202 : : static int
203 : 662328 : pgaio_workerset_get_lowest(PgAioWorkerSet *set)
204 : : {
205 [ - + ]: 662328 : Assert(!pgaio_workerset_is_empty(set));
206 : 662328 : return pg_rightmost_one_pos64(*set);
207 : : }
208 : :
209 : : static int
210 : 2556 : pgaio_workerset_pop_lowest(PgAioWorkerSet *set)
211 : : {
212 : 2556 : int worker = pgaio_workerset_get_lowest(set);
213 : :
214 : 2556 : pgaio_workerset_remove(set, worker);
215 : 2556 : return worker;
216 : : }
217 : :
218 : : #ifdef USE_ASSERT_CHECKING
219 : : static bool
220 : 2624 : pgaio_workerset_contains(PgAioWorkerSet *set, int worker)
221 : : {
222 [ + - - + ]: 2624 : Assert(worker >= 0 && worker < MAX_IO_WORKERS);
223 : 2624 : return (*set & pgaio_workerset_singleton(worker)) != 0;
224 : : }
225 : :
226 : : static int
227 : 2624 : pgaio_workerset_count(PgAioWorkerSet *set)
228 : : {
229 : 2624 : return pg_popcount64(*set);
230 : : }
231 : : #endif
232 : :
/*
 * shmem_callbacks.request_fn: reserve shared memory for the submission queue
 * and the worker control structure.
 */
static void
pgaio_worker_shmem_request(void *arg)
{
	size_t		size;
	int			queue_size;

	/* Round size up to next power of two so we can make a mask. */
	queue_size = pg_nextpower2_32(io_worker_queue_size);

	size = offsetof(PgAioWorkerSubmissionQueue, sqes) + sizeof(int) * queue_size;
	ShmemRequestStruct(.name = "AioWorkerSubmissionQueue",
					   .size = size,
					   .ptr = (void **) &io_worker_submission_queue,
	);

	size = offsetof(PgAioWorkerControl, workers) + sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
	ShmemRequestStruct(.name = "AioWorkerControl",
					   .size = size,
					   .ptr = (void **) &io_worker_control,
	);
}
254 : :
/*
 * shmem_callbacks.init_fn: initialize the structures requested above.
 *
 * NOTE(review): grow_signal_sent and nworkers are not explicitly initialized
 * here; this presumably relies on shared memory starting zeroed — confirm.
 */
static void
pgaio_worker_shmem_init(void *arg)
{
	int			queue_size;

	/* Round size up like in pgaio_worker_shmem_request() */
	queue_size = pg_nextpower2_32(io_worker_queue_size);

	io_worker_submission_queue->size = queue_size;
	io_worker_submission_queue->head = 0;
	io_worker_submission_queue->tail = 0;
	io_worker_control->grow = false;
	pgaio_workerset_initialize(&io_worker_control->workerset);
	pgaio_workerset_initialize(&io_worker_control->idle_workerset);

	/* No worker slots are occupied yet. */
	for (int i = 0; i < MAX_IO_WORKERS; ++i)
		io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
}
273 : :
274 : : /*
275 : : * Tell postmaster that we think a new worker is needed.
276 : : */
277 : : static void
278 : 250 : pgaio_worker_request_grow(void)
279 : : {
280 : : /*
281 : : * Suppress useless signaling if we already know that we're at the
282 : : * maximum. This uses an unlocked read of nworkers, but that's OK for
283 : : * this heuristic purpose.
284 : : */
285 [ - + ]: 250 : if (io_worker_control->nworkers >= io_max_workers)
27 tmunro@postgresql.or 286 :UNC 0 : return;
287 : :
288 : : /* Already requested? */
27 tmunro@postgresql.or 289 [ + + ]:GNC 250 : if (io_worker_control->grow)
290 : 173 : return;
291 : :
292 : 77 : io_worker_control->grow = true;
293 : 77 : pg_memory_barrier();
294 : :
295 : : /*
296 : : * If the postmaster has already been signaled, don't do it again until
297 : : * the postmaster clears this flag. There is no point in repeated signals
298 : : * if grow is being set and cleared repeatedly while the postmaster is
299 : : * waiting for io_worker_launch_interval, which it applies even to
300 : : * canceled requests.
301 : : */
302 [ + + ]: 77 : if (io_worker_control->grow_signal_sent)
303 : 35 : return;
304 : :
305 : 42 : io_worker_control->grow_signal_sent = true;
306 : 42 : pg_memory_barrier();
307 : 42 : SendPostmasterSignal(PMSIGNAL_IO_WORKER_GROW);
308 : : }
309 : :
310 : : /*
311 : : * Cancel any request for a new worker, after observing an empty queue.
312 : : */
313 : : static void
314 : 530056 : pgaio_worker_cancel_grow(void)
315 : : {
316 [ + + ]: 530056 : if (!io_worker_control->grow)
317 : 529979 : return;
318 : :
319 : 77 : io_worker_control->grow = false;
320 : 77 : pg_memory_barrier();
321 : : }
322 : :
323 : : /*
324 : : * Called by the postmaster to check if a new worker has been requested (but
325 : : * possibly canceled since).
326 : : */
327 : : bool
328 : 82340 : pgaio_worker_pm_test_grow_signal_sent(void)
329 : : {
330 : 82340 : pg_memory_barrier();
331 [ + - + + ]: 82340 : return io_worker_control && io_worker_control->grow_signal_sent;
332 : : }
333 : :
334 : : /*
335 : : * Called by the postmaster to check if a new worker has been requested and
336 : : * not canceled since.
337 : : */
338 : : bool
339 : 59 : pgaio_worker_pm_test_grow(void)
340 : : {
341 : 59 : pg_memory_barrier();
342 [ + - + + ]: 59 : return io_worker_control && io_worker_control->grow;
343 : : }
344 : :
345 : : /*
346 : : * Called by the postmaster to clear the request for a new worker.
347 : : */
348 : : void
349 : 48 : pgaio_worker_pm_clear_grow_signal_sent(void)
350 : : {
351 [ - + ]: 48 : if (!io_worker_control)
27 tmunro@postgresql.or 352 :UNC 0 : return;
353 : :
27 tmunro@postgresql.or 354 :GNC 48 : io_worker_control->grow = false;
355 : 48 : io_worker_control->grow_signal_sent = false;
356 : 48 : pg_memory_barrier();
413 andres@anarazel.de 357 :ECB (1060) : }
358 : :
359 : : static int
27 tmunro@postgresql.or 360 :GNC 684187 : pgaio_worker_choose_idle(int only_workers_above)
361 : : {
362 : : PgAioWorkerSet workerset;
363 : : int worker;
364 : :
365 [ - + ]: 684187 : Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
366 : :
367 : 684187 : workerset = io_worker_control->idle_workerset;
368 [ + + ]: 684187 : if (only_workers_above >= 0)
369 : 13241 : pgaio_workerset_remove_lte(&workerset, only_workers_above);
370 [ + + ]: 684187 : if (pgaio_workerset_is_empty(&workerset))
413 andres@anarazel.de 371 :CBC 25727 : return -1;
372 : :
373 : : /* Find the lowest numbered idle worker and mark it not idle. */
27 tmunro@postgresql.or 374 :GNC 658460 : worker = pgaio_workerset_get_lowest(&workerset);
375 : 658460 : pgaio_workerset_remove(&io_worker_control->idle_workerset, worker);
376 : :
413 andres@anarazel.de 377 :CBC 658460 : return worker;
378 : : }
379 : :
380 : : /*
381 : : * Try to wake a worker by setting its latch, to tell it there are IOs to
382 : : * process in the submission queue.
383 : : */
384 : : static void
27 tmunro@postgresql.or 385 :GNC 661016 : pgaio_worker_wake(int worker)
386 : : {
387 : : ProcNumber proc_number;
388 : :
389 : : /*
390 : : * If the selected worker is concurrently exiting, then pgaio_worker_die()
391 : : * had not yet removed it as of when we saw it in idle_workerset. That's
392 : : * OK, because it will wake all remaining workers to close wakeup-vs-exit
393 : : * races: *someone* will see the queued IO. If there are no workers
394 : : * running, the postmaster will start a new one.
395 : : */
396 : 661016 : proc_number = io_worker_control->workers[worker].proc_number;
397 [ + + ]: 661016 : if (proc_number != INVALID_PROC_NUMBER)
398 : 660973 : SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
399 : 661016 : }
400 : :
401 : : /*
402 : : * Try to wake a set of workers. Used on pool change, to close races
403 : : * described in the callers.
404 : : */
405 : : static void
406 : 2624 : pgaio_workerset_wake(PgAioWorkerSet workerset)
407 : : {
408 [ + + ]: 5180 : while (!pgaio_workerset_is_empty(&workerset))
409 : 2556 : pgaio_worker_wake(pgaio_workerset_pop_lowest(&workerset));
410 : 2624 : }
411 : :
412 : : static bool
413 andres@anarazel.de 413 :CBC 671066 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
414 : : {
415 : : PgAioWorkerSubmissionQueue *queue;
416 : : uint32 new_head;
417 : :
27 tmunro@postgresql.or 418 [ - + ]:GNC 671066 : Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
419 : :
413 andres@anarazel.de 420 :CBC 671066 : queue = io_worker_submission_queue;
421 : 671066 : new_head = (queue->head + 1) & (queue->size - 1);
422 [ - + ]: 671066 : if (new_head == queue->tail)
423 : : {
413 andres@anarazel.de 424 [ # # ]:UBC 0 : pgaio_debug(DEBUG3, "io queue is full, at %u elements",
425 : : io_worker_submission_queue->size);
426 : 0 : return false; /* full */
427 : : }
428 : :
297 tmunro@postgresql.or 429 :CBC 671066 : queue->sqes[queue->head] = pgaio_io_get_id(ioh);
413 andres@anarazel.de 430 : 671066 : queue->head = new_head;
431 : :
432 : 671066 : return true;
433 : : }
434 : :
435 : : static int
436 : 1066415 : pgaio_worker_submission_queue_consume(void)
437 : : {
438 : : PgAioWorkerSubmissionQueue *queue;
439 : : int result;
440 : :
27 tmunro@postgresql.or 441 [ - + ]:GNC 1066415 : Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
442 : :
413 andres@anarazel.de 443 :CBC 1066415 : queue = io_worker_submission_queue;
444 [ + + ]: 1066415 : if (queue->tail == queue->head)
257 peter@eisentraut.org 445 : 530056 : return -1; /* empty */
446 : :
297 tmunro@postgresql.or 447 : 536359 : result = queue->sqes[queue->tail];
413 andres@anarazel.de 448 : 536359 : queue->tail = (queue->tail + 1) & (queue->size - 1);
449 : :
450 : 536359 : return result;
451 : : }
452 : :
453 : : static uint32
454 : 308425 : pgaio_worker_submission_queue_depth(void)
455 : : {
456 : : uint32 head;
457 : : uint32 tail;
458 : :
27 tmunro@postgresql.or 459 [ - + ]:GNC 308425 : Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
460 : :
413 andres@anarazel.de 461 :CBC 308425 : head = io_worker_submission_queue->head;
462 : 308425 : tail = io_worker_submission_queue->tail;
463 : :
464 [ + + ]: 308425 : if (tail > head)
465 : 215 : head += io_worker_submission_queue->size;
466 : :
467 [ - + ]: 308425 : Assert(head >= tail);
468 : :
469 : 308425 : return head - tail;
470 : : }
471 : :
472 : : static bool
473 : 1350360 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
474 : : {
475 : : return
476 : 1350360 : !IsUnderPostmaster
477 [ + + ]: 1346683 : || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
478 [ + + - + ]: 2697043 : || !pgaio_io_can_reopen(ioh);
479 : : }
480 : :
/*
 * IoMethodOps.submit implementation: push the staged IOs onto the shared
 * submission queue and wake one idle worker.  If the queue lock cannot be
 * taken immediately, or the queue fills up part-way, the remaining IOs are
 * executed synchronously in this process instead.  Returns the number of
 * IOs submitted (always num_staged_ios).
 */
static int
pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
	PgAioHandle **synchronous_ios = NULL;
	int			nsync = 0;
	int			worker = -1;

	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);

	for (int i = 0; i < num_staged_ios; i++)
		pgaio_io_prepare_submit(staged_ios[i]);

	/* Conditional acquire: never block submission on a busy queue lock. */
	if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
	{
		for (int i = 0; i < num_staged_ios; ++i)
		{
			Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
			if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
			{
				/*
				 * The queue is full, so give up and do the rest
				 * synchronously.  We're holding an exclusive lock on the
				 * queue so nothing can consume entries.
				 */
				synchronous_ios = &staged_ios[i];
				nsync = (num_staged_ios - i);

				break;
			}
		}
		/* Choose one worker to wake for this batch. */
		worker = pgaio_worker_choose_idle(-1);
		LWLockRelease(AioWorkerSubmissionQueueLock);

		/* Wake up chosen worker. It will wake peers if necessary. */
		if (worker != -1)
			pgaio_worker_wake(worker);
	}
	else
	{
		/* do everything synchronously, no wakeup needed */
		synchronous_ios = staged_ios;
		nsync = num_staged_ios;
	}

	/* Run whatever is left synchronously. */
	if (nsync > 0)
	{
		for (int i = 0; i < nsync; ++i)
		{
			pgaio_io_perform_synchronously(synchronous_ios[i]);
		}
	}

	return num_staged_ios;
}
537 : :
538 : : /*
539 : : * on_shmem_exit() callback that releases the worker's slot in
540 : : * io_worker_control.
541 : : */
542 : : static void
543 : 1312 : pgaio_worker_die(int code, Datum arg)
544 : : {
545 : : PgAioWorkerSet notify_set;
546 : :
27 tmunro@postgresql.or 547 :GNC 1312 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
548 : 1312 : pgaio_workerset_remove(&io_worker_control->idle_workerset, MyIoWorkerId);
413 andres@anarazel.de 549 :CBC 1312 : LWLockRelease(AioWorkerSubmissionQueueLock);
550 : :
27 tmunro@postgresql.or 551 :GNC 1312 : LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
552 [ - + ]: 1312 : Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
553 : 1312 : io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
554 [ - + ]: 1312 : Assert(pgaio_workerset_contains(&io_worker_control->workerset, MyIoWorkerId));
555 : 1312 : pgaio_workerset_remove(&io_worker_control->workerset, MyIoWorkerId);
556 : 1312 : notify_set = io_worker_control->workerset;
557 [ - + ]: 1312 : Assert(io_worker_control->nworkers > 0);
558 : 1312 : io_worker_control->nworkers--;
559 [ - + ]: 1312 : Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
560 : : io_worker_control->nworkers);
561 : 1312 : LWLockRelease(AioWorkerControlLock);
562 : :
563 : : /*
564 : : * Notify other workers on pool change. This allows the new highest
565 : : * worker to know that it is now the one that can time out, and closes a
566 : : * wakeup-loss race described in pgaio_worker_wake().
567 : : */
568 : 1312 : pgaio_workerset_wake(notify_set);
413 andres@anarazel.de 569 :CBC 1312 : }
570 : :
571 : : /*
572 : : * Register the worker in shared memory, assign MyIoWorkerId and register a
573 : : * shutdown callback to release registration.
574 : : */
575 : : static void
576 : 1312 : pgaio_worker_register(void)
577 : : {
578 : : PgAioWorkerSet free_workerset;
579 : : PgAioWorkerSet old_workerset;
580 : :
581 : 1312 : MyIoWorkerId = -1;
582 : :
27 tmunro@postgresql.or 583 :GNC 1312 : LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
584 : : /* Find lowest unused worker ID. */
585 : 1312 : pgaio_workerset_all(&free_workerset);
586 : 1312 : pgaio_workerset_subtract(&free_workerset, &io_worker_control->workerset);
587 [ + - ]: 1312 : if (!pgaio_workerset_is_empty(&free_workerset))
588 : 1312 : MyIoWorkerId = pgaio_workerset_get_lowest(&free_workerset);
589 [ - + ]: 1312 : if (MyIoWorkerId == -1)
27 tmunro@postgresql.or 590 [ # # ]:UNC 0 : elog(ERROR, "couldn't find a free worker ID");
591 : :
27 tmunro@postgresql.or 592 [ - + ]:GNC 1312 : Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
593 : : INVALID_PROC_NUMBER);
594 : 1312 : io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;
595 : :
596 : 1312 : old_workerset = io_worker_control->workerset;
597 [ - + ]: 1312 : Assert(!pgaio_workerset_contains(&old_workerset, MyIoWorkerId));
598 : 1312 : pgaio_workerset_insert(&io_worker_control->workerset, MyIoWorkerId);
599 : 1312 : io_worker_control->nworkers++;
600 [ - + ]: 1312 : Assert(io_worker_control->nworkers <= MAX_IO_WORKERS);
601 [ - + ]: 1312 : Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
602 : : io_worker_control->nworkers);
603 : 1312 : LWLockRelease(AioWorkerControlLock);
604 : :
605 : : /*
606 : : * Notify other workers on pool change. If we were the highest worker,
607 : : * this allows the new highest worker to know that it can time out.
608 : : */
609 : 1312 : pgaio_workerset_wake(old_workerset);
610 : :
413 andres@anarazel.de 611 :CBC 1312 : on_shmem_exit(pgaio_worker_die, 0);
612 : 1312 : }
613 : :
614 : : static void
399 melanieplageman@gmai 615 : 1652 : pgaio_worker_error_callback(void *arg)
616 : : {
617 : : ProcNumber owner;
618 : : PGPROC *owner_proc;
619 : : int32 owner_pid;
620 : 1652 : PgAioHandle *ioh = arg;
621 : :
622 [ + + ]: 1652 : if (!ioh)
623 : 12 : return;
624 : :
625 [ - + ]: 1640 : Assert(ioh->owner_procno != MyProcNumber);
626 [ - + ]: 1640 : Assert(MyBackendType == B_IO_WORKER);
627 : :
628 : 1640 : owner = ioh->owner_procno;
629 : 1640 : owner_proc = GetPGProcByNumber(owner);
630 : 1640 : owner_pid = owner_proc->pid;
631 : :
632 : 1640 : errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
633 : : }
634 : :
635 : : /*
636 : : * Check if this backend is allowed to time out, and thus should use a
637 : : * non-infinite sleep time. Only the highest-numbered worker is allowed to
638 : : * time out, and only if the pool is above io_min_workers. Serializing
639 : : * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
640 : : * io_min_workers.
641 : : *
642 : : * The result is only instantaneously true and may be temporarily inconsistent
643 : : * in different workers around transitions, but all workers are woken up on
644 : : * pool size or GUC changes making the result eventually consistent.
645 : : */
646 : : static bool
27 tmunro@postgresql.or 647 :GNC 530070 : pgaio_worker_can_timeout(void)
648 : : {
649 : : PgAioWorkerSet workerset;
650 : :
651 [ + + ]: 530070 : if (MyIoWorkerId < io_min_workers)
652 : 518039 : return false;
653 : :
654 : : /* Serialize against pool size changes. */
655 : 12031 : LWLockAcquire(AioWorkerControlLock, LW_SHARED);
656 : 12031 : workerset = io_worker_control->workerset;
657 : 12031 : LWLockRelease(AioWorkerControlLock);
658 : :
659 [ + + ]: 12031 : if (MyIoWorkerId != pgaio_workerset_get_highest(&workerset))
660 : 7870 : return false;
661 : :
662 : 4161 : return true;
663 : : }
664 : :
665 : : void
413 andres@anarazel.de 666 :CBC 1312 : IoWorkerMain(const void *startup_data, size_t startup_data_len)
667 : : {
668 : : sigjmp_buf local_sigjmp_buf;
27 tmunro@postgresql.or 669 :GNC 1312 : TimestampTz idle_timeout_abs = 0;
670 : 1312 : int timeout_guc_used = 0;
413 andres@anarazel.de 671 :CBC 1312 : PgAioHandle *volatile error_ioh = NULL;
399 melanieplageman@gmai 672 : 1312 : ErrorContextCallback errcallback = {0};
413 andres@anarazel.de 673 : 1312 : volatile int error_errno = 0;
674 : : char cmd[128];
27 tmunro@postgresql.or 675 :GNC 1312 : int hist_ios = 0;
676 : 1312 : int hist_wakeups = 0;
677 : :
413 andres@anarazel.de 678 :CBC 1312 : AuxiliaryProcessMainCommon();
679 : :
680 : 1312 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
681 : 1312 : pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
682 : :
683 : : /*
684 : : * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
685 : : * shutdown sequence, similar to checkpointer.
686 : : */
21 andrew@dunslane.net 687 :GNC 1312 : pqsignal(SIGTERM, PG_SIG_IGN);
688 : : /* SIGQUIT handler was already set up by InitPostmasterChild */
689 : 1312 : pqsignal(SIGALRM, PG_SIG_IGN);
690 : 1312 : pqsignal(SIGPIPE, PG_SIG_IGN);
413 andres@anarazel.de 691 :CBC 1312 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
692 : 1312 : pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
693 : :
694 : : /* also registers a shutdown callback to unregister */
695 : 1312 : pgaio_worker_register();
696 : :
409 tmunro@postgresql.or 697 : 1312 : sprintf(cmd, "%d", MyIoWorkerId);
413 andres@anarazel.de 698 : 1312 : set_ps_display(cmd);
699 : :
399 melanieplageman@gmai 700 : 1312 : errcallback.callback = pgaio_worker_error_callback;
701 : 1312 : errcallback.previous = error_context_stack;
702 : 1312 : error_context_stack = &errcallback;
703 : :
704 : : /* see PostgresMain() */
413 andres@anarazel.de 705 [ + + ]: 1312 : if (sigsetjmp(local_sigjmp_buf, 1) != 0)
706 : : {
707 : 1 : error_context_stack = NULL;
708 : 1 : HOLD_INTERRUPTS();
709 : :
710 : 1 : EmitErrorReport();
711 : :
712 : : /*
713 : : * In the - very unlikely - case that the IO failed in a way that
714 : : * raises an error we need to mark the IO as failed.
715 : : *
716 : : * Need to do just enough error recovery so that we can mark the IO as
717 : : * failed and then exit (postmaster will start a new worker).
718 : : */
719 : 1 : LWLockReleaseAll();
720 : :
721 [ + - ]: 1 : if (error_ioh != NULL)
722 : : {
723 : : /* should never fail without setting error_errno */
724 [ - + ]: 1 : Assert(error_errno != 0);
725 : :
726 : 1 : errno = error_errno;
727 : :
728 : 1 : START_CRIT_SECTION();
729 : 1 : pgaio_io_process_completion(error_ioh, -error_errno);
730 [ - + ]: 1 : END_CRIT_SECTION();
731 : : }
732 : :
733 : 1 : proc_exit(1);
734 : : }
735 : :
736 : : /* We can now handle ereport(ERROR) */
737 : 1312 : PG_exception_stack = &local_sigjmp_buf;
738 : :
739 : 1312 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
740 : :
741 [ + + ]: 1067702 : while (!ShutdownRequestPending)
742 : : {
743 : : uint32 io_index;
27 tmunro@postgresql.or 744 :GNC 1066415 : int worker = -1;
745 : 1066415 : int queue_depth = 0;
746 : 1066415 : bool maybe_grow = false;
747 : :
748 : : /*
749 : : * Try to get a job to do.
750 : : *
751 : : * The lwlock acquisition also provides the necessary memory barrier
752 : : * to ensure that we don't see an outdated data in the handle.
753 : : */
413 andres@anarazel.de 754 :CBC 1066415 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
257 peter@eisentraut.org 755 [ + + ]: 1066415 : if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
756 : : {
757 : : /* Nothing to do. Mark self idle. */
27 tmunro@postgresql.or 758 :GNC 530056 : pgaio_workerset_insert(&io_worker_control->idle_workerset,
759 : : MyIoWorkerId);
760 : : }
761 : : else
762 : : {
763 : : /* Got one. Clear idle flag. */
764 : 536359 : pgaio_workerset_remove(&io_worker_control->idle_workerset,
765 : : MyIoWorkerId);
766 : :
767 : : /*
768 : : * See if we should wake up a higher numbered peer. Only do that
769 : : * if this worker is not receiving spurious wakeups itself. The
770 : : * intention is create a frontier beyond which idle workers stay
771 : : * asleep.
772 : : *
773 : : * This heuristic tries to discover the useful wakeup propagation
774 : : * chain length when IOs are very fast and workers wake up to find
775 : : * that all IOs have already been taken.
776 : : *
777 : : * If we chose not to wake a worker when we ideally should have,
778 : : * then the ratio will soon change to correct that.
779 : : */
780 [ + + ]: 536359 : if (hist_wakeups <= hist_ios)
781 : : {
782 : 308425 : queue_depth = pgaio_worker_submission_queue_depth();
783 [ + + ]: 308425 : if (queue_depth > 0)
784 : : {
785 : : /* Choose a worker higher than me to wake. */
786 : 13241 : worker = pgaio_worker_choose_idle(MyIoWorkerId);
787 [ + + ]: 13241 : if (worker == -1)
788 : 10915 : maybe_grow = true;
789 : : }
790 : : }
791 : : }
413 andres@anarazel.de 792 :CBC 1066415 : LWLockRelease(AioWorkerSubmissionQueueLock);
793 : :
794 : : /* Propagate wakeups. */
27 tmunro@postgresql.or 795 [ + + ]:GNC 1066415 : if (worker != -1)
796 : : {
797 : 2326 : pgaio_worker_wake(worker);
798 : : }
799 [ + + ]: 1064089 : else if (maybe_grow)
800 : : {
801 : : /*
802 : : * We know there was at least one more item in the queue, and we
803 : : * failed to find a higher-numbered idle worker to wake. Now we
804 : : * decide if we should try to start one more worker.
805 : : *
806 : : * We do this with a simple heuristic: is the queue depth greater
807 : : * than the current number of workers?
808 : : *
809 : : * Consider the following situations:
810 : : *
811 : : * 1. The queue depth is constantly increasing, because IOs are
812 : : * arriving faster than they can possibly be serviced. It doesn't
813 : : * matter much which threshold we choose, as we will surely hit
814 : : * it. Crossing the current worker count is a useful signal
815 : : * because it's clearly too deep to avoid queuing latency already,
816 : : * but still leaves a small window of opportunity to improve the
817 : : * situation before the queue overflows.
818 : : *
819 : : * 2. The worker pool is keeping up, no latency is being
820 : : * introduced and an extra worker would be a waste of resources.
821 : : * Queue depth distributions tend to be heavily skewed, with long
822 : : * tails of low probability spikes (due to submission clustering,
823 : : * scheduling, jitter, stalls, noisy neighbors, etc). We want a
824 : : * number that is very unlikely to be triggered by an outlier, and
825 : : * we bet that an exponential or similar distribution whose
826 : : * outliers never reach this threshold must be almost entirely
827 : : * concentrated at the low end. If we do see a spike as big as
828 : : * the worker count, we take it as a signal that the distribution
829 : : * is surely too wide.
830 : : *
831 : : * On its own, this is an extremely crude signal. When combined
832 : : * with the wakeup propagation test that precedes it (but on its
833 : : * own tends to overshoot) and io_worker_launch_interval, the
834 : : * result is that we gradually test each pool size until we find
835 : : * one that doesn't trigger further expansion, and then hold it
836 : : * for at least io_worker_idle_timeout.
837 : : *
838 : : * XXX Perhaps ideas from queueing theory or control theory could
839 : : * do a better job of this.
840 : : */
841 : :
842 : : /* Read nworkers without lock for this heuristic purpose. */
843 [ + + ]: 10915 : if (queue_depth > io_worker_control->nworkers)
844 : 250 : pgaio_worker_request_grow();
845 : : }
846 : :
257 peter@eisentraut.org 847 [ + + ]:CBC 1066415 : if (io_index != -1)
848 : : {
413 andres@anarazel.de 849 : 536359 : PgAioHandle *ioh = NULL;
850 : :
851 : : /* Cancel timeout and update wakeup:work ratio. */
27 tmunro@postgresql.or 852 :GNC 536359 : idle_timeout_abs = 0;
853 [ + + ]: 536359 : if (++hist_ios == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
854 : : {
855 : 154278 : hist_wakeups /= 2;
856 : 154278 : hist_ios /= 2;
857 : : }
858 : :
413 andres@anarazel.de 859 :CBC 536359 : ioh = &pgaio_ctl->io_handles[io_index];
860 : 536359 : error_ioh = ioh;
399 melanieplageman@gmai 861 : 536359 : errcallback.arg = ioh;
862 : :
413 andres@anarazel.de 863 [ - + ]: 536359 : pgaio_debug_io(DEBUG4, ioh,
864 : : "worker %d processing IO",
865 : : MyIoWorkerId);
866 : :
867 : : /*
868 : : * Prevent interrupts between pgaio_io_reopen() and
869 : : * pgaio_io_perform_synchronously() that otherwise could lead to
870 : : * the FD getting closed in that window.
871 : : */
405 872 : 536359 : HOLD_INTERRUPTS();
873 : :
874 : : /*
875 : : * It's very unlikely, but possible, that reopen fails. E.g. due
876 : : * to memory allocations failing or file permissions changing or
877 : : * such. In that case we need to fail the IO.
878 : : *
879 : : * There's not really a good errno we can report here.
880 : : */
413 881 : 536359 : error_errno = ENOENT;
882 : 536359 : pgaio_io_reopen(ioh);
883 : :
884 : : /*
885 : : * To be able to exercise the reopen-fails path, allow injection
886 : : * points to trigger a failure at this point.
887 : : */
360 michael@paquier.xyz 888 : 536359 : INJECTION_POINT("aio-worker-after-reopen", ioh);
889 : :
413 andres@anarazel.de 890 : 536358 : error_errno = 0;
891 : 536358 : error_ioh = NULL;
892 : :
893 : : /*
894 : : * As part of IO completion the buffer will be marked as NOACCESS,
895 : : * until the buffer is pinned again - which never happens in io
896 : : * workers. Therefore the next time there is IO for the same
897 : : * buffer, the memory will be considered inaccessible. To avoid
898 : : * that, explicitly allow access to the memory before reading data
899 : : * into it.
900 : : */
901 : : #ifdef USE_VALGRIND
902 : : {
903 : : struct iovec *iov;
904 : : uint16 iov_length = pgaio_io_get_iovec_length(ioh, &iov);
905 : :
906 : : for (int i = 0; i < iov_length; i++)
907 : : VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
908 : : }
909 : : #endif
910 : :
911 : : #ifdef PGAIO_WORKER_SHOW_PS_INFO
912 : : {
913 : : char *description = pgaio_io_get_target_description(ioh);
914 : :
915 : : sprintf(cmd, "%d: [%s] %s",
916 : : MyIoWorkerId,
917 : : pgaio_io_get_op_name(ioh),
918 : : description);
919 : : pfree(description);
920 : : set_ps_display(cmd);
921 : : }
922 : : #endif
923 : :
924 : : /*
925 : : * We don't expect this to ever fail with ERROR or FATAL, no need
926 : : * to keep error_ioh set to the IO.
927 : : * pgaio_io_perform_synchronously() contains a critical section to
928 : : * ensure we don't accidentally fail.
929 : : */
930 : 536358 : pgaio_io_perform_synchronously(ioh);
931 : :
405 932 [ - + ]: 536358 : RESUME_INTERRUPTS();
399 melanieplageman@gmai 933 : 536358 : errcallback.arg = NULL;
934 : : }
935 : : else
936 : : {
937 : : int timeout_ms;
938 : :
939 : : /* Cancel new worker request if pending. */
27 tmunro@postgresql.or 940 :GNC 530056 : pgaio_worker_cancel_grow();
941 : :
942 : : /* Compute the remaining allowed idle time. */
943 [ - + ]: 530056 : if (io_worker_idle_timeout == -1)
944 : : {
945 : : /* Never time out. */
27 tmunro@postgresql.or 946 :UNC 0 : timeout_ms = -1;
947 : : }
948 : : else
949 : : {
27 tmunro@postgresql.or 950 :GNC 530056 : TimestampTz now = GetCurrentTimestamp();
951 : :
952 : : /* If the GUC changes, reset timer. */
953 [ + + ]: 530056 : if (idle_timeout_abs != 0 &&
954 [ - + ]: 1152 : io_worker_idle_timeout != timeout_guc_used)
27 tmunro@postgresql.or 955 :UNC 0 : idle_timeout_abs = 0;
956 : :
957 : : /* Only the highest-numbered worker can time out. */
27 tmunro@postgresql.or 958 [ + + ]:GNC 530056 : if (pgaio_worker_can_timeout())
959 : : {
960 [ + + ]: 4147 : if (idle_timeout_abs == 0)
961 : : {
962 : : /*
963 : : * I have just been promoted to the timeout worker, or
964 : : * the GUC changed. Compute new absolute time from
965 : : * now.
966 : : */
967 : 2996 : idle_timeout_abs =
968 : 2996 : TimestampTzPlusMilliseconds(now,
969 : : io_worker_idle_timeout);
970 : 2996 : timeout_guc_used = io_worker_idle_timeout;
971 : : }
972 : 4147 : timeout_ms =
973 : 4147 : TimestampDifferenceMilliseconds(now, idle_timeout_abs);
974 : : }
975 : : else
976 : : {
977 : : /* No timeout for me. */
978 : 525909 : idle_timeout_abs = 0;
979 : 525909 : timeout_ms = -1;
980 : : }
981 : : }
982 : :
983 : : #ifdef PGAIO_WORKER_SHOW_PS_INFO
984 : : sprintf(cmd, "%d: idle, wakeups:ios = %d:%d",
985 : : MyIoWorkerId, hist_wakeups, hist_ios);
986 : : set_ps_display(cmd);
987 : : #endif
988 : :
989 [ + + ]: 530056 : if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
990 : : timeout_ms,
991 : : WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
992 : : {
993 : : /* WL_TIMEOUT */
994 [ + - ]: 14 : if (pgaio_worker_can_timeout())
995 [ + - ]: 14 : if (GetCurrentTimestamp() >= idle_timeout_abs)
996 : 14 : break;
997 : : }
998 : : else
999 : : {
1000 : : /* WL_LATCH_SET */
1001 [ + + ]: 530036 : if (++hist_wakeups == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
1002 : : {
1003 : 119440 : hist_wakeups /= 2;
1004 : 119440 : hist_ios /= 2;
1005 : : }
1006 : : }
413 andres@anarazel.de 1007 :CBC 530036 : ResetLatch(MyLatch);
1008 : : }
1009 : :
1010 [ + + ]: 1066394 : CHECK_FOR_INTERRUPTS();
1011 : :
297 tmunro@postgresql.or 1012 [ + + ]: 1066390 : if (ConfigReloadPending)
1013 : : {
1014 : 234 : ConfigReloadPending = false;
1015 : 234 : ProcessConfigFile(PGC_SIGHUP);
1016 : :
1017 : : /* If io_max_workers has been decreased, exit highest first. */
27 tmunro@postgresql.or 1018 [ - + ]:GNC 234 : if (MyIoWorkerId >= io_max_workers)
27 tmunro@postgresql.or 1019 :UNC 0 : break;
1020 : : }
1021 : : }
1022 : :
399 melanieplageman@gmai 1023 :CBC 1301 : error_context_stack = errcallback.previous;
413 andres@anarazel.de 1024 : 1301 : proc_exit(0);
1025 : : }
1026 : :
1027 : : bool
1028 : 92315 : pgaio_workers_enabled(void)
1029 : : {
1030 : 92315 : return io_method == IOMETHOD_WORKER;
1031 : : }
|