Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * method_worker.c
4 : : * AIO - perform AIO using worker processes
5 : : *
6 : : * IO workers consume IOs from a shared memory submission queue, run
7 : : * traditional synchronous system calls, and perform the shared completion
8 : : * handling immediately. Client code submits most requests by pushing IOs
9 : : * into the submission queue, and waits (if necessary) using condition
10 : : * variables. Some IOs cannot be performed in another process due to lack of
11 : : * infrastructure for reopening the file, and must processed synchronously by
12 : : * the client code when submitted.
13 : : *
14 : : * So that the submitter can make just one system call when submitting a batch
15 : : * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
16 : : * could be improved by using futexes instead of latches to wake N waiters.
17 : : *
18 : : * This method of AIO is available in all builds on all operating systems, and
19 : : * is the default.
20 : : *
21 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
22 : : * Portions Copyright (c) 1994, Regents of the University of California
23 : : *
24 : : * IDENTIFICATION
25 : : * src/backend/storage/aio/method_worker.c
26 : : *
27 : : *-------------------------------------------------------------------------
28 : : */
29 : :
30 : : #include "postgres.h"
31 : :
32 : : #include "libpq/pqsignal.h"
33 : : #include "miscadmin.h"
34 : : #include "port/pg_bitutils.h"
35 : : #include "postmaster/auxprocess.h"
36 : : #include "postmaster/interrupt.h"
37 : : #include "storage/aio.h"
38 : : #include "storage/aio_internal.h"
39 : : #include "storage/aio_subsys.h"
40 : : #include "storage/io_worker.h"
41 : : #include "storage/ipc.h"
42 : : #include "storage/latch.h"
43 : : #include "storage/proc.h"
44 : : #include "tcop/tcopprot.h"
45 : : #include "utils/injection_point.h"
46 : : #include "utils/memdebug.h"
47 : : #include "utils/ps_status.h"
48 : : #include "utils/wait_event.h"
49 : :
50 : :
51 : : /* How many workers should each worker wake up if needed? */
52 : : #define IO_WORKER_WAKEUP_FANOUT 2
53 : :
54 : :
55 : : typedef struct PgAioWorkerSubmissionQueue
56 : : {
57 : : uint32 size;
58 : : uint32 mask;
59 : : uint32 head;
60 : : uint32 tail;
61 : : int sqes[FLEXIBLE_ARRAY_MEMBER];
62 : : } PgAioWorkerSubmissionQueue;
63 : :
64 : : typedef struct PgAioWorkerSlot
65 : : {
66 : : Latch *latch;
67 : : bool in_use;
68 : : } PgAioWorkerSlot;
69 : :
70 : : typedef struct PgAioWorkerControl
71 : : {
72 : : uint64 idle_worker_mask;
73 : : PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
74 : : } PgAioWorkerControl;
75 : :
76 : :
77 : : static size_t pgaio_worker_shmem_size(void);
78 : : static void pgaio_worker_shmem_init(bool first_time);
79 : :
80 : : static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
81 : : static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
82 : :
83 : :
84 : : const IoMethodOps pgaio_worker_ops = {
85 : : .shmem_size = pgaio_worker_shmem_size,
86 : : .shmem_init = pgaio_worker_shmem_init,
87 : :
88 : : .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
89 : : .submit = pgaio_worker_submit,
90 : : };
91 : :
92 : :
93 : : /* GUCs */
94 : : int io_workers = 3;
95 : :
96 : :
97 : : static int io_worker_queue_size = 64;
98 : : static int MyIoWorkerId;
99 : : static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
100 : : static PgAioWorkerControl *io_worker_control;
101 : :
102 : :
103 : : static size_t
172 andres@anarazel.de 104 :CBC 2914 : pgaio_worker_queue_shmem_size(int *queue_size)
105 : : {
106 : : /* Round size up to next power of two so we can make a mask. */
107 : 2914 : *queue_size = pg_nextpower2_32(io_worker_queue_size);
108 : :
56 tmunro@postgresql.or 109 : 5828 : return offsetof(PgAioWorkerSubmissionQueue, sqes) +
16 peter@eisentraut.org 110 : 2914 : sizeof(int) * *queue_size;
111 : : }
112 : :
113 : : static size_t
172 andres@anarazel.de 114 : 2914 : pgaio_worker_control_shmem_size(void)
115 : : {
56 tmunro@postgresql.or 116 : 2914 : return offsetof(PgAioWorkerControl, workers) +
117 : : sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
118 : : }
119 : :
120 : : static size_t
172 andres@anarazel.de 121 : 1895 : pgaio_worker_shmem_size(void)
122 : : {
123 : : size_t sz;
124 : : int queue_size;
125 : :
126 : 1895 : sz = pgaio_worker_queue_shmem_size(&queue_size);
127 : 1895 : sz = add_size(sz, pgaio_worker_control_shmem_size());
128 : :
129 : 1895 : return sz;
130 : : }
131 : :
132 : : static void
133 : 1019 : pgaio_worker_shmem_init(bool first_time)
134 : : {
135 : : bool found;
136 : : int queue_size;
137 : :
138 : 1019 : io_worker_submission_queue =
139 : 1019 : ShmemInitStruct("AioWorkerSubmissionQueue",
140 : : pgaio_worker_queue_shmem_size(&queue_size),
141 : : &found);
142 [ + - ]: 1019 : if (!found)
143 : : {
144 : 1019 : io_worker_submission_queue->size = queue_size;
145 : 1019 : io_worker_submission_queue->head = 0;
146 : 1019 : io_worker_submission_queue->tail = 0;
147 : : }
148 : :
149 : 1019 : io_worker_control =
150 : 1019 : ShmemInitStruct("AioWorkerControl",
151 : : pgaio_worker_control_shmem_size(),
152 : : &found);
153 [ + - ]: 1019 : if (!found)
154 : : {
155 : 1019 : io_worker_control->idle_worker_mask = 0;
156 [ + + ]: 33627 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
157 : : {
158 : 32608 : io_worker_control->workers[i].latch = NULL;
159 : 32608 : io_worker_control->workers[i].in_use = false;
160 : : }
161 : : }
162 : 1019 : }
163 : :
164 : : static int
56 tmunro@postgresql.or 165 : 570230 : pgaio_worker_choose_idle(void)
166 : : {
167 : : int worker;
168 : :
172 andres@anarazel.de 169 [ + + ]: 570230 : if (io_worker_control->idle_worker_mask == 0)
170 : 20000 : return -1;
171 : :
172 : : /* Find the lowest bit position, and clear it. */
173 : 550230 : worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
174 : 550230 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
56 tmunro@postgresql.or 175 [ - + ]: 550230 : Assert(io_worker_control->workers[worker].in_use);
176 : :
172 andres@anarazel.de 177 : 550230 : return worker;
178 : : }
179 : :
180 : : static bool
181 : 551885 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
182 : : {
183 : : PgAioWorkerSubmissionQueue *queue;
184 : : uint32 new_head;
185 : :
186 : 551885 : queue = io_worker_submission_queue;
187 : 551885 : new_head = (queue->head + 1) & (queue->size - 1);
188 [ - + ]: 551885 : if (new_head == queue->tail)
189 : : {
172 andres@anarazel.de 190 [ # # ]:UBC 0 : pgaio_debug(DEBUG3, "io queue is full, at %u elements",
191 : : io_worker_submission_queue->size);
192 : 0 : return false; /* full */
193 : : }
194 : :
56 tmunro@postgresql.or 195 :CBC 551885 : queue->sqes[queue->head] = pgaio_io_get_id(ioh);
172 andres@anarazel.de 196 : 551885 : queue->head = new_head;
197 : :
198 : 551885 : return true;
199 : : }
200 : :
201 : : static int
202 : 899114 : pgaio_worker_submission_queue_consume(void)
203 : : {
204 : : PgAioWorkerSubmissionQueue *queue;
205 : : int result;
206 : :
207 : 899114 : queue = io_worker_submission_queue;
208 [ + + ]: 899114 : if (queue->tail == queue->head)
16 peter@eisentraut.org 209 : 450712 : return -1; /* empty */
210 : :
56 tmunro@postgresql.or 211 : 448402 : result = queue->sqes[queue->tail];
172 andres@anarazel.de 212 : 448402 : queue->tail = (queue->tail + 1) & (queue->size - 1);
213 : :
214 : 448402 : return result;
215 : : }
216 : :
217 : : static uint32
218 : 895310 : pgaio_worker_submission_queue_depth(void)
219 : : {
220 : : uint32 head;
221 : : uint32 tail;
222 : :
223 : 895310 : head = io_worker_submission_queue->head;
224 : 895310 : tail = io_worker_submission_queue->tail;
225 : :
226 [ + + ]: 895310 : if (tail > head)
227 : 548 : head += io_worker_submission_queue->size;
228 : :
229 [ - + ]: 895310 : Assert(head >= tail);
230 : :
231 : 895310 : return head - tail;
232 : : }
233 : :
234 : : static bool
235 : 1108347 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
236 : : {
237 : : return
238 : 1108347 : !IsUnderPostmaster
239 [ + + ]: 1104922 : || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
240 [ + + - + ]: 2213269 : || !pgaio_io_can_reopen(ioh);
241 : : }
242 : :
243 : : static void
56 tmunro@postgresql.or 244 : 551338 : pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
245 : : {
246 : : PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
172 andres@anarazel.de 247 : 551338 : int nsync = 0;
248 : 551338 : Latch *wakeup = NULL;
249 : : int worker;
250 : :
56 tmunro@postgresql.or 251 [ - + ]: 551338 : Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
252 : :
172 andres@anarazel.de 253 : 551338 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
56 tmunro@postgresql.or 254 [ + + ]: 1103223 : for (int i = 0; i < num_staged_ios; ++i)
255 : : {
256 [ - + ]: 551885 : Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
257 [ - + ]: 551885 : if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
258 : : {
259 : : /*
260 : : * We'll do it synchronously, but only after we've sent as many as
261 : : * we can to workers, to maximize concurrency.
262 : : */
56 tmunro@postgresql.or 263 :UBC 0 : synchronous_ios[nsync++] = staged_ios[i];
172 andres@anarazel.de 264 : 0 : continue;
265 : : }
266 : :
172 andres@anarazel.de 267 [ + + ]:CBC 551885 : if (wakeup == NULL)
268 : : {
269 : : /* Choose an idle worker to wake up if we haven't already. */
56 tmunro@postgresql.or 270 : 551348 : worker = pgaio_worker_choose_idle();
172 andres@anarazel.de 271 [ + + ]: 551348 : if (worker >= 0)
272 : 542245 : wakeup = io_worker_control->workers[worker].latch;
273 : :
56 tmunro@postgresql.or 274 [ - + ]: 551348 : pgaio_debug_io(DEBUG4, staged_ios[i],
275 : : "choosing worker %d",
276 : : worker);
277 : : }
278 : : }
172 andres@anarazel.de 279 : 551338 : LWLockRelease(AioWorkerSubmissionQueueLock);
280 : :
281 [ + + ]: 551338 : if (wakeup)
282 : 542245 : SetLatch(wakeup);
283 : :
284 : : /* Run whatever is left synchronously. */
285 [ - + ]: 551338 : if (nsync > 0)
286 : : {
172 andres@anarazel.de 287 [ # # ]:UBC 0 : for (int i = 0; i < nsync; ++i)
288 : : {
289 : 0 : pgaio_io_perform_synchronously(synchronous_ios[i]);
290 : : }
291 : : }
172 andres@anarazel.de 292 :CBC 551338 : }
293 : :
294 : : static int
295 : 551338 : pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
296 : : {
297 [ + + ]: 1103223 : for (int i = 0; i < num_staged_ios; i++)
298 : : {
299 : 551885 : PgAioHandle *ioh = staged_ios[i];
300 : :
301 : 551885 : pgaio_io_prepare_submit(ioh);
302 : : }
303 : :
304 : 551338 : pgaio_worker_submit_internal(num_staged_ios, staged_ios);
305 : :
306 : 551338 : return num_staged_ios;
307 : : }
308 : :
309 : : /*
310 : : * on_shmem_exit() callback that releases the worker's slot in
311 : : * io_worker_control.
312 : : */
313 : : static void
314 : 1502 : pgaio_worker_die(int code, Datum arg)
315 : : {
316 : 1502 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
317 [ - + ]: 1502 : Assert(io_worker_control->workers[MyIoWorkerId].in_use);
318 [ - + ]: 1502 : Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
319 : :
56 tmunro@postgresql.or 320 : 1502 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
172 andres@anarazel.de 321 : 1502 : io_worker_control->workers[MyIoWorkerId].in_use = false;
322 : 1502 : io_worker_control->workers[MyIoWorkerId].latch = NULL;
323 : 1502 : LWLockRelease(AioWorkerSubmissionQueueLock);
324 : 1502 : }
325 : :
326 : : /*
327 : : * Register the worker in shared memory, assign MyIoWorkerId and register a
328 : : * shutdown callback to release registration.
329 : : */
330 : : static void
331 : 1502 : pgaio_worker_register(void)
332 : : {
333 : 1502 : MyIoWorkerId = -1;
334 : :
335 : : /*
336 : : * XXX: This could do with more fine-grained locking. But it's also not
337 : : * very common for the number of workers to change at the moment...
338 : : */
339 : 1502 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
340 : :
341 [ + - ]: 3514 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
342 : : {
343 [ + + ]: 3514 : if (!io_worker_control->workers[i].in_use)
344 : : {
345 [ - + ]: 1502 : Assert(io_worker_control->workers[i].latch == NULL);
346 : 1502 : io_worker_control->workers[i].in_use = true;
347 : 1502 : MyIoWorkerId = i;
348 : 1502 : break;
349 : : }
350 : : else
351 [ - + ]: 2012 : Assert(io_worker_control->workers[i].latch != NULL);
352 : : }
353 : :
354 [ - + ]: 1502 : if (MyIoWorkerId == -1)
172 andres@anarazel.de 355 [ # # ]:UBC 0 : elog(ERROR, "couldn't find a free worker slot");
356 : :
172 andres@anarazel.de 357 :CBC 1502 : io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
358 : 1502 : io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
359 : 1502 : LWLockRelease(AioWorkerSubmissionQueueLock);
360 : :
361 : 1502 : on_shmem_exit(pgaio_worker_die, 0);
362 : 1502 : }
363 : :
364 : : static void
158 melanieplageman@gmai 365 : 403 : pgaio_worker_error_callback(void *arg)
366 : : {
367 : : ProcNumber owner;
368 : : PGPROC *owner_proc;
369 : : int32 owner_pid;
370 : 403 : PgAioHandle *ioh = arg;
371 : :
372 [ + + ]: 403 : if (!ioh)
373 : 9 : return;
374 : :
375 [ - + ]: 394 : Assert(ioh->owner_procno != MyProcNumber);
376 [ - + ]: 394 : Assert(MyBackendType == B_IO_WORKER);
377 : :
378 : 394 : owner = ioh->owner_procno;
379 : 394 : owner_proc = GetPGProcByNumber(owner);
380 : 394 : owner_pid = owner_proc->pid;
381 : :
382 : 394 : errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
383 : : }
384 : :
385 : : void
172 andres@anarazel.de 386 : 1502 : IoWorkerMain(const void *startup_data, size_t startup_data_len)
387 : : {
388 : : sigjmp_buf local_sigjmp_buf;
389 : 1502 : PgAioHandle *volatile error_ioh = NULL;
158 melanieplageman@gmai 390 : 1502 : ErrorContextCallback errcallback = {0};
172 andres@anarazel.de 391 : 1502 : volatile int error_errno = 0;
392 : : char cmd[128];
393 : :
394 : 1502 : MyBackendType = B_IO_WORKER;
395 : 1502 : AuxiliaryProcessMainCommon();
396 : :
397 : 1502 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
398 : 1502 : pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
399 : :
400 : : /*
401 : : * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
402 : : * shutdown sequence, similar to checkpointer.
403 : : */
404 : 1502 : pqsignal(SIGTERM, SIG_IGN);
405 : : /* SIGQUIT handler was already set up by InitPostmasterChild */
406 : 1502 : pqsignal(SIGALRM, SIG_IGN);
407 : 1502 : pqsignal(SIGPIPE, SIG_IGN);
408 : 1502 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
409 : 1502 : pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
410 : :
411 : : /* also registers a shutdown callback to unregister */
412 : 1502 : pgaio_worker_register();
413 : :
168 tmunro@postgresql.or 414 : 1502 : sprintf(cmd, "%d", MyIoWorkerId);
172 andres@anarazel.de 415 : 1502 : set_ps_display(cmd);
416 : :
158 melanieplageman@gmai 417 : 1502 : errcallback.callback = pgaio_worker_error_callback;
418 : 1502 : errcallback.previous = error_context_stack;
419 : 1502 : error_context_stack = &errcallback;
420 : :
421 : : /* see PostgresMain() */
172 andres@anarazel.de 422 [ - + ]: 1502 : if (sigsetjmp(local_sigjmp_buf, 1) != 0)
423 : : {
172 andres@anarazel.de 424 :UBC 0 : error_context_stack = NULL;
425 : 0 : HOLD_INTERRUPTS();
426 : :
427 : 0 : EmitErrorReport();
428 : :
429 : : /*
430 : : * In the - very unlikely - case that the IO failed in a way that
431 : : * raises an error we need to mark the IO as failed.
432 : : *
433 : : * Need to do just enough error recovery so that we can mark the IO as
434 : : * failed and then exit (postmaster will start a new worker).
435 : : */
436 : 0 : LWLockReleaseAll();
437 : :
438 [ # # ]: 0 : if (error_ioh != NULL)
439 : : {
440 : : /* should never fail without setting error_errno */
441 [ # # ]: 0 : Assert(error_errno != 0);
442 : :
443 : 0 : errno = error_errno;
444 : :
445 : 0 : START_CRIT_SECTION();
446 : 0 : pgaio_io_process_completion(error_ioh, -error_errno);
447 [ # # ]: 0 : END_CRIT_SECTION();
448 : : }
449 : :
450 : 0 : proc_exit(1);
451 : : }
452 : :
453 : : /* We can now handle ereport(ERROR) */
172 andres@anarazel.de 454 :CBC 1502 : PG_exception_stack = &local_sigjmp_buf;
455 : :
456 : 1502 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
457 : :
458 [ + + ]: 900603 : while (!ShutdownRequestPending)
459 : : {
460 : : uint32 io_index;
461 : : Latch *latches[IO_WORKER_WAKEUP_FANOUT];
462 : 899114 : int nlatches = 0;
463 : 899114 : int nwakeups = 0;
464 : : int worker;
465 : :
466 : : /*
467 : : * Try to get a job to do.
468 : : *
469 : : * The lwlock acquisition also provides the necessary memory barrier
470 : : * to ensure that we don't see an outdated data in the handle.
471 : : */
472 : 899114 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
16 peter@eisentraut.org 473 [ + + ]: 899114 : if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
474 : : {
475 : : /*
476 : : * Nothing to do. Mark self idle.
477 : : *
478 : : * XXX: Invent some kind of back pressure to reduce useless
479 : : * wakeups?
480 : : */
172 andres@anarazel.de 481 : 450712 : io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
482 : : }
483 : : else
484 : : {
485 : : /* Got one. Clear idle flag. */
486 : 448402 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
487 : :
488 : : /* See if we can wake up some peers. */
489 [ + + ]: 448402 : nwakeups = Min(pgaio_worker_submission_queue_depth(),
490 : : IO_WORKER_WAKEUP_FANOUT);
491 [ + + ]: 456387 : for (int i = 0; i < nwakeups; ++i)
492 : : {
56 tmunro@postgresql.or 493 [ + + ]: 18882 : if ((worker = pgaio_worker_choose_idle()) < 0)
172 andres@anarazel.de 494 : 10897 : break;
495 : 7985 : latches[nlatches++] = io_worker_control->workers[worker].latch;
496 : : }
497 : : }
498 : 899114 : LWLockRelease(AioWorkerSubmissionQueueLock);
499 : :
500 [ + + ]: 907099 : for (int i = 0; i < nlatches; ++i)
501 : 7985 : SetLatch(latches[i]);
502 : :
16 peter@eisentraut.org 503 [ + + ]: 899114 : if (io_index != -1)
504 : : {
172 andres@anarazel.de 505 : 448402 : PgAioHandle *ioh = NULL;
506 : :
507 : 448402 : ioh = &pgaio_ctl->io_handles[io_index];
508 : 448402 : error_ioh = ioh;
158 melanieplageman@gmai 509 : 448402 : errcallback.arg = ioh;
510 : :
172 andres@anarazel.de 511 [ - + ]: 448402 : pgaio_debug_io(DEBUG4, ioh,
512 : : "worker %d processing IO",
513 : : MyIoWorkerId);
514 : :
515 : : /*
516 : : * Prevent interrupts between pgaio_io_reopen() and
517 : : * pgaio_io_perform_synchronously() that otherwise could lead to
518 : : * the FD getting closed in that window.
519 : : */
164 520 : 448402 : HOLD_INTERRUPTS();
521 : :
522 : : /*
523 : : * It's very unlikely, but possible, that reopen fails. E.g. due
524 : : * to memory allocations failing or file permissions changing or
525 : : * such. In that case we need to fail the IO.
526 : : *
527 : : * There's not really a good errno we can report here.
528 : : */
172 529 : 448402 : error_errno = ENOENT;
530 : 448402 : pgaio_io_reopen(ioh);
531 : :
532 : : /*
533 : : * To be able to exercise the reopen-fails path, allow injection
534 : : * points to trigger a failure at this point.
535 : : */
536 : : INJECTION_POINT("aio-worker-after-reopen", ioh);
537 : :
538 : 448402 : error_errno = 0;
539 : 448402 : error_ioh = NULL;
540 : :
541 : : /*
542 : : * As part of IO completion the buffer will be marked as NOACCESS,
543 : : * until the buffer is pinned again - which never happens in io
544 : : * workers. Therefore the next time there is IO for the same
545 : : * buffer, the memory will be considered inaccessible. To avoid
546 : : * that, explicitly allow access to the memory before reading data
547 : : * into it.
548 : : */
549 : : #ifdef USE_VALGRIND
550 : : {
551 : : struct iovec *iov;
552 : : uint16 iov_length = pgaio_io_get_iovec_length(ioh, &iov);
553 : :
554 : : for (int i = 0; i < iov_length; i++)
555 : : VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
556 : : }
557 : : #endif
558 : :
559 : : /*
560 : : * We don't expect this to ever fail with ERROR or FATAL, no need
561 : : * to keep error_ioh set to the IO.
562 : : * pgaio_io_perform_synchronously() contains a critical section to
563 : : * ensure we don't accidentally fail.
564 : : */
565 : 448402 : pgaio_io_perform_synchronously(ioh);
566 : :
164 567 [ - + ]: 448402 : RESUME_INTERRUPTS();
158 melanieplageman@gmai 568 : 448402 : errcallback.arg = NULL;
569 : : }
570 : : else
571 : : {
172 andres@anarazel.de 572 : 450712 : WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
573 : : WAIT_EVENT_IO_WORKER_MAIN);
574 : 450703 : ResetLatch(MyLatch);
575 : : }
576 : :
577 [ + + ]: 899105 : CHECK_FOR_INTERRUPTS();
578 : :
56 tmunro@postgresql.or 579 [ + + ]: 899101 : if (ConfigReloadPending)
580 : : {
581 : 221 : ConfigReloadPending = false;
582 : 221 : ProcessConfigFile(PGC_SIGHUP);
583 : : }
584 : : }
585 : :
158 melanieplageman@gmai 586 : 1489 : error_context_stack = errcallback.previous;
172 andres@anarazel.de 587 : 1489 : proc_exit(0);
588 : : }
589 : :
590 : : bool
591 : 39331 : pgaio_workers_enabled(void)
592 : : {
593 : 39331 : return io_method == IOMETHOD_WORKER;
594 : : }
|