Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * method_worker.c
4 : : * AIO - perform AIO using worker processes
5 : : *
6 : : * IO workers consume IOs from a shared memory submission queue, run
7 : : * traditional synchronous system calls, and perform the shared completion
8 : : * handling immediately. Client code submits most requests by pushing IOs
9 : : * into the submission queue, and waits (if necessary) using condition
10 : : * variables. Some IOs cannot be performed in another process due to lack of
11 : : * infrastructure for reopening the file, and must processed synchronously by
12 : : * the client code when submitted.
13 : : *
14 : : * So that the submitter can make just one system call when submitting a batch
15 : : * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
16 : : * could be improved by using futexes instead of latches to wake N waiters.
17 : : *
18 : : * This method of AIO is available in all builds on all operating systems, and
19 : : * is the default.
20 : : *
21 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
22 : : * Portions Copyright (c) 1994, Regents of the University of California
23 : : *
24 : : * IDENTIFICATION
25 : : * src/backend/storage/aio/method_worker.c
26 : : *
27 : : *-------------------------------------------------------------------------
28 : : */
29 : :
30 : : #include "postgres.h"
31 : :
32 : : #include "libpq/pqsignal.h"
33 : : #include "miscadmin.h"
34 : : #include "port/pg_bitutils.h"
35 : : #include "postmaster/auxprocess.h"
36 : : #include "postmaster/interrupt.h"
37 : : #include "storage/aio.h"
38 : : #include "storage/aio_internal.h"
39 : : #include "storage/aio_subsys.h"
40 : : #include "storage/io_worker.h"
41 : : #include "storage/ipc.h"
42 : : #include "storage/latch.h"
43 : : #include "storage/proc.h"
44 : : #include "tcop/tcopprot.h"
45 : : #include "utils/injection_point.h"
46 : : #include "utils/memdebug.h"
47 : : #include "utils/ps_status.h"
48 : : #include "utils/wait_event.h"
49 : :
50 : :
51 : : /* How many workers should each worker wake up if needed? */
52 : : #define IO_WORKER_WAKEUP_FANOUT 2
53 : :
54 : :
55 : : typedef struct PgAioWorkerSubmissionQueue
56 : : {
57 : : uint32 size;
58 : : uint32 head;
59 : : uint32 tail;
60 : : int sqes[FLEXIBLE_ARRAY_MEMBER];
61 : : } PgAioWorkerSubmissionQueue;
62 : :
63 : : typedef struct PgAioWorkerSlot
64 : : {
65 : : Latch *latch;
66 : : bool in_use;
67 : : } PgAioWorkerSlot;
68 : :
69 : : typedef struct PgAioWorkerControl
70 : : {
71 : : uint64 idle_worker_mask;
72 : : PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
73 : : } PgAioWorkerControl;
74 : :
75 : :
76 : : static size_t pgaio_worker_shmem_size(void);
77 : : static void pgaio_worker_shmem_init(bool first_time);
78 : :
79 : : static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
80 : : static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
81 : :
82 : :
83 : : const IoMethodOps pgaio_worker_ops = {
84 : : .shmem_size = pgaio_worker_shmem_size,
85 : : .shmem_init = pgaio_worker_shmem_init,
86 : :
87 : : .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
88 : : .submit = pgaio_worker_submit,
89 : : };
90 : :
91 : :
92 : : /* GUCs */
93 : : int io_workers = 3;
94 : :
95 : :
96 : : static int io_worker_queue_size = 64;
97 : : static int MyIoWorkerId;
98 : : static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
99 : : static PgAioWorkerControl *io_worker_control;
100 : :
101 : :
102 : : static size_t
362 andres@anarazel.de 103 :CBC 3273 : pgaio_worker_queue_shmem_size(int *queue_size)
104 : : {
105 : : /* Round size up to next power of two so we can make a mask. */
106 : 3273 : *queue_size = pg_nextpower2_32(io_worker_queue_size);
107 : :
246 tmunro@postgresql.or 108 : 6546 : return offsetof(PgAioWorkerSubmissionQueue, sqes) +
206 peter@eisentraut.org 109 : 3273 : sizeof(int) * *queue_size;
110 : : }
111 : :
112 : : static size_t
362 andres@anarazel.de 113 : 3273 : pgaio_worker_control_shmem_size(void)
114 : : {
246 tmunro@postgresql.or 115 : 3273 : return offsetof(PgAioWorkerControl, workers) +
116 : : sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
117 : : }
118 : :
119 : : static size_t
362 andres@anarazel.de 120 : 2133 : pgaio_worker_shmem_size(void)
121 : : {
122 : : size_t sz;
123 : : int queue_size;
124 : :
125 : 2133 : sz = pgaio_worker_queue_shmem_size(&queue_size);
126 : 2133 : sz = add_size(sz, pgaio_worker_control_shmem_size());
127 : :
128 : 2133 : return sz;
129 : : }
130 : :
131 : : static void
132 : 1140 : pgaio_worker_shmem_init(bool first_time)
133 : : {
134 : : bool found;
135 : : int queue_size;
136 : :
137 : 1140 : io_worker_submission_queue =
138 : 1140 : ShmemInitStruct("AioWorkerSubmissionQueue",
139 : : pgaio_worker_queue_shmem_size(&queue_size),
140 : : &found);
141 [ + - ]: 1140 : if (!found)
142 : : {
143 : 1140 : io_worker_submission_queue->size = queue_size;
144 : 1140 : io_worker_submission_queue->head = 0;
145 : 1140 : io_worker_submission_queue->tail = 0;
146 : : }
147 : :
148 : 1140 : io_worker_control =
149 : 1140 : ShmemInitStruct("AioWorkerControl",
150 : : pgaio_worker_control_shmem_size(),
151 : : &found);
152 [ + - ]: 1140 : if (!found)
153 : : {
154 : 1140 : io_worker_control->idle_worker_mask = 0;
155 [ + + ]: 37620 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
156 : : {
157 : 36480 : io_worker_control->workers[i].latch = NULL;
158 : 36480 : io_worker_control->workers[i].in_use = false;
159 : : }
160 : : }
161 : 1140 : }
162 : :
163 : : static int
246 tmunro@postgresql.or 164 : 644207 : pgaio_worker_choose_idle(void)
165 : : {
166 : : int worker;
167 : :
362 andres@anarazel.de 168 [ + + ]: 644207 : if (io_worker_control->idle_worker_mask == 0)
169 : 24909 : return -1;
170 : :
171 : : /* Find the lowest bit position, and clear it. */
172 : 619298 : worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
173 : 619298 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
246 tmunro@postgresql.or 174 [ - + ]: 619298 : Assert(io_worker_control->workers[worker].in_use);
175 : :
362 andres@anarazel.de 176 : 619298 : return worker;
177 : : }
178 : :
179 : : static bool
180 : 621060 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
181 : : {
182 : : PgAioWorkerSubmissionQueue *queue;
183 : : uint32 new_head;
184 : :
185 : 621060 : queue = io_worker_submission_queue;
186 : 621060 : new_head = (queue->head + 1) & (queue->size - 1);
187 [ - + ]: 621060 : if (new_head == queue->tail)
188 : : {
362 andres@anarazel.de 189 [ # # ]:UBC 0 : pgaio_debug(DEBUG3, "io queue is full, at %u elements",
190 : : io_worker_submission_queue->size);
191 : 0 : return false; /* full */
192 : : }
193 : :
246 tmunro@postgresql.or 194 :CBC 621060 : queue->sqes[queue->head] = pgaio_io_get_id(ioh);
362 andres@anarazel.de 195 : 621060 : queue->head = new_head;
196 : :
197 : 621060 : return true;
198 : : }
199 : :
200 : : static int
201 : 977777 : pgaio_worker_submission_queue_consume(void)
202 : : {
203 : : PgAioWorkerSubmissionQueue *queue;
204 : : int result;
205 : :
206 : 977777 : queue = io_worker_submission_queue;
207 [ + + ]: 977777 : if (queue->tail == queue->head)
206 peter@eisentraut.org 208 : 490754 : return -1; /* empty */
209 : :
246 tmunro@postgresql.or 210 : 487023 : result = queue->sqes[queue->tail];
362 andres@anarazel.de 211 : 487023 : queue->tail = (queue->tail + 1) & (queue->size - 1);
212 : :
213 : 487023 : return result;
214 : : }
215 : :
216 : : static uint32
217 : 972318 : pgaio_worker_submission_queue_depth(void)
218 : : {
219 : : uint32 head;
220 : : uint32 tail;
221 : :
222 : 972318 : head = io_worker_submission_queue->head;
223 : 972318 : tail = io_worker_submission_queue->tail;
224 : :
225 [ + + ]: 972318 : if (tail > head)
226 : 741 : head += io_worker_submission_queue->size;
227 : :
228 [ - + ]: 972318 : Assert(head >= tail);
229 : :
230 : 972318 : return head - tail;
231 : : }
232 : :
233 : : static bool
234 : 1248342 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
235 : : {
236 : : return
237 : 1248342 : !IsUnderPostmaster
238 [ + + ]: 1244661 : || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
239 [ + + - + ]: 2493003 : || !pgaio_io_can_reopen(ioh);
240 : : }
241 : :
242 : : static void
246 tmunro@postgresql.or 243 : 621901 : pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
244 : : {
4 tomas.vondra@postgre 245 :GNC 621901 : PgAioHandle **synchronous_ios = NULL;
362 andres@anarazel.de 246 :CBC 621901 : int nsync = 0;
247 : 621901 : Latch *wakeup = NULL;
248 : : int worker;
249 : :
246 tmunro@postgresql.or 250 [ - + ]: 621901 : Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
251 : :
4 tomas.vondra@postgre 252 [ + + ]:GNC 621901 : if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
253 : : {
254 [ + + ]: 1241572 : for (int i = 0; i < num_staged_ios; ++i)
255 : : {
256 [ - + ]: 621060 : Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
257 [ - + ]: 621060 : if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
258 : : {
259 : : /*
260 : : * Do the rest synchronously. If the queue is full, give up
261 : : * and do the rest synchronously. We're holding an exclusive
262 : : * lock on the queue so nothing can consume entries.
263 : : */
4 tomas.vondra@postgre 264 :UNC 0 : synchronous_ios = &staged_ios[i];
265 : 0 : nsync = (num_staged_ios - i);
266 : :
267 : 0 : break;
268 : : }
269 : :
4 tomas.vondra@postgre 270 [ + + ]:GNC 621060 : if (wakeup == NULL)
271 : : {
272 : : /* Choose an idle worker to wake up if we haven't already. */
273 : 620532 : worker = pgaio_worker_choose_idle();
274 [ + + ]: 620532 : if (worker >= 0)
275 : 609368 : wakeup = io_worker_control->workers[worker].latch;
276 : :
277 [ - + ]: 620532 : pgaio_debug_io(DEBUG4, staged_ios[i],
278 : : "choosing worker %d",
279 : : worker);
280 : : }
281 : : }
282 : 620512 : LWLockRelease(AioWorkerSubmissionQueueLock);
283 : : }
284 : : else
285 : : {
286 : : /* do everything synchronously, no wakeup needed */
287 : 1389 : synchronous_ios = staged_ios;
288 : 1389 : nsync = num_staged_ios;
289 : : }
290 : :
362 andres@anarazel.de 291 [ + + ]:CBC 621901 : if (wakeup)
292 : 609368 : SetLatch(wakeup);
293 : :
294 : : /* Run whatever is left synchronously. */
295 [ + + ]: 621901 : if (nsync > 0)
296 : : {
362 andres@anarazel.de 297 [ + + ]:GBC 2778 : for (int i = 0; i < nsync; ++i)
298 : : {
299 : 1389 : pgaio_io_perform_synchronously(synchronous_ios[i]);
300 : : }
301 : : }
362 andres@anarazel.de 302 :CBC 621901 : }
303 : :
304 : : static int
305 : 621901 : pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
306 : : {
307 [ + + ]: 1244350 : for (int i = 0; i < num_staged_ios; i++)
308 : : {
309 : 622449 : PgAioHandle *ioh = staged_ios[i];
310 : :
311 : 622449 : pgaio_io_prepare_submit(ioh);
312 : : }
313 : :
314 : 621901 : pgaio_worker_submit_internal(num_staged_ios, staged_ios);
315 : :
316 : 621901 : return num_staged_ios;
317 : : }
318 : :
319 : : /*
320 : : * on_shmem_exit() callback that releases the worker's slot in
321 : : * io_worker_control.
322 : : */
323 : : static void
324 : 1767 : pgaio_worker_die(int code, Datum arg)
325 : : {
326 : 1767 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
327 [ - + ]: 1767 : Assert(io_worker_control->workers[MyIoWorkerId].in_use);
328 [ - + ]: 1767 : Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
329 : :
246 tmunro@postgresql.or 330 : 1767 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
362 andres@anarazel.de 331 : 1767 : io_worker_control->workers[MyIoWorkerId].in_use = false;
332 : 1767 : io_worker_control->workers[MyIoWorkerId].latch = NULL;
333 : 1767 : LWLockRelease(AioWorkerSubmissionQueueLock);
334 : 1767 : }
335 : :
336 : : /*
337 : : * Register the worker in shared memory, assign MyIoWorkerId and register a
338 : : * shutdown callback to release registration.
339 : : */
340 : : static void
341 : 1767 : pgaio_worker_register(void)
342 : : {
343 : 1767 : MyIoWorkerId = -1;
344 : :
345 : : /*
346 : : * XXX: This could do with more fine-grained locking. But it's also not
347 : : * very common for the number of workers to change at the moment...
348 : : */
349 : 1767 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
350 : :
351 [ + - ]: 4019 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
352 : : {
353 [ + + ]: 4019 : if (!io_worker_control->workers[i].in_use)
354 : : {
355 [ - + ]: 1767 : Assert(io_worker_control->workers[i].latch == NULL);
356 : 1767 : io_worker_control->workers[i].in_use = true;
357 : 1767 : MyIoWorkerId = i;
358 : 1767 : break;
359 : : }
360 : : else
361 [ - + ]: 2252 : Assert(io_worker_control->workers[i].latch != NULL);
362 : : }
363 : :
364 [ - + ]: 1767 : if (MyIoWorkerId == -1)
362 andres@anarazel.de 365 [ # # ]:UBC 0 : elog(ERROR, "couldn't find a free worker slot");
366 : :
362 andres@anarazel.de 367 :CBC 1767 : io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
368 : 1767 : io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
369 : 1767 : LWLockRelease(AioWorkerSubmissionQueueLock);
370 : :
371 : 1767 : on_shmem_exit(pgaio_worker_die, 0);
372 : 1767 : }
373 : :
374 : : static void
348 melanieplageman@gmai 375 : 1139 : pgaio_worker_error_callback(void *arg)
376 : : {
377 : : ProcNumber owner;
378 : : PGPROC *owner_proc;
379 : : int32 owner_pid;
380 : 1139 : PgAioHandle *ioh = arg;
381 : :
382 [ + + ]: 1139 : if (!ioh)
383 : 9 : return;
384 : :
385 [ - + ]: 1130 : Assert(ioh->owner_procno != MyProcNumber);
386 [ - + ]: 1130 : Assert(MyBackendType == B_IO_WORKER);
387 : :
388 : 1130 : owner = ioh->owner_procno;
389 : 1130 : owner_proc = GetPGProcByNumber(owner);
390 : 1130 : owner_pid = owner_proc->pid;
391 : :
392 : 1130 : errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
393 : : }
394 : :
395 : : void
362 andres@anarazel.de 396 : 1767 : IoWorkerMain(const void *startup_data, size_t startup_data_len)
397 : : {
398 : : sigjmp_buf local_sigjmp_buf;
399 : 1767 : PgAioHandle *volatile error_ioh = NULL;
348 melanieplageman@gmai 400 : 1767 : ErrorContextCallback errcallback = {0};
362 andres@anarazel.de 401 : 1767 : volatile int error_errno = 0;
402 : : char cmd[128];
403 : :
404 : 1767 : AuxiliaryProcessMainCommon();
405 : :
406 : 1767 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
407 : 1767 : pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
408 : :
409 : : /*
410 : : * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
411 : : * shutdown sequence, similar to checkpointer.
412 : : */
413 : 1767 : pqsignal(SIGTERM, SIG_IGN);
414 : : /* SIGQUIT handler was already set up by InitPostmasterChild */
415 : 1767 : pqsignal(SIGALRM, SIG_IGN);
416 : 1767 : pqsignal(SIGPIPE, SIG_IGN);
417 : 1767 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
418 : 1767 : pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
419 : :
420 : : /* also registers a shutdown callback to unregister */
421 : 1767 : pgaio_worker_register();
422 : :
358 tmunro@postgresql.or 423 : 1767 : sprintf(cmd, "%d", MyIoWorkerId);
362 andres@anarazel.de 424 : 1767 : set_ps_display(cmd);
425 : :
348 melanieplageman@gmai 426 : 1767 : errcallback.callback = pgaio_worker_error_callback;
427 : 1767 : errcallback.previous = error_context_stack;
428 : 1767 : error_context_stack = &errcallback;
429 : :
430 : : /* see PostgresMain() */
362 andres@anarazel.de 431 [ + + ]: 1767 : if (sigsetjmp(local_sigjmp_buf, 1) != 0)
432 : : {
433 : 1 : error_context_stack = NULL;
434 : 1 : HOLD_INTERRUPTS();
435 : :
436 : 1 : EmitErrorReport();
437 : :
438 : : /*
439 : : * In the - very unlikely - case that the IO failed in a way that
440 : : * raises an error we need to mark the IO as failed.
441 : : *
442 : : * Need to do just enough error recovery so that we can mark the IO as
443 : : * failed and then exit (postmaster will start a new worker).
444 : : */
445 : 1 : LWLockReleaseAll();
446 : :
447 [ + - ]: 1 : if (error_ioh != NULL)
448 : : {
449 : : /* should never fail without setting error_errno */
450 [ - + ]: 1 : Assert(error_errno != 0);
451 : :
452 : 1 : errno = error_errno;
453 : :
454 : 1 : START_CRIT_SECTION();
455 : 1 : pgaio_io_process_completion(error_ioh, -error_errno);
456 [ - + ]: 1 : END_CRIT_SECTION();
457 : : }
458 : :
459 : 1 : proc_exit(1);
460 : : }
461 : :
462 : : /* We can now handle ereport(ERROR) */
463 : 1767 : PG_exception_stack = &local_sigjmp_buf;
464 : :
465 : 1767 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
466 : :
467 [ + + ]: 979518 : while (!ShutdownRequestPending)
468 : : {
469 : : uint32 io_index;
470 : : Latch *latches[IO_WORKER_WAKEUP_FANOUT];
471 : 977777 : int nlatches = 0;
472 : 977777 : int nwakeups = 0;
473 : : int worker;
474 : :
475 : : /*
476 : : * Try to get a job to do.
477 : : *
478 : : * The lwlock acquisition also provides the necessary memory barrier
479 : : * to ensure that we don't see an outdated data in the handle.
480 : : */
481 : 977777 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
206 peter@eisentraut.org 482 [ + + ]: 977777 : if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
483 : : {
484 : : /*
485 : : * Nothing to do. Mark self idle.
486 : : *
487 : : * XXX: Invent some kind of back pressure to reduce useless
488 : : * wakeups?
489 : : */
362 andres@anarazel.de 490 : 490754 : io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
491 : : }
492 : : else
493 : : {
494 : : /* Got one. Clear idle flag. */
495 : 487023 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
496 : :
497 : : /* See if we can wake up some peers. */
498 [ + + ]: 487023 : nwakeups = Min(pgaio_worker_submission_queue_depth(),
499 : : IO_WORKER_WAKEUP_FANOUT);
500 [ + + ]: 496953 : for (int i = 0; i < nwakeups; ++i)
501 : : {
246 tmunro@postgresql.or 502 [ + + ]: 23675 : if ((worker = pgaio_worker_choose_idle()) < 0)
362 andres@anarazel.de 503 : 13745 : break;
504 : 9930 : latches[nlatches++] = io_worker_control->workers[worker].latch;
505 : : }
506 : : }
507 : 977777 : LWLockRelease(AioWorkerSubmissionQueueLock);
508 : :
509 [ + + ]: 987707 : for (int i = 0; i < nlatches; ++i)
510 : 9930 : SetLatch(latches[i]);
511 : :
206 peter@eisentraut.org 512 [ + + ]: 977777 : if (io_index != -1)
513 : : {
362 andres@anarazel.de 514 : 487023 : PgAioHandle *ioh = NULL;
515 : :
516 : 487023 : ioh = &pgaio_ctl->io_handles[io_index];
517 : 487023 : error_ioh = ioh;
348 melanieplageman@gmai 518 : 487023 : errcallback.arg = ioh;
519 : :
362 andres@anarazel.de 520 [ - + ]: 487023 : pgaio_debug_io(DEBUG4, ioh,
521 : : "worker %d processing IO",
522 : : MyIoWorkerId);
523 : :
524 : : /*
525 : : * Prevent interrupts between pgaio_io_reopen() and
526 : : * pgaio_io_perform_synchronously() that otherwise could lead to
527 : : * the FD getting closed in that window.
528 : : */
354 529 : 487023 : HOLD_INTERRUPTS();
530 : :
531 : : /*
532 : : * It's very unlikely, but possible, that reopen fails. E.g. due
533 : : * to memory allocations failing or file permissions changing or
534 : : * such. In that case we need to fail the IO.
535 : : *
536 : : * There's not really a good errno we can report here.
537 : : */
362 538 : 487023 : error_errno = ENOENT;
539 : 487023 : pgaio_io_reopen(ioh);
540 : :
541 : : /*
542 : : * To be able to exercise the reopen-fails path, allow injection
543 : : * points to trigger a failure at this point.
544 : : */
309 michael@paquier.xyz 545 : 487023 : INJECTION_POINT("aio-worker-after-reopen", ioh);
546 : :
362 andres@anarazel.de 547 : 487022 : error_errno = 0;
548 : 487022 : error_ioh = NULL;
549 : :
550 : : /*
551 : : * As part of IO completion the buffer will be marked as NOACCESS,
552 : : * until the buffer is pinned again - which never happens in io
553 : : * workers. Therefore the next time there is IO for the same
554 : : * buffer, the memory will be considered inaccessible. To avoid
555 : : * that, explicitly allow access to the memory before reading data
556 : : * into it.
557 : : */
558 : : #ifdef USE_VALGRIND
559 : : {
560 : : struct iovec *iov;
561 : : uint16 iov_length = pgaio_io_get_iovec_length(ioh, &iov);
562 : :
563 : : for (int i = 0; i < iov_length; i++)
564 : : VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
565 : : }
566 : : #endif
567 : :
568 : : /*
569 : : * We don't expect this to ever fail with ERROR or FATAL, no need
570 : : * to keep error_ioh set to the IO.
571 : : * pgaio_io_perform_synchronously() contains a critical section to
572 : : * ensure we don't accidentally fail.
573 : : */
574 : 487022 : pgaio_io_perform_synchronously(ioh);
575 : :
354 576 [ - + ]: 487022 : RESUME_INTERRUPTS();
348 melanieplageman@gmai 577 : 487022 : errcallback.arg = NULL;
578 : : }
579 : : else
580 : : {
362 andres@anarazel.de 581 : 490754 : WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
582 : : WAIT_EVENT_IO_WORKER_MAIN);
583 : 490733 : ResetLatch(MyLatch);
584 : : }
585 : :
586 [ + + ]: 977755 : CHECK_FOR_INTERRUPTS();
587 : :
246 tmunro@postgresql.or 588 [ + + ]: 977751 : if (ConfigReloadPending)
589 : : {
590 : 263 : ConfigReloadPending = false;
591 : 263 : ProcessConfigFile(PGC_SIGHUP);
592 : : }
593 : : }
594 : :
348 melanieplageman@gmai 595 : 1741 : error_context_stack = errcallback.previous;
362 andres@anarazel.de 596 : 1741 : proc_exit(0);
597 : : }
598 : :
599 : : bool
600 : 44750 : pgaio_workers_enabled(void)
601 : : {
602 : 44750 : return io_method == IOMETHOD_WORKER;
603 : : }
|