Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * parallel_slot.c
4 : : * Parallel support for front-end parallel database connections
5 : : *
6 : : *
7 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : : * Portions Copyright (c) 1994, Regents of the University of California
9 : : *
10 : : * src/fe_utils/parallel_slot.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #if defined(WIN32) && FD_SETSIZE < 1024
16 : : #error FD_SETSIZE needs to have been increased
17 : : #endif
18 : :
19 : : #include "postgres_fe.h"
20 : :
21 : : #include <sys/select.h>
22 : :
23 : : #include "common/logging.h"
24 : : #include "fe_utils/cancel.h"
25 : : #include "fe_utils/parallel_slot.h"
26 : : #include "fe_utils/query_utils.h"
27 : :
28 : : #define ERRCODE_UNDEFINED_TABLE "42P01"
29 : :
30 : : static int select_loop(int maxFd, fd_set *workerset);
31 : : static bool processQueryResult(ParallelSlot *slot, PGresult *result);
32 : :
33 : : /*
34 : : * Process (and delete) a query result. Returns true if there's no problem,
35 : : * false otherwise. It's up to the handler to decide what constitutes a
36 : : * problem.
37 : : */
38 : : static bool
1774 rhaas@postgresql.org 39 :CBC 13084 : processQueryResult(ParallelSlot *slot, PGresult *result)
40 : : {
41 [ - + ]: 13084 : Assert(slot->handler != NULL);
42 : :
43 : : /* On failure, the handler should return NULL after freeing the result */
44 [ + + ]: 13084 : if (!slot->handler(result, slot->connection, slot->handler_context))
45 : 6 : return false;
46 : :
47 : : /* Ok, we have to free it ourself */
48 : 13078 : PQclear(result);
49 : 13078 : return true;
50 : : }
51 : :
52 : : /*
53 : : * Consume all the results generated for the given connection until
54 : : * nothing remains. If at least one error is encountered, return false.
55 : : * Note that this will block if the connection is busy.
56 : : */
57 : : static bool
58 : 252 : consumeQueryResult(ParallelSlot *slot)
59 : : {
60 : 252 : bool ok = true;
61 : : PGresult *result;
62 : :
63 : 252 : SetCancelConn(slot->connection);
64 [ + + ]: 503 : while ((result = PQgetResult(slot->connection)) != NULL)
65 : : {
66 [ + + ]: 251 : if (!processQueryResult(slot, result))
67 : 6 : ok = false;
68 : : }
69 : 252 : ResetCancelConn();
70 : 252 : return ok;
71 : : }
72 : :
73 : : /*
74 : : * Wait until a file descriptor from the given set becomes readable.
75 : : *
76 : : * Returns the number of ready descriptors, or -1 on failure (including
77 : : * getting a cancel request).
78 : : */
79 : : static int
2017 tgl@sss.pgh.pa.us 80 : 12881 : select_loop(int maxFd, fd_set *workerset)
81 : : {
82 : : int i;
2341 michael@paquier.xyz 83 : 12881 : fd_set saveSet = *workerset;
84 : :
85 [ - + ]: 12881 : if (CancelRequested)
2341 michael@paquier.xyz 86 :UBC 0 : return -1;
87 : :
88 : : for (;;)
89 : 0 : {
90 : : /*
91 : : * On Windows, we need to check once in a while for cancel requests;
92 : : * on other platforms we rely on select() returning when interrupted.
93 : : */
94 : : struct timeval *tvp;
95 : : #ifdef WIN32
96 : : struct timeval tv = {0, 1000000};
97 : :
98 : : tvp = &tv;
99 : : #else
2341 michael@paquier.xyz 100 :CBC 12881 : tvp = NULL;
101 : : #endif
102 : :
103 : 12881 : *workerset = saveSet;
104 : 12881 : i = select(maxFd + 1, workerset, NULL, NULL, tvp);
105 : :
106 : : #ifdef WIN32
107 : : if (i == SOCKET_ERROR)
108 : : {
109 : : i = -1;
110 : :
111 : : if (WSAGetLastError() == WSAEINTR)
112 : : errno = EINTR;
113 : : }
114 : : #endif
115 : :
116 [ - + - - ]: 12881 : if (i < 0 && errno == EINTR)
2341 michael@paquier.xyz 117 :UBC 0 : continue; /* ignore this */
2341 michael@paquier.xyz 118 [ + - - + ]:CBC 12881 : if (i < 0 || CancelRequested)
2017 tgl@sss.pgh.pa.us 119 :UBC 0 : return -1; /* but not this */
2341 michael@paquier.xyz 120 [ - + ]:CBC 12881 : if (i == 0)
2341 michael@paquier.xyz 121 :UBC 0 : continue; /* timeout (Win32 only) */
2341 michael@paquier.xyz 122 :CBC 12881 : break;
123 : : }
124 : :
125 : 12881 : return i;
126 : : }
127 : :
128 : : /*
129 : : * Return the offset of a suitable idle slot, or -1 if none are available. If
130 : : * the given dbname is not null, only idle slots connected to the given
131 : : * database are considered suitable, otherwise all idle connected slots are
132 : : * considered suitable.
133 : : */
134 : : static int
1740 rhaas@postgresql.org 135 : 25965 : find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
136 : : {
137 : : int i;
138 : :
139 [ + + ]: 38982 : for (i = 0; i < sa->numslots; i++)
140 : : {
141 [ + + ]: 26080 : if (sa->slots[i].inUse)
142 : 12996 : continue;
143 : :
144 [ + + ]: 13084 : if (sa->slots[i].connection == NULL)
145 : 7 : continue;
146 : :
147 [ + + ]: 13077 : if (dbname == NULL ||
148 [ + + ]: 7732 : strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
149 : 13063 : return i;
150 : : }
151 : 12902 : return -1;
152 : : }
153 : :
154 : : /*
155 : : * Return the offset of the first slot without a database connection, or -1 if
156 : : * all slots are connected.
157 : : */
158 : : static int
159 : 13081 : find_unconnected_slot(const ParallelSlotArray *sa)
160 : : {
161 : : int i;
162 : :
163 [ + + ]: 26051 : for (i = 0; i < sa->numslots; i++)
164 : : {
165 [ + + ]: 13156 : if (sa->slots[i].inUse)
166 : 12956 : continue;
167 : :
168 [ + + ]: 200 : if (sa->slots[i].connection == NULL)
169 : 186 : return i;
170 : : }
171 : :
172 : 12895 : return -1;
173 : : }
174 : :
175 : : /*
176 : : * Return the offset of the first idle slot, or -1 if all slots are busy.
177 : : */
178 : : static int
179 : 12895 : find_any_idle_slot(const ParallelSlotArray *sa)
180 : : {
181 : : int i;
182 : :
183 [ + + ]: 25847 : for (i = 0; i < sa->numslots; i++)
184 [ + + ]: 12966 : if (!sa->slots[i].inUse)
185 : 14 : return i;
186 : :
187 : 12881 : return -1;
188 : : }
189 : :
190 : : /*
191 : : * Wait for any slot's connection to have query results, consume the results,
192 : : * and update the slot's status as appropriate. Returns true on success,
193 : : * false on cancellation, on error, or if no slots are connected.
194 : : */
195 : : static bool
196 : 12881 : wait_on_slots(ParallelSlotArray *sa)
197 : : {
198 : : int i;
199 : : fd_set slotset;
200 : 12881 : int maxFd = 0;
201 : 12881 : PGconn *cancelconn = NULL;
202 : :
203 : : /* We must reconstruct the fd_set for each call to select_loop */
204 [ + + ]: 218977 : FD_ZERO(&slotset);
205 : :
206 [ + + ]: 25833 : for (i = 0; i < sa->numslots; i++)
207 : : {
208 : : int sock;
209 : :
210 : : /* We shouldn't get here if we still have slots without connections */
211 [ - + ]: 12952 : Assert(sa->slots[i].connection != NULL);
212 : :
213 : 12952 : sock = PQsocket(sa->slots[i].connection);
214 : :
215 : : /*
216 : : * We don't really expect any connections to lose their sockets after
217 : : * startup, but just in case, cope by ignoring them.
218 : : */
219 [ - + ]: 12952 : if (sock < 0)
1740 rhaas@postgresql.org 220 :UBC 0 : continue;
221 : :
222 : : /* Keep track of the first valid connection we see. */
1740 rhaas@postgresql.org 223 [ + + ]:CBC 12952 : if (cancelconn == NULL)
224 : 12881 : cancelconn = sa->slots[i].connection;
225 : :
226 : 12952 : FD_SET(sock, &slotset);
227 [ + - ]: 12952 : if (sock > maxFd)
228 : 12952 : maxFd = sock;
229 : : }
230 : :
231 : : /*
232 : : * If we get this far with no valid connections, processing cannot
233 : : * continue.
234 : : */
235 [ - + ]: 12881 : if (cancelconn == NULL)
1740 rhaas@postgresql.org 236 :UBC 0 : return false;
237 : :
1206 michael@paquier.xyz 238 :CBC 12881 : SetCancelConn(cancelconn);
1740 rhaas@postgresql.org 239 : 12881 : i = select_loop(maxFd, &slotset);
240 : 12881 : ResetCancelConn();
241 : :
242 : : /* failure? */
243 [ - + ]: 12881 : if (i < 0)
1740 rhaas@postgresql.org 244 :UBC 0 : return false;
245 : :
1740 rhaas@postgresql.org 246 [ + + ]:CBC 25833 : for (i = 0; i < sa->numslots; i++)
247 : : {
248 : : int sock;
249 : :
250 : 12952 : sock = PQsocket(sa->slots[i].connection);
251 : :
252 [ + - + + ]: 12952 : if (sock >= 0 && FD_ISSET(sock, &slotset))
253 : : {
254 : : /* select() says input is available, so consume it */
255 : 12882 : PQconsumeInput(sa->slots[i].connection);
256 : : }
257 : :
258 : : /* Collect result(s) as long as any are available */
259 [ + + ]: 25785 : while (!PQisBusy(sa->slots[i].connection))
260 : : {
261 : 25666 : PGresult *result = PQgetResult(sa->slots[i].connection);
262 : :
263 [ + + ]: 25666 : if (result != NULL)
264 : : {
265 : : /* Handle and discard the command result */
266 [ - + ]: 12833 : if (!processQueryResult(&sa->slots[i], result))
1740 rhaas@postgresql.org 267 :UBC 0 : return false;
268 : : }
269 : : else
270 : : {
271 : : /* This connection has become idle */
6 nathan@postgresql.or 272 :GNC 12833 : ParallelSlotSetIdle(&sa->slots[i]);
1740 rhaas@postgresql.org 273 :CBC 12833 : break;
274 : : }
275 : : }
276 : : }
277 : 12881 : return true;
278 : : }
279 : :
280 : : /*
281 : : * Open a new database connection using the stored connection parameters and
282 : : * optionally a given dbname if not null, execute the stored initial command if
283 : : * any, and associate the new connection with the given slot.
284 : : */
285 : : static void
286 : 21 : connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
287 : : {
288 : : const char *old_override;
289 : 21 : ParallelSlot *slot = &sa->slots[slotno];
290 : :
291 : 21 : old_override = sa->cparams->override_dbname;
292 [ + + ]: 21 : if (dbname)
293 : 17 : sa->cparams->override_dbname = dbname;
294 : 21 : slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
295 : 21 : sa->cparams->override_dbname = old_override;
296 : :
297 : : /*
298 : : * POSIX defines FD_SETSIZE as the highest file descriptor acceptable to
299 : : * FD_SET() and allied macros. Windows defines it as a ceiling on the
300 : : * count of file descriptors in the set, not a ceiling on the value of
301 : : * each file descriptor; see
302 : : * https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
303 : : * and
304 : : * https://learn.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-fd_set.
305 : : * We can't ignore that, because Windows starts file descriptors at a
306 : : * higher value, delays reuse, and skips values. With less than ten
307 : : * concurrent file descriptors, opened and closed rapidly, one can reach
308 : : * file descriptor 1024.
309 : : *
310 : : * Doing a hard exit here is a bit grotty, but it doesn't seem worth
311 : : * complicating the API to make it less grotty.
312 : : */
313 : : #ifdef WIN32
314 : : if (slotno >= FD_SETSIZE)
315 : : {
316 : : pg_log_error("too many jobs for this platform: %d", slotno);
317 : : exit(1);
318 : : }
319 : : #else
320 : : {
793 noah@leadboat.com 321 : 21 : int fd = PQsocket(slot->connection);
322 : :
323 [ - + ]: 21 : if (fd >= FD_SETSIZE)
324 : : {
793 noah@leadboat.com 325 :UBC 0 : pg_log_error("socket file descriptor out of range for select(): %d",
326 : : fd);
327 : 0 : pg_log_error_hint("Try fewer jobs.");
328 : 0 : exit(1);
329 : : }
330 : : }
331 : : #endif
332 : :
333 : : /* Setup the connection using the supplied command, if any. */
1740 rhaas@postgresql.org 334 [ - + ]:CBC 21 : if (sa->initcmd)
1740 rhaas@postgresql.org 335 :UBC 0 : executeCommand(slot->connection, sa->initcmd, sa->echo);
2341 michael@paquier.xyz 336 :CBC 21 : }
337 : :
338 : : /*
339 : : * ParallelSlotsGetIdle
340 : : * Return a connection slot that is ready to execute a command.
341 : : *
342 : : * The slot returned is chosen as follows:
343 : : *
344 : : * If any idle slot already has an open connection, and if either dbname is
345 : : * null or the existing connection is to the given database, that slot will be
346 : : * returned allowing the connection to be reused.
347 : : *
348 : : * Otherwise, if any idle slot is not yet connected to any database, the slot
349 : : * will be returned with its connection opened using the stored cparams and
350 : : * optionally the given dbname if not null.
351 : : *
352 : : * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
353 : : * after having its connection disconnected and reconnected using the stored
354 : : * cparams and optionally the given dbname if not null.
355 : : *
356 : : * Otherwise, if any slots have connections that are busy, we loop on select()
357 : : * until one socket becomes available. When this happens, we read the whole
358 : : * set and mark as free all sockets that become available. We then select a
359 : : * slot using the same rules as above.
360 : : *
361 : : * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
362 : : *
363 : : * For any connection created, if the stored initcmd is not null, it will be
364 : : * executed as a command on the newly formed connection before the slot is
365 : : * returned.
366 : : *
367 : : * If an error occurs, NULL is returned.
368 : : */
369 : : ParallelSlot *
1740 rhaas@postgresql.org 370 : 13084 : ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
371 : : {
372 : : int offset;
373 : :
374 [ - + ]: 13084 : Assert(sa);
375 [ + - ]: 13084 : Assert(sa->numslots > 0);
376 : :
377 : : while (1)
378 : : {
379 : : /* First choice: a slot already connected to the desired database. */
380 : 25965 : offset = find_matching_idle_slot(sa, dbname);
381 [ + + ]: 25965 : if (offset >= 0)
382 : : {
383 : 13063 : sa->slots[offset].inUse = true;
384 : 13063 : return &sa->slots[offset];
385 : : }
386 : :
387 : : /* Second choice: a slot not connected to any database. */
388 : 12902 : offset = find_unconnected_slot(sa);
389 [ + + ]: 12902 : if (offset >= 0)
390 : : {
391 : 7 : connect_slot(sa, offset, dbname);
392 : 7 : sa->slots[offset].inUse = true;
393 : 7 : return &sa->slots[offset];
394 : : }
395 : :
396 : : /* Third choice: a slot connected to the wrong database. */
397 : 12895 : offset = find_any_idle_slot(sa);
398 [ + + ]: 12895 : if (offset >= 0)
399 : : {
400 : 14 : disconnectDatabase(sa->slots[offset].connection);
401 : 14 : sa->slots[offset].connection = NULL;
402 : 14 : connect_slot(sa, offset, dbname);
403 : 14 : sa->slots[offset].inUse = true;
404 : 14 : return &sa->slots[offset];
405 : : }
406 : :
407 : : /*
408 : : * Fourth choice: block until one or more slots become available. If
409 : : * any slots hit a fatal error, we'll find out about that here and
410 : : * return NULL.
411 : : */
412 [ - + ]: 12881 : if (!wait_on_slots(sa))
1740 rhaas@postgresql.org 413 :UBC 0 : return NULL;
414 : : }
415 : : }
416 : :
417 : : /*
418 : : * ParallelSlotsSetup
419 : : * Prepare a set of parallel slots but do not connect to any database.
420 : : *
421 : : * This creates and initializes a set of slots, marking all parallel slots as
422 : : * free and ready to use. Establishing connections is delayed until requesting
423 : : * a free slot. The cparams, progname, echo, and initcmd are stored for later
424 : : * use and must remain valid for the lifetime of the returned array.
425 : : */
426 : : ParallelSlotArray *
1740 rhaas@postgresql.org 427 :CBC 182 : ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
428 : : bool echo, const char *initcmd)
429 : : {
430 : : ParallelSlotArray *sa;
431 : :
432 [ - + ]: 182 : Assert(numslots > 0);
433 [ - + ]: 182 : Assert(cparams != NULL);
434 [ - + ]: 182 : Assert(progname != NULL);
435 : :
436 : 182 : sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
437 : 182 : numslots * sizeof(ParallelSlot));
438 : :
439 : 182 : sa->numslots = numslots;
440 : 182 : sa->cparams = cparams;
441 : 182 : sa->progname = progname;
442 : 182 : sa->echo = echo;
443 : 182 : sa->initcmd = initcmd;
444 : :
445 : 182 : return sa;
446 : : }
447 : :
448 : : /*
449 : : * ParallelSlotsAdoptConn
450 : : * Assign an open connection to the slots array for reuse.
451 : : *
452 : : * This turns over ownership of an open connection to a slots array. The
453 : : * caller should not further use or close the connection. All the connection's
454 : : * parameters (user, host, port, etc.) except possibly dbname should match
455 : : * those of the slots array's cparams, as given in ParallelSlotsSetup. If
456 : : * these parameters differ, subsequent behavior is undefined.
457 : : */
458 : : void
459 : 179 : ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
460 : : {
461 : : int offset;
462 : :
463 : 179 : offset = find_unconnected_slot(sa);
464 [ + - ]: 179 : if (offset >= 0)
465 : 179 : sa->slots[offset].connection = conn;
466 : : else
1740 rhaas@postgresql.org 467 :UBC 0 : disconnectDatabase(conn);
2341 michael@paquier.xyz 468 :CBC 179 : }
469 : :
470 : : /*
471 : : * ParallelSlotsTerminate
472 : : * Clean up a set of parallel slots
473 : : *
474 : : * Iterate through all connections in a given set of ParallelSlots and
475 : : * terminate all connections.
476 : : */
477 : : void
1740 rhaas@postgresql.org 478 : 182 : ParallelSlotsTerminate(ParallelSlotArray *sa)
479 : : {
480 : : int i;
481 : :
482 [ + + ]: 368 : for (i = 0; i < sa->numslots; i++)
483 : : {
484 : 186 : PGconn *conn = sa->slots[i].connection;
485 : :
2341 michael@paquier.xyz 486 [ - + ]: 186 : if (conn == NULL)
2341 michael@paquier.xyz 487 :UBC 0 : continue;
488 : :
2341 michael@paquier.xyz 489 :CBC 186 : disconnectDatabase(conn);
490 : : }
491 : 182 : }
492 : :
493 : : /*
494 : : * ParallelSlotsWaitCompletion
495 : : *
496 : : * Wait for all connections to finish, returning false if at least one
497 : : * error has been found on the way.
498 : : */
499 : : bool
1740 rhaas@postgresql.org 500 : 247 : ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
501 : : {
502 : : int i;
503 : :
504 [ + + ]: 493 : for (i = 0; i < sa->numslots; i++)
505 : : {
506 [ - + ]: 252 : if (sa->slots[i].connection == NULL)
1740 rhaas@postgresql.org 507 :UBC 0 : continue;
1740 rhaas@postgresql.org 508 [ + + ]:CBC 252 : if (!consumeQueryResult(&sa->slots[i]))
1774 509 : 6 : return false;
510 : : /* Mark connection as idle */
6 nathan@postgresql.or 511 :GNC 246 : ParallelSlotSetIdle(&sa->slots[i]);
512 : : }
513 : :
1774 rhaas@postgresql.org 514 :CBC 241 : return true;
515 : : }
516 : :
517 : : /*
518 : : * TableCommandResultHandler
519 : : *
520 : : * ParallelSlotResultHandler for results of commands (not queries) against
521 : : * tables.
522 : : *
523 : : * Requires that the result status is either PGRES_COMMAND_OK or an error about
524 : : * a missing table. This is useful for utilities that compile a list of tables
525 : : * to process and then run commands (vacuum, reindex, or whatever) against
526 : : * those tables, as there is a race condition between the time the list is
527 : : * compiled and the time the command attempts to open the table.
528 : : *
529 : : * For missing tables, logs an error but allows processing to continue.
530 : : *
531 : : * For all other errors, logs an error and terminates further processing.
532 : : *
533 : : * res: PGresult from the query executed on the slot's connection
534 : : * conn: connection belonging to the slot
535 : : * context: unused
536 : : */
537 : : bool
538 : 5349 : TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
539 : : {
1740 540 [ - + ]: 5349 : Assert(res != NULL);
541 [ - + ]: 5349 : Assert(conn != NULL);
542 : :
543 : : /*
544 : : * If it's an error, report it. Errors about a missing table are harmless
545 : : * so we continue processing; but die for other errors.
546 : : */
1774 547 [ + + ]: 5349 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
548 : : {
549 : 6 : char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
550 : :
551 : 6 : pg_log_error("processing of database \"%s\" failed: %s",
552 : : PQdb(conn), PQerrorMessage(conn));
553 : :
554 [ + - + - ]: 6 : if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
555 : : {
556 : 6 : PQclear(res);
2341 michael@paquier.xyz 557 : 6 : return false;
558 : : }
559 : : }
560 : :
561 : 5343 : return true;
562 : : }
|