LCOV - differential code coverage report
Current view: top level - src/bin/pg_upgrade - parallel.c (source / functions) Coverage Total Hit UBC GNC CBC DCB
Current: c70b6db34ffeab48beef1fb4ce61bcad3772b8dd vs 06473f5a344df8c9594ead90a609b86f6724cff8 Lines: 31.8 % 44 14 30 1 13 1
Current Date: 2025-09-06 07:49:51 +0900 Functions: 100.0 % 3 3 1 2
Baseline: lcov-20250906-005545-baseline Branches: 10.0 % 30 3 27 3
Baseline Date: 2025-09-05 08:21:35 +0100 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(30,360] days: 100.0 % 1 1 1
(360..) days: 30.2 % 43 13 30 13
Function coverage date bins:
(360..) days: 100.0 % 3 3 1 2
Branch coverage date bins:
(360..) days: 10.0 % 30 3 27 3

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*
                                  2                 :                :  *  parallel.c
                                  3                 :                :  *
                                  4                 :                :  *  multi-process support
                                  5                 :                :  *
                                  6                 :                :  *  Copyright (c) 2010-2025, PostgreSQL Global Development Group
                                  7                 :                :  *  src/bin/pg_upgrade/parallel.c
                                  8                 :                :  */
                                  9                 :                : 
                                 10                 :                : #include "postgres_fe.h"
                                 11                 :                : 
                                 12                 :                : #include <sys/wait.h>
                                 13                 :                : #ifdef WIN32
                                 14                 :                : #include <io.h>
                                 15                 :                : #endif
                                 16                 :                : 
                                 17                 :                : #include "pg_upgrade.h"
                                 18                 :                : 
                                 19                 :                : static int  parallel_jobs;
                                 20                 :                : 
                                 21                 :                : #ifdef WIN32
                                 22                 :                : /*
                                 23                 :                :  *  Array holding all active threads.  There can't be any gaps/zeros so
                                 24                 :                :  *  it can be passed to WaitForMultipleObjects().  We use two arrays
                                 25                 :                :  *  so the thread_handles array can be passed to WaitForMultipleObjects().
                                 26                 :                :  */
                                 27                 :                : static HANDLE *thread_handles;
                                 28                 :                : 
                                 29                 :                : typedef struct
                                 30                 :                : {
                                 31                 :                :     char       *log_file;
                                 32                 :                :     char       *opt_log_file;
                                 33                 :                :     char       *cmd;
                                 34                 :                : } exec_thread_arg;
                                 35                 :                : 
                                 36                 :                : typedef struct
                                 37                 :                : {
                                 38                 :                :     DbInfoArr  *old_db_arr;
                                 39                 :                :     DbInfoArr  *new_db_arr;
                                 40                 :                :     char       *old_pgdata;
                                 41                 :                :     char       *new_pgdata;
                                 42                 :                :     char       *old_tablespace;
                                 43                 :                :     char       *new_tablespace;
                                 44                 :                : } transfer_thread_arg;
                                 45                 :                : 
                                 46                 :                : static exec_thread_arg **exec_thread_args;
                                 47                 :                : static transfer_thread_arg **transfer_thread_args;
                                 48                 :                : 
                                 49                 :                : /* track current thread_args struct so reap_child() can be used for all cases */
                                 50                 :                : static void **cur_thread_args;
                                 51                 :                : 
                                 52                 :                : DWORD       win32_exec_prog(exec_thread_arg *args);
                                 53                 :                : DWORD       win32_transfer_all_new_dbs(transfer_thread_arg *args);
                                 54                 :                : #endif
                                 55                 :                : 
                                 56                 :                : /*
                                 57                 :                :  *  parallel_exec_prog
                                 58                 :                :  *
                                 59                 :                :  *  This has the same API as exec_prog, except it does parallel execution,
                                 60                 :                :  *  and therefore must throw errors and doesn't return an error status.
                                 61                 :                :  */
                                 62                 :                : void
 4637 bruce@momjian.us           63                 :CBC          54 : parallel_exec_prog(const char *log_file, const char *opt_log_file,
                                 64                 :                :                    const char *fmt,...)
                                 65                 :                : {
                                 66                 :                :     va_list     args;
                                 67                 :                :     char        cmd[MAX_STRING];
                                 68                 :                : 
                                 69                 :                : #ifndef WIN32
                                 70                 :                :     pid_t       child;
                                 71                 :                : #else
                                 72                 :                :     HANDLE      child;
                                 73                 :                :     exec_thread_arg *new_arg;
                                 74                 :                : #endif
                                 75                 :                : 
                                 76                 :             54 :     va_start(args, fmt);
                                 77                 :             54 :     vsnprintf(cmd, sizeof(cmd), fmt, args);
                                 78                 :             54 :     va_end(args);
                                 79                 :                : 
                                 80         [ +  - ]:             54 :     if (user_opts.jobs <= 1)
                                 81                 :                :         /* exit_on_error must be true to allow jobs */
 2798                            82                 :             54 :         exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
                                 83                 :                :     else
                                 84                 :                :     {
                                 85                 :                :         /* parallel */
                                 86                 :                : #ifdef WIN32
                                 87                 :                :         if (thread_handles == NULL)
                                 88                 :                :             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
                                 89                 :                : 
                                 90                 :                :         if (exec_thread_args == NULL)
                                 91                 :                :         {
                                 92                 :                :             int         i;
                                 93                 :                : 
                                 94                 :                :             exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
                                 95                 :                : 
                                 96                 :                :             /*
                                 97                 :                :              * For safety and performance, we keep the args allocated during
                                 98                 :                :              * the entire life of the process, and we don't free the args in a
                                 99                 :                :              * thread different from the one that allocated it.
                                100                 :                :              */
                                101                 :                :             for (i = 0; i < user_opts.jobs; i++)
                                102                 :                :                 exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
                                103                 :                :         }
                                104                 :                : 
                                105                 :                :         cur_thread_args = (void **) exec_thread_args;
                                106                 :                : #endif
                                107                 :                :         /* harvest any dead children */
 4637 bruce@momjian.us          108         [ #  # ]:UBC           0 :         while (reap_child(false) == true)
                                109                 :                :             ;
                                110                 :                : 
                                111                 :                :         /* must we wait for a dead child? */
                                112         [ #  # ]:              0 :         if (parallel_jobs >= user_opts.jobs)
                                113                 :              0 :             reap_child(true);
                                114                 :                : 
                                115                 :                :         /* set this before we start the job */
                                116                 :              0 :         parallel_jobs++;
                                117                 :                : 
                                118                 :                :         /* Ensure stdio state is quiesced before forking */
                                119                 :              0 :         fflush(NULL);
                                120                 :                : 
                                121                 :                : #ifndef WIN32
                                122                 :              0 :         child = fork();
                                123         [ #  # ]:              0 :         if (child == 0)
                                124                 :                :             /* use _exit to skip atexit() functions */
 2798                           125                 :              0 :             _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
 4637                           126         [ #  # ]:              0 :         else if (child < 0)
                                127                 :                :             /* fork failed */
  543 michael@paquier.xyz       128                 :              0 :             pg_fatal("could not create worker process: %m");
                                129                 :                : #else
                                130                 :                :         /* empty array element are always at the end */
                                131                 :                :         new_arg = exec_thread_args[parallel_jobs - 1];
                                132                 :                : 
                                133                 :                :         /* Can only pass one pointer into the function, so use a struct */
                                134                 :                :         pg_free(new_arg->log_file);
                                135                 :                :         new_arg->log_file = pg_strdup(log_file);
                                136                 :                :         pg_free(new_arg->opt_log_file);
                                137                 :                :         new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
                                138                 :                :         pg_free(new_arg->cmd);
                                139                 :                :         new_arg->cmd = pg_strdup(cmd);
                                140                 :                : 
                                141                 :                :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
                                142                 :                :                                         new_arg, 0, NULL);
                                143                 :                :         if (child == 0)
                                144                 :                :             pg_fatal("could not create worker thread: %m");
                                145                 :                : 
                                146                 :                :         thread_handles[parallel_jobs - 1] = child;
                                147                 :                : #endif
                                148                 :                :     }
 4637 bruce@momjian.us          149                 :CBC          54 : }
                                150                 :                : 
                                151                 :                : 
                                152                 :                : #ifdef WIN32
                                153                 :                : DWORD
                                154                 :                : win32_exec_prog(exec_thread_arg *args)
                                155                 :                : {
                                156                 :                :     int         ret;
                                157                 :                : 
                                158                 :                :     ret = !exec_prog(args->log_file, args->opt_log_file, true, true, "%s", args->cmd);
                                159                 :                : 
                                160                 :                :     /* terminates thread */
                                161                 :                :     return ret;
                                162                 :                : }
                                163                 :                : #endif
                                164                 :                : 
                                165                 :                : 
                                166                 :                : /*
                                167                 :                :  *  parallel_transfer_all_new_dbs
                                168                 :                :  *
                                169                 :                :  *  This has the same API as transfer_all_new_dbs, except it does parallel execution
                                170                 :                :  *  by transferring multiple tablespaces in parallel
                                171                 :                :  */
                                172                 :                : void
 4483                           173                 :              8 : parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
                                174                 :                :                               char *old_pgdata, char *new_pgdata,
                                175                 :                :                               char *old_tablespace, char *new_tablespace)
                                176                 :                : {
                                177                 :                : #ifndef WIN32
                                178                 :                :     pid_t       child;
                                179                 :                : #else
                                180                 :                :     HANDLE      child;
                                181                 :                :     transfer_thread_arg *new_arg;
                                182                 :                : #endif
                                183                 :                : 
 4623                           184         [ +  - ]:              8 :     if (user_opts.jobs <= 1)
   38 nathan@postgresql.or      185                 :GNC           8 :         transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL, NULL);
                                186                 :                :     else
                                187                 :                :     {
                                188                 :                :         /* parallel */
                                189                 :                : #ifdef WIN32
                                190                 :                :         if (thread_handles == NULL)
                                191                 :                :             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
                                192                 :                : 
                                193                 :                :         if (transfer_thread_args == NULL)
                                194                 :                :         {
                                195                 :                :             int         i;
                                196                 :                : 
                                197                 :                :             transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
                                198                 :                : 
                                199                 :                :             /*
                                200                 :                :              * For safety and performance, we keep the args allocated during
                                201                 :                :              * the entire life of the process, and we don't free the args in a
                                202                 :                :              * thread different from the one that allocated it.
                                203                 :                :              */
                                204                 :                :             for (i = 0; i < user_opts.jobs; i++)
                                205                 :                :                 transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
                                206                 :                :         }
                                207                 :                : 
                                208                 :                :         cur_thread_args = (void **) transfer_thread_args;
                                209                 :                : #endif
                                210                 :                :         /* harvest any dead children */
 4623 bruce@momjian.us          211         [ #  # ]:UBC           0 :         while (reap_child(false) == true)
                                212                 :                :             ;
                                213                 :                : 
                                214                 :                :         /* must we wait for a dead child? */
                                215         [ #  # ]:              0 :         if (parallel_jobs >= user_opts.jobs)
                                216                 :              0 :             reap_child(true);
                                217                 :                : 
                                218                 :                :         /* set this before we start the job */
                                219                 :              0 :         parallel_jobs++;
                                220                 :                : 
                                221                 :                :         /* Ensure stdio state is quiesced before forking */
                                222                 :              0 :         fflush(NULL);
                                223                 :                : 
                                224                 :                : #ifndef WIN32
                                225                 :              0 :         child = fork();
                                226         [ #  # ]:              0 :         if (child == 0)
                                227                 :                :         {
                                228                 :              0 :             transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
                                229                 :                :                                  old_tablespace, new_tablespace);
                                230                 :                :             /* if we take another exit path, it will be non-zero */
                                231                 :                :             /* use _exit to skip atexit() functions */
                                232                 :              0 :             _exit(0);
                                233                 :                :         }
                                234         [ #  # ]:              0 :         else if (child < 0)
                                235                 :                :             /* fork failed */
  543 michael@paquier.xyz       236                 :              0 :             pg_fatal("could not create worker process: %m");
                                237                 :                : #else
                                238                 :                :         /* empty array element are always at the end */
                                239                 :                :         new_arg = transfer_thread_args[parallel_jobs - 1];
                                240                 :                : 
                                241                 :                :         /* Can only pass one pointer into the function, so use a struct */
                                242                 :                :         new_arg->old_db_arr = old_db_arr;
                                243                 :                :         new_arg->new_db_arr = new_db_arr;
                                244                 :                :         pg_free(new_arg->old_pgdata);
                                245                 :                :         new_arg->old_pgdata = pg_strdup(old_pgdata);
                                246                 :                :         pg_free(new_arg->new_pgdata);
                                247                 :                :         new_arg->new_pgdata = pg_strdup(new_pgdata);
                                248                 :                :         pg_free(new_arg->old_tablespace);
                                249                 :                :         new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
                                250                 :                :         new_arg->new_tablespace = new_tablespace ? pg_strdup(new_tablespace) : NULL;
                                251                 :                : 
                                252                 :                :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
                                253                 :                :                                         new_arg, 0, NULL);
                                254                 :                :         if (child == 0)
                                255                 :                :             pg_fatal("could not create worker thread: %m");
                                256                 :                : 
                                257                 :                :         thread_handles[parallel_jobs - 1] = child;
                                258                 :                : #endif
                                259                 :                :     }
 4623 bruce@momjian.us          260                 :CBC           8 : }
                                261                 :                : 
                                262                 :                : 
                                263                 :                : #ifdef WIN32
                                264                 :                : DWORD
                                265                 :                : win32_transfer_all_new_dbs(transfer_thread_arg *args)
                                266                 :                : {
                                267                 :                :     transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
                                268                 :                :                          args->new_pgdata, args->old_tablespace,
                                269                 :                :                          args->new_tablespace);
                                270                 :                : 
                                271                 :                :     /* terminates thread */
                                272                 :                :     return 0;
                                273                 :                : }
                                274                 :                : #endif
                                275                 :                : 
                                276                 :                : 
                                277                 :                : /*
                                278                 :                :  *  collect status from a completed worker child
                                279                 :                :  */
                                280                 :                : bool
 4637                           281                 :             18 : reap_child(bool wait_for_child)
                                282                 :                : {
                                283                 :                : #ifndef WIN32
                                284                 :                :     int         work_status;
                                285                 :                :     pid_t       child;
                                286                 :                : #else
                                287                 :                :     int         thread_num;
                                288                 :                :     DWORD       res;
                                289                 :                : #endif
                                290                 :                : 
                                291   [ -  +  -  - ]:             18 :     if (user_opts.jobs <= 1 || parallel_jobs == 0)
                                292                 :             18 :         return false;
                                293                 :                : 
                                294                 :                : #ifndef WIN32
 2456 tgl@sss.pgh.pa.us         295                 :UBC           0 :     child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
                                296         [ #  # ]:              0 :     if (child == (pid_t) -1)
  543 michael@paquier.xyz       297                 :              0 :         pg_fatal("%s() failed: %m", "waitpid");
 2456 tgl@sss.pgh.pa.us         298         [ #  # ]:              0 :     if (child == 0)
                                299                 :              0 :         return false;           /* no children, or no dead children */
                                300         [ #  # ]:              0 :     if (work_status != 0)
 1152                           301                 :              0 :         pg_fatal("child process exited abnormally: status %d", work_status);
                                302                 :                : #else
                                303                 :                :     /* wait for one to finish */
                                304                 :                :     thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
                                305                 :                :                                         false, wait_for_child ? INFINITE : 0);
                                306                 :                : 
                                307                 :                :     if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
                                308                 :                :         return false;
                                309                 :                : 
                                310                 :                :     /* compute thread index in active_threads */
                                311                 :                :     thread_num -= WAIT_OBJECT_0;
                                312                 :                : 
                                313                 :                :     /* get the result */
                                314                 :                :     GetExitCodeThread(thread_handles[thread_num], &res);
                                315                 :                :     if (res != 0)
                                316                 :                :         pg_fatal("child worker exited abnormally: %m");
                                317                 :                : 
                                318                 :                :     /* dispose of handle to stop leaks */
                                319                 :                :     CloseHandle(thread_handles[thread_num]);
                                320                 :                : 
                                321                 :                :     /* Move last slot into dead child's position */
                                322                 :                :     if (thread_num != parallel_jobs - 1)
                                323                 :                :     {
                                324                 :                :         void       *tmp_args;
                                325                 :                : 
                                326                 :                :         thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
                                327                 :                : 
                                328                 :                :         /*
                                329                 :                :          * Move last active thread arg struct into the now-dead slot, and the
                                330                 :                :          * now-dead slot to the end for reuse by the next thread. Though the
                                331                 :                :          * thread struct is in use by another thread, we can safely swap the
                                332                 :                :          * struct pointers within the array.
                                333                 :                :          */
                                334                 :                :         tmp_args = cur_thread_args[thread_num];
                                335                 :                :         cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
                                336                 :                :         cur_thread_args[parallel_jobs - 1] = tmp_args;
                                337                 :                :     }
                                338                 :                : #endif
                                339                 :                : 
                                340                 :                :     /* do this after job has been removed */
 4637 bruce@momjian.us          341                 :              0 :     parallel_jobs--;
                                342                 :                : 
                                343                 :              0 :     return true;
                                344                 :                : }
        

Generated by: LCOV version 2.4-beta