Age Owner Branch data TLA Line data Source code
1 : : /*
2 : : * task.c
3 : : * framework for parallelizing pg_upgrade's once-in-each-database tasks
4 : : *
5 : : * This framework provides an efficient way of running the various
6 : : * once-in-each-database tasks required by pg_upgrade. Specifically, it
7 : : * parallelizes these tasks by managing a set of slots that follow a simple
8 : : * state machine and by using libpq's asynchronous APIs to establish the
9 : : * connections and run the queries. Callers simply need to create a callback
10 : : * function and build/execute an UpgradeTask. A simple example follows:
11 : : *
12 : : * static void
13 : : * my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
14 : : * {
15 : : * for (int i = 0; i < PQntuples(res); i++)
16 : : * {
17 : : * ... process results ...
18 : : * }
19 : : * }
20 : : *
21 : : * void
22 : : * my_task(ClusterInfo *cluster)
23 : : * {
24 : : * UpgradeTask *task = upgrade_task_create();
25 : : *
26 : : * upgrade_task_add_step(task,
27 : : * "... query text ...",
28 : : * my_process_cb,
29 : : * true, // let the task free the PGresult
30 : : * NULL); // "arg" pointer for callback
31 : : * upgrade_task_run(task, cluster);
32 : : * upgrade_task_free(task);
33 : : * }
34 : : *
35 : : * Note that multiple steps can be added to a given task. When there are
36 : : * multiple steps, the task will run all of the steps consecutively in the same
37 : : * database connection before freeing the connection and moving on. In other
38 : : * words, it only ever initiates one connection to each database in the
39 : : * cluster for a given run.
40 : : *
41 : : * Copyright (c) 2024-2025, PostgreSQL Global Development Group
42 : : * src/bin/pg_upgrade/task.c
43 : : */
44 : :
45 : : #include "postgres_fe.h"
46 : :
47 : : #include "common/connect.h"
48 : : #include "fe_utils/string_utils.h"
49 : : #include "pg_upgrade.h"
50 : :
/*
 * Number of databases whose processing has finished during the current
 * upgrade_task_run() call.  The run is over once this reaches the number of
 * databases in the cluster.
 */
static int	dbs_complete = 0;

/*
 * Index, in the cluster's database array, of the next database a free slot
 * will claim.  Every database below this index has already been handed to
 * some slot, so this value never drops below dbs_complete.
 */
static int	dbs_processing = 0;
64 : :
65 : : /*
66 : : * This struct stores the information for a single step of a task. Note that
67 : : * the query string is stored in the "queries" PQExpBuffer for the UpgradeTask.
68 : : * All steps in a task are run in a single connection before moving on to the
69 : : * next database (which requires a new connection).
70 : : */
71 : : typedef struct UpgradeTaskStep
72 : : {
73 : : UpgradeTaskProcessCB process_cb; /* processes the results of the query */
74 : : bool free_result; /* should we free the result? */
75 : : void *arg; /* pointer passed to process_cb */
76 : : } UpgradeTaskStep;
77 : :
78 : : /*
79 : : * This struct is a thin wrapper around an array of steps, i.e.,
80 : : * UpgradeTaskStep, plus a PQExpBuffer for all the query strings.
81 : : */
82 : : struct UpgradeTask
83 : : {
84 : : UpgradeTaskStep *steps;
85 : : int num_steps;
86 : : PQExpBuffer queries;
87 : : };
88 : :
/*
 * Life cycle of a parallel slot: FREE -> CONNECTING -> RUNNING_QUERIES, then
 * back to FREE once every step's result has been processed for the slot's
 * database.  FREE must stay the zero value: slots are put into the FREE state
 * by zero-filling them (pg_malloc0() / memset()).
 */
