LCOV - differential code coverage report
Current view: top level - src/backend/storage/aio - method_worker.c (source / functions) Coverage Total Hit UNC UBC GBC GIC GNC CBC ECB DUB DCB
Current: bed3ffbf9d952be6c7d739d068cdce44c046dfb7 vs 574581b50ac9c63dd9e4abebb731a3b67e5b50f6 Lines: 96.6 % 326 315 9 2 2 1 207 105 1 3 83
Current Date: 2026-05-05 10:23:31 +0900 Functions: 100.0 % 34 34 31 3 7
Baseline: lcov-20260505-025707-baseline Branches: 71.4 % 182 130 39 13 3 93 34 14 36
Baseline Date: 2026-05-05 10:27:06 +0900 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 97.1 % 205 199 6 1 198
(30,360] days: 85.7 % 21 18 3 8 10
(360..) days: 98.0 % 100 98 2 2 1 95 1
Function coverage date bins:
(7,30] days: 100.0 % 25 25 25
(360..) days: 100.0 % 9 9 6 3
Branch coverage date bins:
(7,30] days: 70.2 % 124 87 37 87
(30,360] days: 81.2 % 16 13 2 1 6 7
(360..) days: 71.4 % 42 30 12 3 27

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * method_worker.c
                                  4                 :                :  *    AIO - perform AIO using worker processes
                                  5                 :                :  *
                                  6                 :                :  * IO workers consume IOs from a shared memory submission queue, run
                                  7                 :                :  * traditional synchronous system calls, and perform the shared completion
                                  8                 :                :  * handling immediately.  Client code submits most requests by pushing IOs
                                  9                 :                :  * into the submission queue, and waits (if necessary) using condition
                                 10                 :                :  * variables.  Some IOs cannot be performed in another process due to lack of
                                 11                 :                :  * infrastructure for reopening the file, and must be processed synchronously
                                 12                 :                :  * by the client code when submitted.
                                 13                 :                :  *
                                 14                 :                :  * The pool of workers tries to stabilize at a size that can handle recently
                                 15                 :                :  * seen variation in demand, within the configured limits.
                                 16                 :                :  *
                                 17                 :                :  * This method of AIO is available in all builds on all operating systems, and
                                 18                 :                :  * is the default.
                                 19                 :                :  *
                                 20                 :                :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
                                 21                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                 22                 :                :  *
                                 23                 :                :  * IDENTIFICATION
                                 24                 :                :  *    src/backend/storage/aio/method_worker.c
                                 25                 :                :  *
                                 26                 :                :  *-------------------------------------------------------------------------
                                 27                 :                :  */
                                 28                 :                : 
                                 29                 :                : #include "postgres.h"
                                 30                 :                : 
                                 31                 :                : #include <limits.h>
                                 32                 :                : 
                                 33                 :                : #include "libpq/pqsignal.h"
                                 34                 :                : #include "miscadmin.h"
                                 35                 :                : #include "port/pg_bitutils.h"
                                 36                 :                : #include "postmaster/auxprocess.h"
                                 37                 :                : #include "postmaster/interrupt.h"
                                 38                 :                : #include "storage/aio.h"
                                 39                 :                : #include "storage/aio_internal.h"
                                 40                 :                : #include "storage/aio_subsys.h"
                                 41                 :                : #include "storage/io_worker.h"
                                 42                 :                : #include "storage/ipc.h"
                                 43                 :                : #include "storage/latch.h"
                                 44                 :                : #include "storage/lwlock.h"
                                 45                 :                : #include "storage/pmsignal.h"
                                 46                 :                : #include "storage/proc.h"
                                 47                 :                : #include "storage/shmem.h"
                                 48                 :                : #include "tcop/tcopprot.h"
                                 49                 :                : #include "utils/injection_point.h"
                                 50                 :                : #include "utils/memdebug.h"
                                 51                 :                : #include "utils/ps_status.h"
                                 52                 :                : #include "utils/wait_event.h"
                                 53                 :                : 
                                 54                 :                : /*
                                 55                 :                :  * Saturation for counters used to estimate wakeup:IO ratio.
                                 56                 :                :  *
                                 57                 :                :  * We maintain hist_wakeups for wakeups received and hist_ios for IOs
                                 58                 :                :  * processed by each worker.  When either counter reaches this saturation
                                 59                 :                :  * value, we divide both by two.  The result is an exponentially decaying
                                 60                 :                :  * ratio of wakeups to IOs, with a very short memory.
                                 61                 :                :  *
                                 62                 :                :  * If a worker is itself experiencing useless wakeups, it assumes that
                                 63                 :                :  * higher-numbered workers would experience even more, so it should end the
                                 64                 :                :  * chain.
                                 65                 :                :  */
                                 66                 :                : #define PGAIO_WORKER_WAKEUP_RATIO_SATURATE 4
                                 67                 :                : 
                                 68                 :                : /* Debugging support: show current IO and wakeups:ios statistics in ps. */
                                 69                 :                : /* #define PGAIO_WORKER_SHOW_PS_INFO */
                                 70                 :                : 
                                 71                 :                : typedef struct PgAioWorkerSubmissionQueue
                                 72                 :                : {
                                 73                 :                :     uint32      size;
                                 74                 :                :     uint32      head;
                                 75                 :                :     uint32      tail;
                                 76                 :                :     int         sqes[FLEXIBLE_ARRAY_MEMBER];
                                 77                 :                : } PgAioWorkerSubmissionQueue;
                                 78                 :                : 
                                 79                 :                : typedef struct PgAioWorkerSlot
                                 80                 :                : {
                                 81                 :                :     ProcNumber  proc_number;
                                 82                 :                : } PgAioWorkerSlot;
                                 83                 :                : 
                                 84                 :                : /*
                                 85                 :                :  * Sets of worker IDs are held in a simple bitmap, accessed through functions
                                 86                 :                :  * that provide a more readable abstraction.  If we wanted to support more
                                 87                 :                :  * workers than that, the contention on the single queue would surely get too
                                 88                 :                :  * high, so we might want to consider multiple pools instead of widening this.
                                 89                 :                :  */
                                 90                 :                : typedef uint64 PgAioWorkerSet;
                                 91                 :                : 
                                 92                 :                : #define PGAIO_WORKERSET_BITS (sizeof(PgAioWorkerSet) * CHAR_BIT)
                                 93                 :                : 
                                 94                 :                : static_assert(PGAIO_WORKERSET_BITS >= MAX_IO_WORKERS, "too small");
                                 95                 :                : 
                                 96                 :                : typedef struct PgAioWorkerControl
                                 97                 :                : {
                                 98                 :                :     /* Seen by postmaster */
                                 99                 :                :     bool        grow;
                                100                 :                :     bool        grow_signal_sent;
                                101                 :                : 
                                102                 :                :     /* Protected by AioWorkerSubmissionQueueLock. */
                                103                 :                :     PgAioWorkerSet idle_workerset;
                                104                 :                : 
                                105                 :                :     /* Protected by AioWorkerControlLock. */
                                106                 :                :     PgAioWorkerSet workerset;
                                107                 :                :     int         nworkers;
                                108                 :                : 
                                109                 :                :     /* Protected by AioWorkerControlLock. */
                                110                 :                :     PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
                                111                 :                : } PgAioWorkerControl;
                                112                 :                : 
                                113                 :                : 
                                114                 :                : static void pgaio_worker_shmem_request(void *arg);
                                115                 :                : static void pgaio_worker_shmem_init(void *arg);
                                116                 :                : 
                                117                 :                : static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
                                118                 :                : static int  pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
                                119                 :                : 
                                120                 :                : 
                                121                 :                : const IoMethodOps pgaio_worker_ops = {
                                122                 :                :     .shmem_callbacks.request_fn = pgaio_worker_shmem_request,
                                123                 :                :     .shmem_callbacks.init_fn = pgaio_worker_shmem_init,
                                124                 :                : 
                                125                 :                :     .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
                                126                 :                :     .submit = pgaio_worker_submit,
                                127                 :                : };
                                128                 :                : 
                                129                 :                : 
                                130                 :                : /* GUCs */
                                131                 :                : int         io_min_workers = 2;
                                132                 :                : int         io_max_workers = 8;
                                133                 :                : int         io_worker_idle_timeout = 60000;
                                134                 :                : int         io_worker_launch_interval = 100;
                                135                 :                : 
                                136                 :                : 
                                137                 :                : static int  io_worker_queue_size = 64;
                                138                 :                : static int  MyIoWorkerId = -1;
                                139                 :                : static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
                                140                 :                : static PgAioWorkerControl *io_worker_control;
                                141                 :                : 
                                142                 :                : 
                                143                 :                : static void
   27 tmunro@postgresql.or      144                 :GNC        2454 : pgaio_workerset_initialize(PgAioWorkerSet *set)
                                145                 :                : {
                                146                 :           2454 :     *set = 0;
                                147                 :           2454 : }
                                148                 :                : 
                                149                 :                : static bool
                                150                 :        1365038 : pgaio_workerset_is_empty(PgAioWorkerSet *set)
                                151                 :                : {
                                152                 :        1365038 :     return *set == 0;
                                153                 :                : }
                                154                 :                : 
                                155                 :                : static PgAioWorkerSet
                                156                 :        1733991 : pgaio_workerset_singleton(int worker)
                                157                 :                : {
                                158   [ +  -  -  + ]:        1733991 :     Assert(worker >= 0 && worker < MAX_IO_WORKERS);
                                159                 :        1733991 :     return UINT64_C(1) << worker;
                                160                 :                : }
                                161                 :                : 
                                162                 :                : static void
                                163                 :           1312 : pgaio_workerset_all(PgAioWorkerSet *set)
                                164                 :                : {
                                165                 :           1312 :     *set = UINT64_MAX >> (PGAIO_WORKERSET_BITS - MAX_IO_WORKERS);
   27 tmunro@postgresql.or      166                 :GIC        1312 : }
                                167                 :                : 
                                168                 :                : static void
   27 tmunro@postgresql.or      169                 :GNC        1312 : pgaio_workerset_subtract(PgAioWorkerSet *set1, const PgAioWorkerSet *set2)
                                170                 :                : {
                                171                 :           1312 :     *set1 &= ~*set2;
                                172                 :           1312 : }
                                173                 :                : 
                                174                 :                : static void
                                175                 :         531368 : pgaio_workerset_insert(PgAioWorkerSet *set, int worker)
                                176                 :                : {
                                177   [ +  -  -  + ]:         531368 :     Assert(worker >= 0 && worker < MAX_IO_WORKERS);
                                178                 :         531368 :     *set |= pgaio_workerset_singleton(worker);
                                179                 :         531368 : }
                                180                 :                : 
                                181                 :                : static void
                                182                 :        1199999 : pgaio_workerset_remove(PgAioWorkerSet *set, int worker)
                                183                 :                : {
                                184   [ +  -  -  + ]:        1199999 :     Assert(worker >= 0 && worker < MAX_IO_WORKERS);
                                185                 :        1199999 :     *set &= ~pgaio_workerset_singleton(worker);
                                186                 :        1199999 : }
                                187                 :                : 
                                188                 :                : static void
                                189                 :          13241 : pgaio_workerset_remove_lte(PgAioWorkerSet *set, int worker)
                                190                 :                : {
                                191   [ +  -  -  + ]:          13241 :     Assert(worker >= 0 && worker < MAX_IO_WORKERS);
                                192                 :          13241 :     *set &= (~(PgAioWorkerSet) 0) << (worker + 1);
                                193                 :          13241 : }
                                194                 :                : 
                                195                 :                : static int
                                196                 :          12031 : pgaio_workerset_get_highest(PgAioWorkerSet *set)
                                197                 :                : {
                                198         [ -  + ]:          12031 :     Assert(!pgaio_workerset_is_empty(set));
                                199                 :          12031 :     return pg_leftmost_one_pos64(*set);
                                200                 :                : }
                                201                 :                : 
                                202                 :                : static int
                                203                 :         662328 : pgaio_workerset_get_lowest(PgAioWorkerSet *set)
                                204                 :                : {
                                205         [ -  + ]:         662328 :     Assert(!pgaio_workerset_is_empty(set));
                                206                 :         662328 :     return pg_rightmost_one_pos64(*set);
                                207                 :                : }
                                208                 :                : 
                                209                 :                : static int
                                210                 :           2556 : pgaio_workerset_pop_lowest(PgAioWorkerSet *set)
                                211                 :                : {
                                212                 :           2556 :     int         worker = pgaio_workerset_get_lowest(set);
                                213                 :                : 
                                214                 :           2556 :     pgaio_workerset_remove(set, worker);
                                215                 :           2556 :     return worker;
                                216                 :                : }
                                217                 :                : 
                                218                 :                : #ifdef USE_ASSERT_CHECKING
                                219                 :                : static bool
                                220                 :           2624 : pgaio_workerset_contains(PgAioWorkerSet *set, int worker)
                                221                 :                : {
                                222   [ +  -  -  + ]:           2624 :     Assert(worker >= 0 && worker < MAX_IO_WORKERS);
                                223                 :           2624 :     return (*set & pgaio_workerset_singleton(worker)) != 0;
                                224                 :                : }
                                225                 :                : 
                                226                 :                : static int
                                227                 :           2624 : pgaio_workerset_count(PgAioWorkerSet *set)
                                228                 :                : {
                                229                 :           2624 :     return pg_popcount64(*set);
                                230                 :                : }
                                231                 :                : #endif
                                232                 :                : 
                                233                 :                : static void
   29 heikki.linnakangas@i      234                 :           1230 : pgaio_worker_shmem_request(void *arg)
                                235                 :                : {
                                236                 :                :     size_t      size;
                                237                 :                :     int         queue_size;
                                238                 :                : 
                                239                 :                :     /* Round size up to next power of two so we can make a mask. */
                                240                 :           1230 :     queue_size = pg_nextpower2_32(io_worker_queue_size);
                                241                 :                : 
                                242                 :           1230 :     size = offsetof(PgAioWorkerSubmissionQueue, sqes) + sizeof(int) * queue_size;
                                243                 :           1230 :     ShmemRequestStruct(.name = "AioWorkerSubmissionQueue",
                                244                 :                :                        .size = size,
                                245                 :                :                        .ptr = (void **) &io_worker_submission_queue,
                                246                 :                :         );
                                247                 :                : 
                                248                 :           1230 :     size = offsetof(PgAioWorkerControl, workers) + sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
                                249                 :           1230 :     ShmemRequestStruct(.name = "AioWorkerControl",
                                250                 :                :                        .size = size,
                                251                 :                :                        .ptr = (void **) &io_worker_control,
                                252                 :                :         );
  413 andres@anarazel.de        253                 :           1230 : }
                                254                 :                : 
                                255                 :                : static void
   29 heikki.linnakangas@i      256                 :           1227 : pgaio_worker_shmem_init(void *arg)
                                257                 :                : {
                                258                 :                :     int         queue_size;
                                259                 :                : 
                                260                 :                :     /* Round size up like in pgaio_worker_shmem_request() */
                                261                 :           1227 :     queue_size = pg_nextpower2_32(io_worker_queue_size);
                                262                 :                : 
                                263                 :           1227 :     io_worker_submission_queue->size = queue_size;
                                264                 :           1227 :     io_worker_submission_queue->head = 0;
                                265                 :           1227 :     io_worker_submission_queue->tail = 0;
   27 tmunro@postgresql.or      266                 :           1227 :     io_worker_control->grow = false;
                                267                 :           1227 :     pgaio_workerset_initialize(&io_worker_control->workerset);
                                268                 :           1227 :     pgaio_workerset_initialize(&io_worker_control->idle_workerset);
                                269                 :                : 
   29 heikki.linnakangas@i      270         [ +  + ]:          40491 :     for (int i = 0; i < MAX_IO_WORKERS; ++i)
   27 tmunro@postgresql.or      271                 :          39264 :         io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
                                272                 :           1227 : }
                                273                 :                : 
                                274                 :                : /*
                                275                 :                :  * Tell postmaster that we think a new worker is needed.
                                276                 :                :  */
                                277                 :                : static void
                                278                 :            250 : pgaio_worker_request_grow(void)
                                279                 :                : {
                                280                 :                :     /*
                                281                 :                :      * Suppress useless signaling if we already know that we're at the
                                282                 :                :      * maximum.  This uses an unlocked read of nworkers, but that's OK for
                                283                 :                :      * this heuristic purpose.
                                284                 :                :      */
                                285         [ -  + ]:            250 :     if (io_worker_control->nworkers >= io_max_workers)
   27 tmunro@postgresql.or      286                 :UNC           0 :         return;
                                287                 :                : 
                                288                 :                :     /* Already requested? */
   27 tmunro@postgresql.or      289         [ +  + ]:GNC         250 :     if (io_worker_control->grow)
                                290                 :            173 :         return;
                                291                 :                : 
                                292                 :             77 :     io_worker_control->grow = true;
                                293                 :             77 :     pg_memory_barrier();
                                294                 :                : 
                                295                 :                :     /*
                                296                 :                :      * If the postmaster has already been signaled, don't do it again until
                                297                 :                :      * the postmaster clears this flag.  There is no point in repeated signals
                                298                 :                :      * if grow is being set and cleared repeatedly while the postmaster is
                                299                 :                :      * waiting for io_worker_launch_interval, which it applies even to
                                300                 :                :      * canceled requests.
                                301                 :                :      */
                                302         [ +  + ]:             77 :     if (io_worker_control->grow_signal_sent)
                                303                 :             35 :         return;
                                304                 :                : 
                                305                 :             42 :     io_worker_control->grow_signal_sent = true;
                                306                 :             42 :     pg_memory_barrier();
                                307                 :             42 :     SendPostmasterSignal(PMSIGNAL_IO_WORKER_GROW);
                                308                 :                : }
                                309                 :                : 
                                310                 :                : /*
                                311                 :                :  * Cancel any request for a new worker, after observing an empty queue.
                                312                 :                :  */
                                313                 :                : static void
                                314                 :         530056 : pgaio_worker_cancel_grow(void)
                                315                 :                : {
                                316         [ +  + ]:         530056 :     if (!io_worker_control->grow)
                                317                 :         529979 :         return;
                                318                 :                : 
                                319                 :             77 :     io_worker_control->grow = false;
                                320                 :             77 :     pg_memory_barrier();
                                321                 :                : }
                                322                 :                : 
                                323                 :                : /*
                                324                 :                :  * Called by the postmaster to check if a new worker has been requested (but
                                325                 :                :  * possibly canceled since).
                                326                 :                :  */
                                327                 :                : bool
                                328                 :          82340 : pgaio_worker_pm_test_grow_signal_sent(void)
                                329                 :                : {
                                330                 :          82340 :     pg_memory_barrier();
                                331   [ +  -  +  + ]:          82340 :     return io_worker_control && io_worker_control->grow_signal_sent;
                                332                 :                : }
                                333                 :                : 
                                334                 :                : /*
                                335                 :                :  * Called by the postmaster to check if a new worker has been requested and
                                336                 :                :  * not canceled since.
                                337                 :                :  */
                                338                 :                : bool
                                339                 :             59 : pgaio_worker_pm_test_grow(void)
                                340                 :                : {
                                341                 :             59 :     pg_memory_barrier();
                                342   [ +  -  +  + ]:             59 :     return io_worker_control && io_worker_control->grow;
                                343                 :                : }
                                344                 :                : 
                                345                 :                : /*
                                346                 :                :  * Called by the postmaster to clear the request for a new worker.
                                347                 :                :  */
                                348                 :                : void
                                349                 :             48 : pgaio_worker_pm_clear_grow_signal_sent(void)
                                350                 :                : {
                                351         [ -  + ]:             48 :     if (!io_worker_control)
   27 tmunro@postgresql.or      352                 :UNC           0 :         return;
                                353                 :                : 
   27 tmunro@postgresql.or      354                 :GNC          48 :     io_worker_control->grow = false;
                                355                 :             48 :     io_worker_control->grow_signal_sent = false;
                                356                 :             48 :     pg_memory_barrier();
  413 andres@anarazel.de        357                 :ECB      (1060) : }
                                358                 :                : 
                                359                 :                : static int
   27 tmunro@postgresql.or      360                 :GNC      684187 : pgaio_worker_choose_idle(int only_workers_above)
                                361                 :                : {
                                362                 :                :     PgAioWorkerSet workerset;
                                363                 :                :     int         worker;
                                364                 :                : 
                                365         [ -  + ]:         684187 :     Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
                                366                 :                : 
                                367                 :         684187 :     workerset = io_worker_control->idle_workerset;
                                368         [ +  + ]:         684187 :     if (only_workers_above >= 0)
                                369                 :          13241 :         pgaio_workerset_remove_lte(&workerset, only_workers_above);
                                370         [ +  + ]:         684187 :     if (pgaio_workerset_is_empty(&workerset))
  413 andres@anarazel.de        371                 :CBC       25727 :         return -1;
                                372                 :                : 
                                373                 :                :     /* Find the lowest numbered idle worker and mark it not idle. */
   27 tmunro@postgresql.or      374                 :GNC      658460 :     worker = pgaio_workerset_get_lowest(&workerset);
                                375                 :         658460 :     pgaio_workerset_remove(&io_worker_control->idle_workerset, worker);
                                376                 :                : 
  413 andres@anarazel.de        377                 :CBC      658460 :     return worker;
                                378                 :                : }
                                379                 :                : 
                                380                 :                : /*
                                381                 :                :  * Try to wake a worker by setting its latch, to tell it there are IOs to
                                382                 :                :  * process in the submission queue.
                                383                 :                :  */
                                384                 :                : static void
   27 tmunro@postgresql.or      385                 :GNC      661016 : pgaio_worker_wake(int worker)
                                386                 :                : {
                                387                 :                :     ProcNumber  proc_number;
                                388                 :                : 
                                389                 :                :     /*
                                390                 :                :      * If the selected worker is concurrently exiting, then pgaio_worker_die()
                                391                 :                :      * had not yet removed it as of when we saw it in idle_workerset.  That's
                                392                 :                :      * OK, because it will wake all remaining workers to close wakeup-vs-exit
                                393                 :                :      * races: *someone* will see the queued IO.  If there are no workers
                                394                 :                :      * running, the postmaster will start a new one.
                                395                 :                :      */
                                396                 :         661016 :     proc_number = io_worker_control->workers[worker].proc_number;
                                397         [ +  + ]:         661016 :     if (proc_number != INVALID_PROC_NUMBER)
                                398                 :         660973 :         SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
                                399                 :         661016 : }
                                400                 :                : 
                                401                 :                : /*
                                402                 :                :  * Try to wake a set of workers.  Used on pool change, to close races
                                403                 :                :  * described in the callers.
                                404                 :                :  */
                                405                 :                : static void
                                406                 :           2624 : pgaio_workerset_wake(PgAioWorkerSet workerset)
                                407                 :                : {
                                408         [ +  + ]:           5180 :     while (!pgaio_workerset_is_empty(&workerset))
                                409                 :           2556 :         pgaio_worker_wake(pgaio_workerset_pop_lowest(&workerset));
                                410                 :           2624 : }
                                411                 :                : 
                                412                 :                : static bool
  413 andres@anarazel.de        413                 :CBC      671066 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
                                414                 :                : {
                                415                 :                :     PgAioWorkerSubmissionQueue *queue;
                                416                 :                :     uint32      new_head;
                                417                 :                : 
   27 tmunro@postgresql.or      418         [ -  + ]:GNC      671066 :     Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
                                419                 :                : 
  413 andres@anarazel.de        420                 :CBC      671066 :     queue = io_worker_submission_queue;
                                421                 :         671066 :     new_head = (queue->head + 1) & (queue->size - 1);
                                422         [ -  + ]:         671066 :     if (new_head == queue->tail)
                                423                 :                :     {
  413 andres@anarazel.de        424         [ #  # ]:UBC           0 :         pgaio_debug(DEBUG3, "io queue is full, at %u elements",
                                425                 :                :                     io_worker_submission_queue->size);
                                426                 :              0 :         return false;           /* full */
                                427                 :                :     }
                                428                 :                : 
  297 tmunro@postgresql.or      429                 :CBC      671066 :     queue->sqes[queue->head] = pgaio_io_get_id(ioh);
  413 andres@anarazel.de        430                 :         671066 :     queue->head = new_head;
                                431                 :                : 
                                432                 :         671066 :     return true;
                                433                 :                : }
                                434                 :                : 
                                435                 :                : static int
                                436                 :        1066415 : pgaio_worker_submission_queue_consume(void)
                                437                 :                : {
                                438                 :                :     PgAioWorkerSubmissionQueue *queue;
                                439                 :                :     int         result;
                                440                 :                : 
   27 tmunro@postgresql.or      441         [ -  + ]:GNC     1066415 :     Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
                                442                 :                : 
  413 andres@anarazel.de        443                 :CBC     1066415 :     queue = io_worker_submission_queue;
                                444         [ +  + ]:        1066415 :     if (queue->tail == queue->head)
  257 peter@eisentraut.org      445                 :         530056 :         return -1;              /* empty */
                                446                 :                : 
  297 tmunro@postgresql.or      447                 :         536359 :     result = queue->sqes[queue->tail];
  413 andres@anarazel.de        448                 :         536359 :     queue->tail = (queue->tail + 1) & (queue->size - 1);
                                449                 :                : 
                                450                 :         536359 :     return result;
                                451                 :                : }
                                452                 :                : 
                                453                 :                : static uint32
                                454                 :         308425 : pgaio_worker_submission_queue_depth(void)
                                455                 :                : {
                                456                 :                :     uint32      head;
                                457                 :                :     uint32      tail;
                                458                 :                : 
   27 tmunro@postgresql.or      459         [ -  + ]:GNC      308425 :     Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
                                460                 :                : 
  413 andres@anarazel.de        461                 :CBC      308425 :     head = io_worker_submission_queue->head;
                                462                 :         308425 :     tail = io_worker_submission_queue->tail;
                                463                 :                : 
                                464         [ +  + ]:         308425 :     if (tail > head)
                                465                 :            215 :         head += io_worker_submission_queue->size;
                                466                 :                : 
                                467         [ -  + ]:         308425 :     Assert(head >= tail);
                                468                 :                : 
                                469                 :         308425 :     return head - tail;
                                470                 :                : }
                                471                 :                : 
                                472                 :                : static bool
                                473                 :        1350360 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
                                474                 :                : {
                                475                 :                :     return
                                476                 :        1350360 :         !IsUnderPostmaster
                                477         [ +  + ]:        1346683 :         || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
                                478   [ +  +  -  + ]:        2697043 :         || !pgaio_io_can_reopen(ioh);
                                479                 :                : }
                                480                 :                : 
                                481                 :                : static int
   30 tmunro@postgresql.or      482                 :GNC      673559 : pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
                                483                 :                : {
   55 tomas.vondra@postgre      484                 :         673559 :     PgAioHandle **synchronous_ios = NULL;
  413 andres@anarazel.de        485                 :CBC      673559 :     int         nsync = 0;
   27 tmunro@postgresql.or      486                 :GNC      673559 :     int         worker = -1;
                                487                 :                : 
  297 tmunro@postgresql.or      488         [ -  + ]:CBC      673559 :     Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
                                489                 :                : 
   30 tmunro@postgresql.or      490         [ +  + ]:GNC     1347238 :     for (int i = 0; i < num_staged_ios; i++)
                                491                 :         673679 :         pgaio_io_prepare_submit(staged_ios[i]);
                                492                 :                : 
   55 tomas.vondra@postgre      493         [ +  + ]:         673559 :     if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
                                494                 :                :     {
                                495         [ +  + ]:        1342012 :         for (int i = 0; i < num_staged_ios; ++i)
                                496                 :                :         {
                                497         [ -  + ]:         671066 :             Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
                                498         [ -  + ]:         671066 :             if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
                                499                 :                :             {
                                500                 :                :                 /*
                                501                 :                :                  * Do the rest synchronously. If the queue is full, give up
                                502                 :                :                  * and do the rest synchronously. We're holding an exclusive
                                503                 :                :                  * lock on the queue so nothing can consume entries.
                                504                 :                :                  */
   55 tomas.vondra@postgre      505                 :UNC           0 :                 synchronous_ios = &staged_ios[i];
                                506                 :              0 :                 nsync = (num_staged_ios - i);
                                507                 :                : 
                                508                 :              0 :                 break;
                                509                 :                :             }
                                510                 :                :         }
                                511                 :                :         /* Choose one worker to wake for this batch. */
   27 tmunro@postgresql.or      512                 :GNC      670946 :         worker = pgaio_worker_choose_idle(-1);
   55 tomas.vondra@postgre      513                 :         670946 :         LWLockRelease(AioWorkerSubmissionQueueLock);
                                514                 :                : 
                                515                 :                :         /* Wake up chosen worker.  It will wake peers if necessary. */
   27 tmunro@postgresql.or      516         [ +  + ]:         670946 :         if (worker != -1)
                                517                 :         656134 :             pgaio_worker_wake(worker);
                                518                 :                :     }
                                519                 :                :     else
                                520                 :                :     {
                                521                 :                :         /* do everything synchronously, no wakeup needed */
   55 tomas.vondra@postgre      522                 :           2613 :         synchronous_ios = staged_ios;
                                523                 :           2613 :         nsync = num_staged_ios;
                                524                 :                :     }
                                525                 :                : 
                                526                 :                :     /* Run whatever is left synchronously. */
  413 andres@anarazel.de        527         [ +  + ]:CBC      673559 :     if (nsync > 0)
                                528                 :                :     {
  413 andres@anarazel.de        529         [ +  + ]:GBC        5226 :         for (int i = 0; i < nsync; ++i)
                                530                 :                :         {
                                531                 :           2613 :             pgaio_io_perform_synchronously(synchronous_ios[i]);
                                532                 :                :         }
                                533                 :                :     }
                                534                 :                : 
  413 andres@anarazel.de        535                 :CBC      673559 :     return num_staged_ios;
                                536                 :                : }
                                537                 :                : 
                                538                 :                : /*
                                539                 :                :  * on_shmem_exit() callback that releases the worker's slot in
                                540                 :                :  * io_worker_control.
                                541                 :                :  */
                                542                 :                : static void
                                543                 :           1312 : pgaio_worker_die(int code, Datum arg)
                                544                 :                : {
                                545                 :                :     PgAioWorkerSet notify_set;
                                546                 :                : 
   27 tmunro@postgresql.or      547                 :GNC        1312 :     LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
                                548                 :           1312 :     pgaio_workerset_remove(&io_worker_control->idle_workerset, MyIoWorkerId);
  413 andres@anarazel.de        549                 :CBC        1312 :     LWLockRelease(AioWorkerSubmissionQueueLock);
                                550                 :                : 
   27 tmunro@postgresql.or      551                 :GNC        1312 :     LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
                                552         [ -  + ]:           1312 :     Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
                                553                 :           1312 :     io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
                                554         [ -  + ]:           1312 :     Assert(pgaio_workerset_contains(&io_worker_control->workerset, MyIoWorkerId));
                                555                 :           1312 :     pgaio_workerset_remove(&io_worker_control->workerset, MyIoWorkerId);
                                556                 :           1312 :     notify_set = io_worker_control->workerset;
                                557         [ -  + ]:           1312 :     Assert(io_worker_control->nworkers > 0);
                                558                 :           1312 :     io_worker_control->nworkers--;
                                559         [ -  + ]:           1312 :     Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
                                560                 :                :            io_worker_control->nworkers);
                                561                 :           1312 :     LWLockRelease(AioWorkerControlLock);
                                562                 :                : 
                                563                 :                :     /*
                                564                 :                :      * Notify other workers on pool change.  This allows the new highest
                                565                 :                :      * worker to know that it is now the one that can time out, and closes a
                                566                 :                :      * wakeup-loss race described in pgaio_worker_wake().
                                567                 :                :      */
                                568                 :           1312 :     pgaio_workerset_wake(notify_set);
  413 andres@anarazel.de        569                 :CBC        1312 : }
                                570                 :                : 
                                571                 :                : /*
                                572                 :                :  * Register the worker in shared memory, assign MyIoWorkerId and register a
                                573                 :                :  * shutdown callback to release registration.
                                574                 :                :  */
                                575                 :                : static void
                                576                 :           1312 : pgaio_worker_register(void)
                                577                 :                : {
                                578                 :                :     PgAioWorkerSet free_workerset;
                                579                 :                :     PgAioWorkerSet old_workerset;
                                580                 :                : 
                                581                 :           1312 :     MyIoWorkerId = -1;
                                582                 :                : 
   27 tmunro@postgresql.or      583                 :GNC        1312 :     LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
                                584                 :                :     /* Find lowest unused worker ID. */
                                585                 :           1312 :     pgaio_workerset_all(&free_workerset);
                                586                 :           1312 :     pgaio_workerset_subtract(&free_workerset, &io_worker_control->workerset);
                                587         [ +  - ]:           1312 :     if (!pgaio_workerset_is_empty(&free_workerset))
                                588                 :           1312 :         MyIoWorkerId = pgaio_workerset_get_lowest(&free_workerset);
                                589         [ -  + ]:           1312 :     if (MyIoWorkerId == -1)
   27 tmunro@postgresql.or      590         [ #  # ]:UNC           0 :         elog(ERROR, "couldn't find a free worker ID");
                                591                 :                : 
   27 tmunro@postgresql.or      592         [ -  + ]:GNC        1312 :     Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
                                593                 :                :            INVALID_PROC_NUMBER);
                                594                 :           1312 :     io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;
                                595                 :                : 
                                596                 :           1312 :     old_workerset = io_worker_control->workerset;
                                597         [ -  + ]:           1312 :     Assert(!pgaio_workerset_contains(&old_workerset, MyIoWorkerId));
                                598                 :           1312 :     pgaio_workerset_insert(&io_worker_control->workerset, MyIoWorkerId);
                                599                 :           1312 :     io_worker_control->nworkers++;
                                600         [ -  + ]:           1312 :     Assert(io_worker_control->nworkers <= MAX_IO_WORKERS);
                                601         [ -  + ]:           1312 :     Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
                                602                 :                :            io_worker_control->nworkers);
                                603                 :           1312 :     LWLockRelease(AioWorkerControlLock);
                                604                 :                : 
                                605                 :                :     /*
                                606                 :                :      * Notify other workers on pool change.  If we were the highest worker,
                                607                 :                :      * this allows the new highest worker to know that it can time out.
                                608                 :                :      */
                                609                 :           1312 :     pgaio_workerset_wake(old_workerset);
                                610                 :                : 
  413 andres@anarazel.de        611                 :CBC        1312 :     on_shmem_exit(pgaio_worker_die, 0);
                                612                 :           1312 : }
                                613                 :                : 
                                614                 :                : static void
  399 melanieplageman@gmai      615                 :           1652 : pgaio_worker_error_callback(void *arg)
                                616                 :                : {
                                617                 :                :     ProcNumber  owner;
                                618                 :                :     PGPROC     *owner_proc;
                                619                 :                :     int32       owner_pid;
                                620                 :           1652 :     PgAioHandle *ioh = arg;
                                621                 :                : 
                                622         [ +  + ]:           1652 :     if (!ioh)
                                623                 :             12 :         return;
                                624                 :                : 
                                625         [ -  + ]:           1640 :     Assert(ioh->owner_procno != MyProcNumber);
                                626         [ -  + ]:           1640 :     Assert(MyBackendType == B_IO_WORKER);
                                627                 :                : 
                                628                 :           1640 :     owner = ioh->owner_procno;
                                629                 :           1640 :     owner_proc = GetPGProcByNumber(owner);
                                630                 :           1640 :     owner_pid = owner_proc->pid;
                                631                 :                : 
                                632                 :           1640 :     errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
                                633                 :                : }
                                634                 :                : 
                                635                 :                : /*
                                636                 :                :  * Check if this backend is allowed to time out, and thus should use a
                                637                 :                :  * non-infinite sleep time.  Only the highest-numbered worker is allowed to
                                638                 :                :  * time out, and only if the pool is above io_min_workers.  Serializing
                                639                 :                :  * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
                                640                 :                :  * io_min_workers.
                                641                 :                :  *
                                642                 :                :  * The result is only instantaneously true and may be temporarily inconsistent
                                643                 :                :  * in different workers around transitions, but all workers are woken up on
                                644                 :                :  * pool size or GUC changes making the result eventually consistent.
                                645                 :                :  */
                                646                 :                : static bool
   27 tmunro@postgresql.or      647                 :GNC      530070 : pgaio_worker_can_timeout(void)
                                648                 :                : {
                                649                 :                :     PgAioWorkerSet workerset;
                                650                 :                : 
                                651         [ +  + ]:         530070 :     if (MyIoWorkerId < io_min_workers)
                                652                 :         518039 :         return false;
                                653                 :                : 
                                654                 :                :     /* Serialize against pool size changes. */
                                655                 :          12031 :     LWLockAcquire(AioWorkerControlLock, LW_SHARED);
                                656                 :          12031 :     workerset = io_worker_control->workerset;
                                657                 :          12031 :     LWLockRelease(AioWorkerControlLock);
                                658                 :                : 
                                659         [ +  + ]:          12031 :     if (MyIoWorkerId != pgaio_workerset_get_highest(&workerset))
                                660                 :           7870 :         return false;
                                661                 :                : 
                                662                 :           4161 :     return true;
                                663                 :                : }
                                664                 :                : 
                                665                 :                : void
  413 andres@anarazel.de        666                 :CBC        1312 : IoWorkerMain(const void *startup_data, size_t startup_data_len)
                                667                 :                : {
                                668                 :                :     sigjmp_buf  local_sigjmp_buf;
   27 tmunro@postgresql.or      669                 :GNC        1312 :     TimestampTz idle_timeout_abs = 0;
                                670                 :           1312 :     int         timeout_guc_used = 0;
  413 andres@anarazel.de        671                 :CBC        1312 :     PgAioHandle *volatile error_ioh = NULL;
  399 melanieplageman@gmai      672                 :           1312 :     ErrorContextCallback errcallback = {0};
  413 andres@anarazel.de        673                 :           1312 :     volatile int error_errno = 0;
                                674                 :                :     char        cmd[128];
   27 tmunro@postgresql.or      675                 :GNC        1312 :     int         hist_ios = 0;
                                676                 :           1312 :     int         hist_wakeups = 0;
                                677                 :                : 
  413 andres@anarazel.de        678                 :CBC        1312 :     AuxiliaryProcessMainCommon();
                                679                 :                : 
                                680                 :           1312 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
                                681                 :           1312 :     pqsignal(SIGINT, die);      /* to allow manually triggering worker restart */
                                682                 :                : 
                                683                 :                :     /*
                                684                 :                :      * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
                                685                 :                :      * shutdown sequence, similar to checkpointer.
                                686                 :                :      */
   21 andrew@dunslane.net       687                 :GNC        1312 :     pqsignal(SIGTERM, PG_SIG_IGN);
                                688                 :                :     /* SIGQUIT handler was already set up by InitPostmasterChild */
                                689                 :           1312 :     pqsignal(SIGALRM, PG_SIG_IGN);
                                690                 :           1312 :     pqsignal(SIGPIPE, PG_SIG_IGN);
  413 andres@anarazel.de        691                 :CBC        1312 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
                                692                 :           1312 :     pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
                                693                 :                : 
                                694                 :                :     /* also registers a shutdown callback to unregister */
                                695                 :           1312 :     pgaio_worker_register();
                                696                 :                : 
  409 tmunro@postgresql.or      697                 :           1312 :     sprintf(cmd, "%d", MyIoWorkerId);
  413 andres@anarazel.de        698                 :           1312 :     set_ps_display(cmd);
                                699                 :                : 
  399 melanieplageman@gmai      700                 :           1312 :     errcallback.callback = pgaio_worker_error_callback;
                                701                 :           1312 :     errcallback.previous = error_context_stack;
                                702                 :           1312 :     error_context_stack = &errcallback;
                                703                 :                : 
                                704                 :                :     /* see PostgresMain() */
  413 andres@anarazel.de        705         [ +  + ]:           1312 :     if (sigsetjmp(local_sigjmp_buf, 1) != 0)
                                706                 :                :     {
                                707                 :              1 :         error_context_stack = NULL;
                                708                 :              1 :         HOLD_INTERRUPTS();
                                709                 :                : 
                                710                 :              1 :         EmitErrorReport();
                                711                 :                : 
                                712                 :                :         /*
                                713                 :                :          * In the - very unlikely - case that the IO failed in a way that
                                714                 :                :          * raises an error we need to mark the IO as failed.
                                715                 :                :          *
                                716                 :                :          * Need to do just enough error recovery so that we can mark the IO as
                                717                 :                :          * failed and then exit (postmaster will start a new worker).
                                718                 :                :          */
                                719                 :              1 :         LWLockReleaseAll();
                                720                 :                : 
                                721         [ +  - ]:              1 :         if (error_ioh != NULL)
                                722                 :                :         {
                                723                 :                :             /* should never fail without setting error_errno */
                                724         [ -  + ]:              1 :             Assert(error_errno != 0);
                                725                 :                : 
                                726                 :              1 :             errno = error_errno;
                                727                 :                : 
                                728                 :              1 :             START_CRIT_SECTION();
                                729                 :              1 :             pgaio_io_process_completion(error_ioh, -error_errno);
                                730         [ -  + ]:              1 :             END_CRIT_SECTION();
                                731                 :                :         }
                                732                 :                : 
                                733                 :              1 :         proc_exit(1);
                                734                 :                :     }
                                735                 :                : 
                                736                 :                :     /* We can now handle ereport(ERROR) */
                                737                 :           1312 :     PG_exception_stack = &local_sigjmp_buf;
                                738                 :                : 
                                739                 :           1312 :     sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
                                740                 :                : 
                                741         [ +  + ]:        1067702 :     while (!ShutdownRequestPending)
                                742                 :                :     {
                                743                 :                :         uint32      io_index;
   27 tmunro@postgresql.or      744                 :GNC     1066415 :         int         worker = -1;
                                745                 :        1066415 :         int         queue_depth = 0;
                                746                 :        1066415 :         bool        maybe_grow = false;
                                747                 :                : 
                                748                 :                :         /*
                                749                 :                :          * Try to get a job to do.
                                750                 :                :          *
                                751                 :                :          * The lwlock acquisition also provides the necessary memory barrier
                                752                 :                :          * to ensure that we don't see outdated data in the handle.
                                753                 :                :          */
  413 andres@anarazel.de        754                 :CBC     1066415 :         LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
  257 peter@eisentraut.org      755         [ +  + ]:        1066415 :         if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
                                756                 :                :         {
                                757                 :                :             /* Nothing to do.  Mark self idle. */
   27 tmunro@postgresql.or      758                 :GNC      530056 :             pgaio_workerset_insert(&io_worker_control->idle_workerset,
                                759                 :                :                                    MyIoWorkerId);
                                760                 :                :         }
                                761                 :                :         else
                                762                 :                :         {
                                763                 :                :             /* Got one.  Clear idle flag. */
                                764                 :         536359 :             pgaio_workerset_remove(&io_worker_control->idle_workerset,
                                765                 :                :                                    MyIoWorkerId);
                                766                 :                : 
                                767                 :                :             /*
                                768                 :                :              * See if we should wake up a higher numbered peer.  Only do that
                                769                 :                :              * if this worker is not receiving spurious wakeups itself.  The
                                770                 :                :              * intention is create a frontier beyond which idle workers stay
                                771                 :                :              * intention is to create a frontier beyond which idle workers stay
                                772                 :                :              *
                                773                 :                :              * This heuristic tries to discover the useful wakeup propagation
                                774                 :                :              * chain length when IOs are very fast and workers wake up to find
                                775                 :                :              * that all IOs have already been taken.
                                776                 :                :              *
                                777                 :                :              * If we chose not to wake a worker when we ideally should have,
                                778                 :                :              * then the ratio will soon change to correct that.
                                779                 :                :              */
                                780         [ +  + ]:         536359 :             if (hist_wakeups <= hist_ios)
                                781                 :                :             {
                                782                 :         308425 :                 queue_depth = pgaio_worker_submission_queue_depth();
                                783         [ +  + ]:         308425 :                 if (queue_depth > 0)
                                784                 :                :                 {
                                785                 :                :                     /* Choose a worker higher than me to wake. */
                                786                 :          13241 :                     worker = pgaio_worker_choose_idle(MyIoWorkerId);
                                787         [ +  + ]:          13241 :                     if (worker == -1)
                                788                 :          10915 :                         maybe_grow = true;
                                789                 :                :                 }
                                790                 :                :             }
                                791                 :                :         }
  413 andres@anarazel.de        792                 :CBC     1066415 :         LWLockRelease(AioWorkerSubmissionQueueLock);
                                793                 :                : 
                                794                 :                :         /* Propagate wakeups. */
   27 tmunro@postgresql.or      795         [ +  + ]:GNC     1066415 :         if (worker != -1)
                                796                 :                :         {
                                797                 :           2326 :             pgaio_worker_wake(worker);
                                798                 :                :         }
                                799         [ +  + ]:        1064089 :         else if (maybe_grow)
                                800                 :                :         {
                                801                 :                :             /*
                                802                 :                :              * We know there was at least one more item in the queue, and we
                                803                 :                :              * failed to find a higher-numbered idle worker to wake.  Now we
                                804                 :                :              * decide if we should try to start one more worker.
                                805                 :                :              *
                                806                 :                :              * We do this with a simple heuristic: is the queue depth greater
                                807                 :                :              * than the current number of workers?
                                808                 :                :              *
                                809                 :                :              * Consider the following situations:
                                810                 :                :              *
                                811                 :                :              * 1. The queue depth is constantly increasing, because IOs are
                                812                 :                :              * arriving faster than they can possibly be serviced.  It doesn't
                                813                 :                :              * matter much which threshold we choose, as we will surely hit
                                814                 :                :              * it.  Crossing the current worker count is a useful signal
                                815                 :                :              * because it's clearly too deep to avoid queuing latency already,
                                816                 :                :              * but still leaves a small window of opportunity to improve the
                                817                 :                :              * situation before the queue overflows.
                                818                 :                :              *
                                819                 :                :              * 2. The worker pool is keeping up, no latency is being
                                820                 :                :              * introduced and an extra worker would be a waste of resources.
                                821                 :                :              * Queue depth distributions tend to be heavily skewed, with long
                                822                 :                :              * tails of low probability spikes (due to submission clustering,
                                823                 :                :              * scheduling, jitter, stalls, noisy neighbors, etc).  We want a
                                824                 :                :              * number that is very unlikely to be triggered by an outlier, and
                                825                 :                :              * we bet that an exponential or similar distribution whose
                                826                 :                :              * outliers never reach this threshold must be almost entirely
                                827                 :                :              * concentrated at the low end.  If we do see a spike as big as
                                828                 :                :              * the worker count, we take it as a signal that the distribution
                                829                 :                :              * is surely too wide.
                                830                 :                :              *
                                831                 :                :              * On its own, this is an extremely crude signal.  When combined
                                832                 :                :              * with the wakeup propagation test that precedes it (but on its
                                833                 :                :              * own tends to overshoot) and io_worker_launch_interval, the
                                834                 :                :              * result is that we gradually test each pool size until we find
                                835                 :                :              * one that doesn't trigger further expansion, and then hold it
                                836                 :                :              * for at least io_worker_idle_timeout.
                                837                 :                :              *
                                838                 :                :              * XXX Perhaps ideas from queueing theory or control theory could
                                839                 :                :              * do a better job of this.
                                840                 :                :              */
                                841                 :                : 
                                842                 :                :             /* Read nworkers without lock for this heuristic purpose. */
                                843         [ +  + ]:          10915 :             if (queue_depth > io_worker_control->nworkers)
                                844                 :            250 :                 pgaio_worker_request_grow();
                                845                 :                :         }
                                846                 :                : 
  257 peter@eisentraut.org      847         [ +  + ]:CBC     1066415 :         if (io_index != -1)
                                848                 :                :         {
  413 andres@anarazel.de        849                 :         536359 :             PgAioHandle *ioh = NULL;
                                850                 :                : 
                                851                 :                :             /* Cancel timeout and update wakeup:work ratio. */
   27 tmunro@postgresql.or      852                 :GNC      536359 :             idle_timeout_abs = 0;
                                853         [ +  + ]:         536359 :             if (++hist_ios == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
                                854                 :                :             {
                                855                 :         154278 :                 hist_wakeups /= 2;
                                856                 :         154278 :                 hist_ios /= 2;
                                857                 :                :             }
                                858                 :                : 
  413 andres@anarazel.de        859                 :CBC      536359 :             ioh = &pgaio_ctl->io_handles[io_index];
                                860                 :         536359 :             error_ioh = ioh;
  399 melanieplageman@gmai      861                 :         536359 :             errcallback.arg = ioh;
                                862                 :                : 
  413 andres@anarazel.de        863         [ -  + ]:         536359 :             pgaio_debug_io(DEBUG4, ioh,
                                864                 :                :                            "worker %d processing IO",
                                865                 :                :                            MyIoWorkerId);
                                866                 :                : 
                                867                 :                :             /*
                                868                 :                :              * Prevent interrupts between pgaio_io_reopen() and
                                869                 :                :              * pgaio_io_perform_synchronously() that otherwise could lead to
                                870                 :                :              * the FD getting closed in that window.
                                871                 :                :              */
  405                           872                 :         536359 :             HOLD_INTERRUPTS();
                                873                 :                : 
                                874                 :                :             /*
                                875                 :                :              * It's very unlikely, but possible, that reopen fails. E.g. due
                                876                 :                :              * to memory allocations failing or file permissions changing or
                                877                 :                :              * such.  In that case we need to fail the IO.
                                878                 :                :              *
                                879                 :                :              * There's not really a good errno we can report here.
                                880                 :                :              */
  413                           881                 :         536359 :             error_errno = ENOENT;
                                882                 :         536359 :             pgaio_io_reopen(ioh);
                                883                 :                : 
                                884                 :                :             /*
                                885                 :                :              * To be able to exercise the reopen-fails path, allow injection
                                886                 :                :              * points to trigger a failure at this point.
                                887                 :                :              */
  360 michael@paquier.xyz       888                 :         536359 :             INJECTION_POINT("aio-worker-after-reopen", ioh);
                                889                 :                : 
  413 andres@anarazel.de        890                 :         536358 :             error_errno = 0;
                                891                 :         536358 :             error_ioh = NULL;
                                892                 :                : 
                                893                 :                :             /*
                                894                 :                :              * As part of IO completion the buffer will be marked as NOACCESS,
                                895                 :                :              * until the buffer is pinned again - which never happens in io
                                896                 :                :              * workers. Therefore the next time there is IO for the same
                                897                 :                :              * buffer, the memory will be considered inaccessible. To avoid
                                898                 :                :              * that, explicitly allow access to the memory before reading data
                                899                 :                :              * into it.
                                900                 :                :              */
                                901                 :                : #ifdef USE_VALGRIND
                                902                 :                :             {
                                903                 :                :                 struct iovec *iov;
                                904                 :                :                 uint16      iov_length = pgaio_io_get_iovec_length(ioh, &iov);
                                905                 :                : 
                                906                 :                :                 for (int i = 0; i < iov_length; i++)
                                907                 :                :                     VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
                                908                 :                :             }
                                909                 :                : #endif
                                910                 :                : 
                                911                 :                : #ifdef PGAIO_WORKER_SHOW_PS_INFO
                                912                 :                :             {
                                913                 :                :                 char       *description = pgaio_io_get_target_description(ioh);
                                914                 :                : 
                                915                 :                :                 sprintf(cmd, "%d: [%s] %s",
                                916                 :                :                         MyIoWorkerId,
                                917                 :                :                         pgaio_io_get_op_name(ioh),
                                918                 :                :                         description);
                                919                 :                :                 pfree(description);
                                920                 :                :                 set_ps_display(cmd);
                                921                 :                :             }
                                922                 :                : #endif
                                923                 :                : 
                                924                 :                :             /*
                                925                 :                :              * We don't expect this to ever fail with ERROR or FATAL, no need
                                926                 :                :              * to keep error_ioh set to the IO.
                                927                 :                :              * pgaio_io_perform_synchronously() contains a critical section to
                                928                 :                :              * ensure we don't accidentally fail.
                                929                 :                :              */
                                930                 :         536358 :             pgaio_io_perform_synchronously(ioh);
                                931                 :                : 
  405                           932         [ -  + ]:         536358 :             RESUME_INTERRUPTS();
  399 melanieplageman@gmai      933                 :         536358 :             errcallback.arg = NULL;
                                934                 :                :         }
                                935                 :                :         else
                                936                 :                :         {
                                937                 :                :             int         timeout_ms;
                                938                 :                : 
                                939                 :                :             /* Cancel new worker request if pending. */
   27 tmunro@postgresql.or      940                 :GNC      530056 :             pgaio_worker_cancel_grow();
                                941                 :                : 
                                942                 :                :             /* Compute the remaining allowed idle time. */
                                943         [ -  + ]:         530056 :             if (io_worker_idle_timeout == -1)
                                944                 :                :             {
                                945                 :                :                 /* Never time out. */
   27 tmunro@postgresql.or      946                 :UNC           0 :                 timeout_ms = -1;
                                947                 :                :             }
                                948                 :                :             else
                                949                 :                :             {
   27 tmunro@postgresql.or      950                 :GNC      530056 :                 TimestampTz now = GetCurrentTimestamp();
                                951                 :                : 
                                952                 :                :                 /* If the GUC changes, reset timer. */
                                953         [ +  + ]:         530056 :                 if (idle_timeout_abs != 0 &&
                                954         [ -  + ]:           1152 :                     io_worker_idle_timeout != timeout_guc_used)
   27 tmunro@postgresql.or      955                 :UNC           0 :                     idle_timeout_abs = 0;
                                956                 :                : 
                                957                 :                :                 /* Only the highest-numbered worker can time out. */
   27 tmunro@postgresql.or      958         [ +  + ]:GNC      530056 :                 if (pgaio_worker_can_timeout())
                                959                 :                :                 {
                                960         [ +  + ]:           4147 :                     if (idle_timeout_abs == 0)
                                961                 :                :                     {
                                962                 :                :                         /*
                                963                 :                :                          * I have just been promoted to the timeout worker, or
                                964                 :                :                          * the GUC changed.  Compute new absolute time from
                                965                 :                :                          * now.
                                966                 :                :                          */
                                967                 :           2996 :                         idle_timeout_abs =
                                968                 :           2996 :                             TimestampTzPlusMilliseconds(now,
                                969                 :                :                                                         io_worker_idle_timeout);
                                970                 :           2996 :                         timeout_guc_used = io_worker_idle_timeout;
                                971                 :                :                     }
                                972                 :           4147 :                     timeout_ms =
                                973                 :           4147 :                         TimestampDifferenceMilliseconds(now, idle_timeout_abs);
                                974                 :                :                 }
                                975                 :                :                 else
                                976                 :                :                 {
                                977                 :                :                     /* No timeout for me. */
                                978                 :         525909 :                     idle_timeout_abs = 0;
                                979                 :         525909 :                     timeout_ms = -1;
                                980                 :                :                 }
                                981                 :                :             }
                                982                 :                : 
                                983                 :                : #ifdef PGAIO_WORKER_SHOW_PS_INFO
                                984                 :                :             sprintf(cmd, "%d: idle, wakeups:ios = %d:%d",
                                985                 :                :                     MyIoWorkerId, hist_wakeups, hist_ios);
                                986                 :                :             set_ps_display(cmd);
                                987                 :                : #endif
                                988                 :                : 
                                989         [ +  + ]:         530056 :             if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
                                990                 :                :                           timeout_ms,
                                991                 :                :                           WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
                                992                 :                :             {
                                993                 :                :                 /* WL_TIMEOUT */
                                994         [ +  - ]:             14 :                 if (pgaio_worker_can_timeout())
                                995         [ +  - ]:             14 :                     if (GetCurrentTimestamp() >= idle_timeout_abs)
                                996                 :             14 :                         break;
                                997                 :                :             }
                                998                 :                :             else
                                999                 :                :             {
                               1000                 :                :                 /* WL_LATCH_SET */
                               1001         [ +  + ]:         530036 :                 if (++hist_wakeups == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
                               1002                 :                :                 {
                               1003                 :         119440 :                     hist_wakeups /= 2;
                               1004                 :         119440 :                     hist_ios /= 2;
                               1005                 :                :                 }
                               1006                 :                :             }
  413 andres@anarazel.de       1007                 :CBC      530036 :             ResetLatch(MyLatch);
                               1008                 :                :         }
                               1009                 :                : 
                               1010         [ +  + ]:        1066394 :         CHECK_FOR_INTERRUPTS();
                               1011                 :                : 
  297 tmunro@postgresql.or     1012         [ +  + ]:        1066390 :         if (ConfigReloadPending)
                               1013                 :                :         {
                               1014                 :            234 :             ConfigReloadPending = false;
                               1015                 :            234 :             ProcessConfigFile(PGC_SIGHUP);
                               1016                 :                : 
                               1017                 :                :             /* If io_max_workers has been decreased, exit highest first. */
   27 tmunro@postgresql.or     1018         [ -  + ]:GNC         234 :             if (MyIoWorkerId >= io_max_workers)
   27 tmunro@postgresql.or     1019                 :UNC           0 :                 break;
                               1020                 :                :         }
                               1021                 :                :     }
                               1022                 :                : 
  399 melanieplageman@gmai     1023                 :CBC        1301 :     error_context_stack = errcallback.previous;
  413 andres@anarazel.de       1024                 :           1301 :     proc_exit(0);
                               1025                 :                : }
                               1026                 :                : 
                               1027                 :                : bool	/* Reports whether io_method is currently IOMETHOD_WORKER; returns true exactly in that case, false otherwise. */
                               1028                 :          92315 : pgaio_workers_enabled(void)
                               1029                 :                : {
                               1030                 :          92315 :     return io_method == IOMETHOD_WORKER;
                               1031                 :                : }
        

Generated by: LCOV version 2.5.0-beta