typedef enum UpgradeTaskSlotState
{
	FREE = 0,					/* idle; may be given a new database */
	CONNECTING,					/* connection attempt in progress */
	RUNNING_QUERIES,			/* queries sent; consuming their results */
} UpgradeTaskSlotState;
98 : :
99 : : /*
100 : : * We maintain an array of user_opts.jobs slots to execute the task.
101 : : */
102 : : typedef struct UpgradeTaskSlot
103 : : {
104 : : UpgradeTaskSlotState state; /* state of the slot */
105 : : int db_idx; /* index of the database assigned to slot */
106 : : int step_idx; /* index of the current step of task */
107 : : PGconn *conn; /* current connection managed by slot */
108 : : bool ready; /* slot is ready for processing */
109 : : bool select_mode; /* select() mode: true->read, false->write */
110 : : int sock; /* file descriptor for connection's socket */
111 : : } UpgradeTaskSlot;
112 : :
113 : : /*
114 : : * Initializes an UpgradeTask.
115 : : */
116 : : UpgradeTask *
355 nathan@postgresql.or 117 :CBC 86 : upgrade_task_create(void)
118 : : {
119 : 86 : UpgradeTask *task = pg_malloc0(sizeof(UpgradeTask));
120 : :
121 : 86 : task->queries = createPQExpBuffer();
122 : :
123 : : /* All tasks must first set a secure search_path. */
124 : 86 : upgrade_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL);
125 : :
126 : 86 : return task;
127 : : }
128 : :
129 : : /*
130 : : * Frees all storage associated with an UpgradeTask.
131 : : */
132 : : void
133 : 86 : upgrade_task_free(UpgradeTask *task)
134 : : {
135 : 86 : destroyPQExpBuffer(task->queries);
136 : 86 : pg_free(task->steps);
137 : 86 : pg_free(task);
138 : 86 : }
139 : :
140 : : /*
141 : : * Adds a step to an UpgradeTask. The steps will be executed in each database
142 : : * in the order in which they are added.
143 : : *
144 : : * task: task object that must have been initialized via upgrade_task_create()
145 : : * query: the query text
146 : : * process_cb: function that processes the results of the query
147 : : * free_result: should we free the PGresult, or leave it to the caller?
148 : : * arg: pointer to task-specific data that is passed to each callback
149 : : */
150 : : void
151 : 200 : upgrade_task_add_step(UpgradeTask *task, const char *query,
152 : : UpgradeTaskProcessCB process_cb, bool free_result,
153 : : void *arg)
154 : : {
155 : : UpgradeTaskStep *new_step;
156 : :
157 : 400 : task->steps = pg_realloc(task->steps,
158 : 200 : ++task->num_steps * sizeof(UpgradeTaskStep));
159 : :
160 : 200 : new_step = &task->steps[task->num_steps - 1];
161 : 200 : new_step->process_cb = process_cb;
162 : 200 : new_step->free_result = free_result;
163 : 200 : new_step->arg = arg;
164 : :
165 : 200 : appendPQExpBuffer(task->queries, "%s;", query);
166 : 200 : }
167 : :
168 : : /*
169 : : * Build a connection string for the slot's current database and asynchronously
170 : : * start a new connection, but do not wait for the connection to be
171 : : * established.
172 : : */
173 : : static void
174 : 268 : start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
175 : : {
176 : : PQExpBufferData conn_opts;
177 : 268 : DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
178 : :
179 : : /* Build connection string with proper quoting */
180 : 268 : initPQExpBuffer(&conn_opts);
181 : 268 : appendPQExpBufferStr(&conn_opts, "dbname=");
182 : 268 : appendConnStrVal(&conn_opts, dbinfo->db_name);
183 : 268 : appendPQExpBufferStr(&conn_opts, " user=");
184 : 268 : appendConnStrVal(&conn_opts, os_info.user);
185 : 268 : appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
186 [ + - ]: 268 : if (cluster->sockdir)
187 : : {
188 : 268 : appendPQExpBufferStr(&conn_opts, " host=");
189 : 268 : appendConnStrVal(&conn_opts, cluster->sockdir);
190 : : }
191 : :
192 : 268 : slot->conn = PQconnectStart(conn_opts.data);
193 : :
194 [ - + ]: 268 : if (!slot->conn)
82 peter@eisentraut.org 195 :UBC 0 : pg_fatal("out of memory");
196 : :
355 nathan@postgresql.or 197 :CBC 268 : termPQExpBuffer(&conn_opts);
198 : 268 : }
199 : :
200 : : /*
201 : : * Run the process_cb callback function to process the result of a query, and
202 : : * free the result if the caller indicated we should do so.
203 : : */
204 : : static void
205 : 628 : process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot,
206 : : const UpgradeTask *task)
207 : : {
208 : 628 : UpgradeTaskStep *steps = &task->steps[slot->step_idx];
209 : 628 : UpgradeTaskProcessCB process_cb = steps->process_cb;
210 : 628 : DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
211 : 628 : PGresult *res = PQgetResult(slot->conn);
212 : :
213 [ + - - + ]: 1256 : if (PQstatus(slot->conn) == CONNECTION_BAD ||
214 [ - - ]: 628 : (PQresultStatus(res) != PGRES_TUPLES_OK &&
355 nathan@postgresql.or 215 :UBC 0 : PQresultStatus(res) != PGRES_COMMAND_OK))
216 : 0 : pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
217 : :
218 : : /*
219 : : * We assume that a NULL process_cb callback function means there's
220 : : * nothing to process. This is primarily intended for the initial step in
221 : : * every task that sets a safe search_path.
222 : : */
355 nathan@postgresql.or 223 [ + + ]:CBC 628 : if (process_cb)
224 : 360 : (*process_cb) (dbinfo, res, steps->arg);
225 : :
226 [ + + ]: 628 : if (steps->free_result)
227 : 580 : PQclear(res);
228 : 628 : }
229 : :
230 : : /*
231 : : * Advances the state machine for a given slot as necessary.
232 : : */
233 : : static void
234 : 1197 : process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
235 : : {
236 : : PostgresPollingStatusType status;
237 : :
238 [ - + ]: 1197 : if (!slot->ready)
355 nathan@postgresql.or 239 :UBC 0 : return;
240 : :
355 nathan@postgresql.or 241 [ + + + - ]:CBC 1197 : switch (slot->state)
242 : : {
243 : 354 : case FREE:
244 : :
245 : : /*
246 : : * If all of the databases in the cluster have been processed or
247 : : * are currently being processed by other slots, we are done.
248 : : */
249 [ + + ]: 354 : if (dbs_processing >= cluster->dbarr.ndbs)
250 : 86 : return;
251 : :
252 : : /*
253 : : * Claim the next database in the cluster's array and initiate a
254 : : * new connection.
255 : : */
256 : 268 : slot->db_idx = dbs_processing++;
257 : 268 : slot->state = CONNECTING;
258 : 268 : start_conn(cluster, slot);
259 : :
260 : 268 : return;
261 : :
262 : 536 : case CONNECTING:
263 : :
264 : : /* Check for connection failure. */
265 : 536 : status = PQconnectPoll(slot->conn);
266 [ - + ]: 536 : if (status == PGRES_POLLING_FAILED)
355 nathan@postgresql.or 267 :UBC 0 : pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
268 : :
269 : : /* Check whether the connection is still establishing. */
355 nathan@postgresql.or 270 [ + + ]:CBC 536 : if (status != PGRES_POLLING_OK)
271 : : {
272 : 268 : slot->select_mode = (status == PGRES_POLLING_READING);
273 : 268 : return;
274 : : }
275 : :
276 : : /*
277 : : * Move on to running/processing the queries in the task.
278 : : */
279 : 268 : slot->state = RUNNING_QUERIES;
280 : 268 : slot->select_mode = true; /* wait until ready for reading */
281 [ - + ]: 268 : if (!PQsendQuery(slot->conn, task->queries->data))
355 nathan@postgresql.or 282 :UBC 0 : pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
283 : :
355 nathan@postgresql.or 284 :CBC 268 : return;
285 : :
286 : 307 : case RUNNING_QUERIES:
287 : :
288 : : /*
289 : : * Consume any available data and clear the read-ready indicator
290 : : * for the connection.
291 : : */
292 [ - + ]: 307 : if (!PQconsumeInput(slot->conn))
355 nathan@postgresql.or 293 :UBC 0 : pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
294 : :
295 : : /*
296 : : * Process any results that are ready so that we can free up this
297 : : * slot for another database as soon as possible.
298 : : */
355 nathan@postgresql.or 299 [ + + ]:CBC 935 : for (; slot->step_idx < task->num_steps; slot->step_idx++)
300 : : {
301 : : /* If no more results are available yet, move on. */
302 [ + + ]: 667 : if (PQisBusy(slot->conn))
303 : 39 : return;
304 : :
305 : 628 : process_query_result(cluster, slot, task);
306 : : }
307 : :
308 : : /*
309 : : * If we just finished processing the result of the last step in
310 : : * the task, free the slot. We recursively call this function on
311 : : * the newly-freed slot so that we can start initiating the next
312 : : * connection immediately instead of waiting for the next loop
313 : : * through the slots.
314 : : */
315 : 268 : dbs_complete++;
316 : 268 : PQfinish(slot->conn);
317 : 268 : memset(slot, 0, sizeof(UpgradeTaskSlot));
318 : 268 : slot->ready = true;
319 : :
320 : 268 : process_slot(cluster, slot, task);
321 : :
322 : 268 : return;
323 : : }
324 : : }
325 : :
326 : : /*
327 : : * Returns -1 on error, else the number of ready descriptors.
328 : : */
329 : : static int
330 : 929 : select_loop(int maxFd, fd_set *input, fd_set *output)
331 : : {
332 : 929 : fd_set save_input = *input;
333 : 929 : fd_set save_output = *output;
334 : :
335 [ + + ]: 929 : if (maxFd == 0)
336 : 86 : return 0;
337 : :
338 : : for (;;)
355 nathan@postgresql.or 339 :UBC 0 : {
340 : : int i;
341 : :
355 nathan@postgresql.or 342 :CBC 843 : *input = save_input;
343 : 843 : *output = save_output;
344 : :
345 : 843 : i = select(maxFd + 1, input, output, NULL, NULL);
346 : :
347 : : #ifndef WIN32
348 [ - + - - ]: 843 : if (i < 0 && errno == EINTR)
355 nathan@postgresql.or 349 :UBC 0 : continue;
350 : : #else
351 : : if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
352 : : continue;
353 : : #endif
355 nathan@postgresql.or 354 :CBC 843 : return i;
355 : : }
356 : : }
357 : :
358 : : /*
359 : : * Wait on the slots to either finish connecting or to receive query results if
360 : : * possible. This avoids a tight loop in upgrade_task_run().
361 : : */
362 : : static void
363 : 929 : wait_on_slots(UpgradeTaskSlot *slots, int numslots)
364 : : {
365 : : fd_set input;
366 : : fd_set output;
367 : 929 : int maxFd = 0;
368 : :
369 [ + + ]: 15793 : FD_ZERO(&input);
370 [ + + ]: 15793 : FD_ZERO(&output);
371 : :
372 [ + + ]: 1858 : for (int i = 0; i < numslots; i++)
373 : : {
374 : : /*
375 : : * We assume the previous call to process_slot() handled everything
376 : : * that was marked ready in the previous call to wait_on_slots(), if
377 : : * any.
378 : : */
379 : 929 : slots[i].ready = false;
380 : :
381 : : /*
382 : : * This function should only ever see free slots as we are finishing
383 : : * processing the last few databases, at which point we don't have any
384 : : * databases left for them to process. We'll never use these slots
385 : : * again, so we can safely ignore them.
386 : : */
387 [ + + ]: 929 : if (slots[i].state == FREE)
388 : 86 : continue;
389 : :
390 : : /*
391 : : * Add the socket to the set.
392 : : */
393 : 843 : slots[i].sock = PQsocket(slots[i].conn);
394 [ - + ]: 843 : if (slots[i].sock < 0)
355 nathan@postgresql.or 395 :UBC 0 : pg_fatal("invalid socket");
355 nathan@postgresql.or 396 [ + + + + ]:CBC 843 : FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);
397 : 843 : maxFd = Max(maxFd, slots[i].sock);
398 : : }
399 : :
400 : : /*
401 : : * If we found socket(s) to wait on, wait.
402 : : */
403 [ - + ]: 929 : if (select_loop(maxFd, &input, &output) == -1)
82 peter@eisentraut.org 404 :UBC 0 : pg_fatal("%s() failed: %m", "select");
405 : :
406 : : /*
407 : : * Mark which sockets appear to be ready.
408 : : */
355 nathan@postgresql.or 409 [ + + ]:CBC 1858 : for (int i = 0; i < numslots; i++)
410 [ + + ]: 1283 : slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||
411 [ + + ]: 354 : FD_ISSET(slots[i].sock, &output));
412 : 929 : }
413 : :
414 : : /*
415 : : * Runs all the steps of the task in every database in the cluster using
416 : : * user_opts.jobs parallel slots.
417 : : */
418 : : void
419 : 86 : upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster)
420 : : {
421 : 86 : int jobs = Max(1, user_opts.jobs);
422 : 86 : UpgradeTaskSlot *slots = pg_malloc0(sizeof(UpgradeTaskSlot) * jobs);
423 : :
424 : 86 : dbs_complete = 0;
425 : 86 : dbs_processing = 0;
426 : :
427 : : /*
428 : : * Process every slot the first time round.
429 : : */
430 [ + + ]: 172 : for (int i = 0; i < jobs; i++)
431 : 86 : slots[i].ready = true;
432 : :
433 [ + + ]: 1015 : while (dbs_complete < cluster->dbarr.ndbs)
434 : : {
435 [ + + ]: 1858 : for (int i = 0; i < jobs; i++)
436 : 929 : process_slot(cluster, &slots[i], task);
437 : :
438 : 929 : wait_on_slots(slots, jobs);
439 : : }
440 : :
441 : 86 : pg_free(slots);
442 : 86 : }
|