Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * parallel.c
4 : : *
5 : : * Parallel support for pg_dump and pg_restore
6 : : *
7 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 : : * Portions Copyright (c) 1994, Regents of the University of California
9 : : *
10 : : * IDENTIFICATION
11 : : * src/bin/pg_dump/parallel.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : :
16 : : /*
17 : : * Parallel operation works like this:
18 : : *
19 : : * The original, leader process calls ParallelBackupStart(), which forks off
20 : : * the desired number of worker processes, which each enter WaitForCommands().
21 : : *
22 : : * The leader process dispatches an individual work item to one of the worker
23 : : * processes in DispatchJobForTocEntry(). We send a command string such as
24 : : * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 : : * The worker process receives and decodes the command and passes it to the
26 : : * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 : : * which are routines of the current archive format. That routine performs
28 : : * the required action (dump or restore) and returns an integer status code.
29 : : * This is passed back to the leader where we pass it to the
30 : : * ParallelCompletionPtr callback function that was passed to
31 : : * DispatchJobForTocEntry(). The callback function does state updating
32 : : * for the leader control logic in pg_backup_archiver.c.
33 : : *
34 : : * In principle additional archive-format-specific information might be needed
35 : : * in commands or worker status responses, but so far that hasn't proved
36 : : * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 : : * data structures. Remember that we have forked off the workers only after
38 : : * we have read in the catalog. That's why our worker processes can also
39 : : * access the catalog information. (In the Windows case, the workers are
40 : : * threads in the same process. To avoid problems, they work with cloned
41 : : * copies of the Archive data structure; see RunWorker().)
42 : : *
43 : : * In the leader process, the workerStatus field for each worker has one of
44 : : * the following values:
45 : : * WRKR_NOT_STARTED: we've not yet forked this worker
46 : : * WRKR_IDLE: it's waiting for a command
47 : : * WRKR_WORKING: it's working on a command
48 : : * WRKR_TERMINATED: process ended
49 : : * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50 : : * state, and must be NULL in other states.
51 : : */
52 : :
53 : : #include "postgres_fe.h"
54 : :
#ifndef WIN32
#include <sys/select.h>
#include <sys/wait.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#endif
62 : :
63 : : #include "fe_utils/string_utils.h"
64 : : #include "parallel.h"
65 : : #include "pg_backup_utils.h"
66 : : #ifdef WIN32
67 : : #include "port/pg_bswap.h"
68 : : #endif
69 : :
70 : : /* Mnemonic macros for indexing the fd array returned by pipe(2) */
71 : : #define PIPE_READ 0
72 : : #define PIPE_WRITE 1
73 : :
74 : : #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
75 : :
/* States a worker can be in, as tracked by the leader */
typedef enum
{
	WRKR_NOT_STARTED = 0,		/* worker has not been launched yet */
	WRKR_IDLE = 1,				/* worker is waiting for a command */
	WRKR_WORKING = 2,			/* worker is executing a command */
	WRKR_TERMINATED = 3,		/* worker process has ended */
} T_WorkerStatus;

/* A worker counts as running while it is either idle or busy */
#define WORKER_IS_RUNNING(workerStatus) \
	(WRKR_WORKING == (workerStatus) || WRKR_IDLE == (workerStatus))
87 : :
88 : : /*
89 : : * Private per-parallel-worker state (typedef for this is in parallel.h).
90 : : *
91 : : * Much of this is valid only in the leader process (or, on Windows, should
92 : : * be touched only by the leader thread). But the AH field should be touched
93 : : * only by workers. The pipe descriptors are valid everywhere.
94 : : */
95 : : struct ParallelSlot
96 : : {
97 : : T_WorkerStatus workerStatus; /* see enum above */
98 : :
99 : : /* These fields are valid if workerStatus == WRKR_WORKING: */
100 : : ParallelCompletionPtr callback; /* function to call on completion */
101 : : void *callback_data; /* passthrough data for it */
102 : :
103 : : ArchiveHandle *AH; /* Archive data worker is using */
104 : :
105 : : int pipeRead; /* leader's end of the pipes */
106 : : int pipeWrite;
107 : : int pipeRevRead; /* child's end of the pipes */
108 : : int pipeRevWrite;
109 : :
110 : : /* Child process/thread identity info: */
111 : : #ifdef WIN32
112 : : uintptr_t hThread;
113 : : unsigned int threadId;
114 : : #else
115 : : pid_t pid;
116 : : #endif
117 : : };
118 : :
119 : : #ifdef WIN32
120 : :
121 : : /*
122 : : * Structure to hold info passed by _beginthreadex() to the function it calls
123 : : * via its single allowed argument.
124 : : */
125 : : typedef struct
126 : : {
127 : : ArchiveHandle *AH; /* leader database connection */
128 : : ParallelSlot *slot; /* this worker's parallel slot */
129 : : } WorkerInfo;
130 : :
131 : : /* Windows implementation of pipe access */
132 : : static int pgpipe(int handles[2]);
133 : : #define piperead(a,b,c) recv(a,b,c,0)
134 : : #define pipewrite(a,b,c) send(a,b,c,0)
135 : :
136 : : #else /* !WIN32 */
137 : :
138 : : /* Non-Windows implementation of pipe access */
139 : : #define pgpipe(a) pipe(a)
140 : : #define piperead(a,b,c) read(a,b,c)
141 : : #define pipewrite(a,b,c) write(a,b,c)
142 : :
143 : : #endif /* WIN32 */
144 : :
145 : : /*
146 : : * State info for archive_close_connection() shutdown callback.
147 : : */
148 : : typedef struct ShutdownInformation
149 : : {
150 : : ParallelState *pstate;
151 : : Archive *AHX;
152 : : } ShutdownInformation;
153 : :
154 : : static ShutdownInformation shutdown_info;
155 : :
156 : : /*
157 : : * State info for signal handling.
158 : : * We assume signal_info initializes to zeroes.
159 : : *
160 : : * On Unix, myAH is the leader DB connection in the leader process, and the
161 : : * worker's own connection in worker processes. On Windows, we have only one
162 : : * instance of signal_info, so myAH is the leader connection and the worker
163 : : * connections must be dug out of pstate->parallelSlot[].
164 : : */
165 : : typedef struct DumpSignalInformation
166 : : {
167 : : ArchiveHandle *myAH; /* database connection to issue cancel for */
168 : : ParallelState *pstate; /* parallel state, if any */
169 : : bool handler_set; /* signal handler set up in this process? */
170 : : #ifndef WIN32
171 : : bool am_worker; /* am I a worker process? */
172 : : #endif
173 : : } DumpSignalInformation;
174 : :
175 : : static volatile DumpSignalInformation signal_info;
176 : :
177 : : #ifdef WIN32
178 : : static CRITICAL_SECTION signal_info_lock;
179 : : #endif
180 : :
/*
 * Emit a plain string on stderr using only write(), so that this is usable
 * from a signal handler.  The write() result is deliberately discarded, as
 * there is nothing useful to do on failure; it is parked in a variable first
 * because some compilers complain about an ignored result otherwise.
 */
#define write_stderr(str) \
	do { \
		const char *msg_ = (str); \
		int		written_ = write(fileno(stderr), msg_, strlen(msg_)); \
		(void) written_; \
	} while (0)
193 : :
194 : :
195 : : #ifdef WIN32
196 : : /* file-scope variables */
197 : : static DWORD tls_index;
198 : :
199 : : /* globally visible variables (needed by exit_nicely) */
200 : : bool parallel_init_done = false;
201 : : DWORD mainThreadId;
202 : : #endif /* WIN32 */
203 : :
204 : : /* Local function prototypes */
205 : : static ParallelSlot *GetMyPSlot(ParallelState *pstate);
206 : : static void archive_close_connection(int code, void *arg);
207 : : static void ShutdownWorkersHard(ParallelState *pstate);
208 : : static void WaitForTerminatingWorkers(ParallelState *pstate);
209 : : static void set_cancel_handler(void);
210 : : static void set_cancel_pstate(ParallelState *pstate);
211 : : static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
212 : : static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
213 : : static int GetIdleWorker(ParallelState *pstate);
214 : : static bool HasEveryWorkerTerminated(ParallelState *pstate);
215 : : static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
216 : : static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
217 : : static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
218 : : bool do_wait);
219 : : static char *getMessageFromLeader(int pipefd[2]);
220 : : static void sendMessageToLeader(int pipefd[2], const char *str);
221 : : static int select_loop(int maxFd, fd_set *workerset);
222 : : static char *getMessageFromWorker(ParallelState *pstate,
223 : : bool do_wait, int *worker);
224 : : static void sendMessageToWorker(ParallelState *pstate,
225 : : int worker, const char *str);
226 : : static char *readMessageFromPipe(int fd);
227 : :
/* Does the string msg begin with the string prefix? */
#define messageStartsWith(msg, prefix) \
	(strncmp((msg), (prefix), strlen((prefix))) == 0)
230 : :
231 : :
/*
 * One-time setup for parallel dump support; meant to run early during
 * process startup.  (At present it is called regardless of whether any
 * parallel activity is planned.)
 *
 * Only Windows needs any work here; elsewhere this is a no-op.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	WSADATA		wsaData;
	int			rc;

	/* Second and later calls have nothing left to do */
	if (parallel_init_done)
		return;

	/* Get ready for threaded operation */
	tls_index = TlsAlloc();
	mainThreadId = GetCurrentThreadId();

	/* Bring up socket support */
	rc = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (rc != 0)
		pg_fatal("%s() failed: error code %d", "WSAStartup", rc);

	parallel_init_done = true;
#endif
}
259 : :
260 : : /*
261 : : * Find the ParallelSlot for the current worker process or thread.
262 : : *
263 : : * Returns NULL if no matching slot is found (this implies we're the leader).
264 : : */
265 : : static ParallelSlot *
4739 andrew@dunslane.net 266 :UBC 0 : GetMyPSlot(ParallelState *pstate)
267 : : {
268 : : int i;
269 : :
270 [ # # ]: 0 : for (i = 0; i < pstate->numWorkers; i++)
271 : : {
272 : : #ifdef WIN32
273 : : if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
274 : : #else
275 [ # # ]: 0 : if (pstate->parallelSlot[i].pid == getpid())
276 : : #endif
277 : 0 : return &(pstate->parallelSlot[i]);
278 : : }
279 : :
280 : 0 : return NULL;
281 : : }
282 : :
283 : : /*
284 : : * A thread-local version of getLocalPQExpBuffer().
285 : : *
286 : : * Non-reentrant but reduces memory leakage: we'll consume one buffer per
287 : : * thread, which is much better than one per fmtId/fmtQualifiedId call.
288 : : */
289 : : #ifdef WIN32
290 : : static PQExpBuffer
291 : : getThreadLocalPQExpBuffer(void)
292 : : {
293 : : /*
294 : : * The Tls code goes awry if we use a static var, so we provide for both
295 : : * static and auto, and omit any use of the static var when using Tls. We
296 : : * rely on TlsGetValue() to return 0 if the value is not yet set.
297 : : */
298 : : static PQExpBuffer s_id_return = NULL;
299 : : PQExpBuffer id_return;
300 : :
301 : : if (parallel_init_done)
302 : : id_return = (PQExpBuffer) TlsGetValue(tls_index);
303 : : else
304 : : id_return = s_id_return;
305 : :
306 : : if (id_return) /* first time through? */
307 : : {
308 : : /* same buffer, just wipe contents */
309 : : resetPQExpBuffer(id_return);
310 : : }
311 : : else
312 : : {
313 : : /* new buffer */
314 : : id_return = createPQExpBuffer();
315 : : if (parallel_init_done)
316 : : TlsSetValue(tls_index, id_return);
317 : : else
318 : : s_id_return = id_return;
319 : : }
320 : :
321 : : return id_return;
322 : : }
323 : : #endif /* WIN32 */
324 : :
325 : : /*
326 : : * pg_dump and pg_restore call this to register the cleanup handler
327 : : * as soon as they've created the ArchiveHandle.
328 : : */
329 : : void
4739 andrew@dunslane.net 330 :CBC 364 : on_exit_close_archive(Archive *AHX)
331 : : {
332 : 364 : shutdown_info.AHX = AHX;
333 : 364 : on_exit_nicely(archive_close_connection, &shutdown_info);
334 : 364 : }
335 : :
336 : : /*
337 : : * Update the archive handle in the on_exit callback registered by
338 : : * on_exit_close_archive(). When pg_restore processes a pg_dumpall archive
339 : : * containing multiple databases, each database is restored from a separate
340 : : * archive. After closing one archive and opening the next, we update the
341 : : * shutdown_info to reference the new archive handle so the cleanup callback
342 : : * will close the correct archive on exit.
343 : : */
344 : : void
20 andrew@dunslane.net 345 :GNC 77 : replace_on_exit_close_archive(Archive *AHX)
346 : : {
347 : 77 : shutdown_info.AHX = AHX;
348 : 77 : }
349 : :
350 : : /*
351 : : * on_exit_nicely handler for shutting down database connections and
352 : : * worker processes cleanly.
353 : : */
354 : : static void
4739 andrew@dunslane.net 355 :CBC 288 : archive_close_connection(int code, void *arg)
356 : : {
357 : 288 : ShutdownInformation *si = (ShutdownInformation *) arg;
358 : :
359 [ - + ]: 288 : if (si->pstate)
360 : : {
361 : : /* In parallel mode, must figure out who we are */
4739 andrew@dunslane.net 362 :UBC 0 : ParallelSlot *slot = GetMyPSlot(si->pstate);
363 : :
364 [ # # ]: 0 : if (!slot)
365 : : {
366 : : /*
367 : : * We're the leader. Forcibly shut down workers, then close our
368 : : * own database connection, if any.
369 : : */
3573 tgl@sss.pgh.pa.us 370 : 0 : ShutdownWorkersHard(si->pstate);
371 : :
3581 372 [ # # ]: 0 : if (si->AHX)
373 : 0 : DisconnectDatabase(si->AHX);
374 : : }
375 : : else
376 : : {
377 : : /*
378 : : * We're a worker. Shut down our own DB connection if any. On
379 : : * Windows, we also have to close our communication sockets, to
380 : : * emulate what will happen on Unix when the worker process exits.
381 : : * (Without this, if this is a premature exit, the leader would
382 : : * fail to detect it because there would be no EOF condition on
383 : : * the other end of the pipe.)
384 : : */
3456 385 [ # # ]: 0 : if (slot->AH)
386 : 0 : DisconnectDatabase(&(slot->AH->public));
387 : :
388 : : #ifdef WIN32
389 : : closesocket(slot->pipeRevRead);
390 : : closesocket(slot->pipeRevWrite);
391 : : #endif
392 : : }
393 : : }
394 : : else
395 : : {
396 : : /* Non-parallel operation: just kill the leader DB connection */
3581 tgl@sss.pgh.pa.us 397 [ + - ]:CBC 288 : if (si->AHX)
398 : 288 : DisconnectDatabase(si->AHX);
399 : : }
4739 andrew@dunslane.net 400 : 288 : }
401 : :
402 : : /*
403 : : * Forcibly shut down any remaining workers, waiting for them to finish.
404 : : *
405 : : * Note that we don't expect to come here during normal exit (the workers
406 : : * should be long gone, and the ParallelState too). We're only here in a
407 : : * pg_fatal() situation, so intervening to cancel active commands is
408 : : * appropriate.
409 : : */
410 : : static void
4739 andrew@dunslane.net 411 :UBC 0 : ShutdownWorkersHard(ParallelState *pstate)
412 : : {
413 : : int i;
414 : :
415 : : /*
416 : : * Close our write end of the sockets so that any workers waiting for
417 : : * commands know they can exit. (Note: some of the pipeWrite fields might
418 : : * still be zero, if we failed to initialize all the workers. Hence, just
419 : : * ignore errors here.)
420 : : */
421 [ # # ]: 0 : for (i = 0; i < pstate->numWorkers; i++)
422 : 0 : closesocket(pstate->parallelSlot[i].pipeWrite);
423 : :
424 : : /*
425 : : * Force early termination of any commands currently in progress.
426 : : */
427 : : #ifndef WIN32
428 : : /* On non-Windows, send SIGTERM to each worker process. */
429 [ # # ]: 0 : for (i = 0; i < pstate->numWorkers; i++)
430 : : {
3573 tgl@sss.pgh.pa.us 431 : 0 : pid_t pid = pstate->parallelSlot[i].pid;
432 : :
433 [ # # ]: 0 : if (pid != 0)
434 : 0 : kill(pid, SIGTERM);
435 : : }
436 : : #else
437 : :
438 : : /*
439 : : * On Windows, send query cancels directly to the workers' backends. Use
440 : : * a critical section to ensure worker threads don't change state.
441 : : */
442 : : EnterCriticalSection(&signal_info_lock);
443 : : for (i = 0; i < pstate->numWorkers; i++)
444 : : {
445 : : ArchiveHandle *AH = pstate->parallelSlot[i].AH;
446 : : char errbuf[1];
447 : :
448 : : if (AH != NULL && AH->connCancel != NULL)
449 : : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
450 : : }
451 : : LeaveCriticalSection(&signal_info_lock);
452 : : #endif
453 : :
454 : : /* Now wait for them to terminate. */
4739 andrew@dunslane.net 455 : 0 : WaitForTerminatingWorkers(pstate);
456 : 0 : }
457 : :
458 : : /*
459 : : * Wait for all workers to terminate.
460 : : */
461 : : static void
4739 andrew@dunslane.net 462 :CBC 13 : WaitForTerminatingWorkers(ParallelState *pstate)
463 : : {
464 [ + + ]: 41 : while (!HasEveryWorkerTerminated(pstate))
465 : : {
466 : 28 : ParallelSlot *slot = NULL;
467 : : int j;
468 : :
469 : : #ifndef WIN32
470 : : /* On non-Windows, use wait() to wait for next worker to end */
471 : : int status;
472 : 28 : pid_t pid = wait(&status);
473 : :
474 : : /* Find dead worker's slot, and clear the PID field */
475 [ + - ]: 45 : for (j = 0; j < pstate->numWorkers; j++)
476 : : {
3579 tgl@sss.pgh.pa.us 477 : 45 : slot = &(pstate->parallelSlot[j]);
478 [ + + ]: 45 : if (slot->pid == pid)
479 : : {
480 : 28 : slot->pid = 0;
481 : 28 : break;
482 : : }
483 : : }
484 : : #else /* WIN32 */
485 : : /* On Windows, we must use WaitForMultipleObjects() */
486 : : HANDLE *lpHandles = pg_malloc_array(HANDLE, pstate->numWorkers);
487 : : int nrun = 0;
488 : : DWORD ret;
489 : : uintptr_t hThread;
490 : :
491 : : for (j = 0; j < pstate->numWorkers; j++)
492 : : {
493 : : if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
494 : : {
495 : : lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
496 : : nrun++;
497 : : }
498 : : }
499 : : ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
500 : : Assert(ret != WAIT_FAILED);
501 : : hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
502 : : free(lpHandles);
503 : :
504 : : /* Find dead worker's slot, and clear the hThread field */
505 : : for (j = 0; j < pstate->numWorkers; j++)
506 : : {
507 : : slot = &(pstate->parallelSlot[j]);
508 : : if (slot->hThread == hThread)
509 : : {
510 : : /* For cleanliness, close handles for dead threads */
511 : : CloseHandle((HANDLE) slot->hThread);
512 : : slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
513 : : break;
514 : : }
515 : : }
516 : : #endif /* WIN32 */
517 : :
518 : : /* On all platforms, update workerStatus and te[] as well */
519 [ - + ]: 28 : Assert(j < pstate->numWorkers);
4739 andrew@dunslane.net 520 : 28 : slot->workerStatus = WRKR_TERMINATED;
3456 tgl@sss.pgh.pa.us 521 : 28 : pstate->te[j] = NULL;
522 : : }
4739 andrew@dunslane.net 523 : 13 : }
524 : :
525 : :
526 : : /*
527 : : * Code for responding to cancel interrupts (SIGINT, control-C, etc)
528 : : *
529 : : * This doesn't quite belong in this module, but it needs access to the
530 : : * ParallelState data, so there's not really a better place either.
531 : : *
532 : : * When we get a cancel interrupt, we could just die, but in pg_restore that
533 : : * could leave a SQL command (e.g., CREATE INDEX on a large table) running
534 : : * for a long time. Instead, we try to send a cancel request and then die.
535 : : * pg_dump probably doesn't really need this, but we might as well use it
536 : : * there too. Note that sending the cancel directly from the signal handler
537 : : * is safe because PQcancel() is written to make it so.
538 : : *
539 : : * In parallel operation on Unix, each process is responsible for canceling
540 : : * its own connection (this must be so because nobody else has access to it).
541 : : * Furthermore, the leader process should attempt to forward its signal to
542 : : * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
543 : : * needed because typing control-C at the console would deliver SIGINT to
544 : : * every member of the terminal process group --- but in other scenarios it
545 : : * might be that only the leader gets signaled.
546 : : *
547 : : * On Windows, the cancel handler runs in a separate thread, because that's
548 : : * how SetConsoleCtrlHandler works. We make it stop worker threads, send
549 : : * cancels on all active connections, and then return FALSE, which will allow
550 : : * the process to die. For safety's sake, we use a critical section to
551 : : * protect the PGcancel structures against being changed while the signal
552 : : * thread runs.
553 : : */
554 : :
555 : : #ifndef WIN32
556 : :
557 : : /*
558 : : * Signal handler (Unix only)
559 : : */
560 : : static void
3578 tgl@sss.pgh.pa.us 561 :UBC 0 : sigTermHandler(SIGNAL_ARGS)
562 : : {
563 : : int i;
564 : : char errbuf[1];
565 : :
566 : : /*
567 : : * Some platforms allow delivery of new signals to interrupt an active
568 : : * signal handler. That could muck up our attempt to send PQcancel, so
569 : : * disable the signals that set_cancel_handler enabled.
570 : : */
3573 571 : 0 : pqsignal(SIGINT, SIG_IGN);
572 : 0 : pqsignal(SIGTERM, SIG_IGN);
573 : 0 : pqsignal(SIGQUIT, SIG_IGN);
574 : :
575 : : /*
576 : : * If we're in the leader, forward signal to all workers. (It seems best
577 : : * to do this before PQcancel; killing the leader transaction will result
578 : : * in invalid-snapshot errors from active workers, which maybe we can
579 : : * quiet by killing workers first.) Ignore any errors.
580 : : */
581 [ # # ]: 0 : if (signal_info.pstate != NULL)
582 : : {
583 [ # # ]: 0 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
584 : : {
585 : 0 : pid_t pid = signal_info.pstate->parallelSlot[i].pid;
586 : :
587 [ # # ]: 0 : if (pid != 0)
588 : 0 : kill(pid, SIGTERM);
589 : : }
590 : : }
591 : :
592 : : /*
593 : : * Send QueryCancel if we have a connection to send to. Ignore errors,
594 : : * there's not much we can do about them anyway.
595 : : */
596 [ # # # # ]: 0 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
597 : 0 : (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
598 : :
599 : : /*
600 : : * Report we're quitting, using nothing more complicated than write(2).
601 : : * When in parallel operation, only the leader process should do this.
602 : : */
603 [ # # ]: 0 : if (!signal_info.am_worker)
604 : : {
605 [ # # ]: 0 : if (progname)
606 : : {
607 : 0 : write_stderr(progname);
608 : 0 : write_stderr(": ");
609 : : }
610 : 0 : write_stderr("terminated by user\n");
611 : : }
612 : :
613 : : /*
614 : : * And die, using _exit() not exit() because the latter will invoke atexit
615 : : * handlers that can fail if we interrupted related code.
616 : : */
2246 617 : 0 : _exit(1);
618 : : }
619 : :
620 : : /*
621 : : * Enable cancel interrupt handler, if not already done.
622 : : */
623 : : static void
767 dgustafsson@postgres 624 :CBC 771 : set_cancel_handler(void)
625 : : {
626 : : /*
627 : : * When forking, signal_info.handler_set will propagate into the new
628 : : * process, but that's fine because the signal handler state does too.
629 : : */
3573 tgl@sss.pgh.pa.us 630 [ + + ]: 771 : if (!signal_info.handler_set)
631 : : {
632 : 311 : signal_info.handler_set = true;
633 : :
634 : 311 : pqsignal(SIGINT, sigTermHandler);
635 : 311 : pqsignal(SIGTERM, sigTermHandler);
636 : 311 : pqsignal(SIGQUIT, sigTermHandler);
637 : : }
638 : 771 : }
639 : :
640 : : #else /* WIN32 */
641 : :
642 : : /*
643 : : * Console interrupt handler --- runs in a newly-started thread.
644 : : *
645 : : * After stopping other threads and sending cancel requests on all open
646 : : * connections, we return FALSE which will allow the default ExitProcess()
647 : : * action to be taken.
648 : : */
649 : : static BOOL WINAPI
650 : : consoleHandler(DWORD dwCtrlType)
651 : : {
652 : : int i;
653 : : char errbuf[1];
654 : :
655 : : if (dwCtrlType == CTRL_C_EVENT ||
656 : : dwCtrlType == CTRL_BREAK_EVENT)
657 : : {
658 : : /* Critical section prevents changing data we look at here */
659 : : EnterCriticalSection(&signal_info_lock);
660 : :
661 : : /*
662 : : * If in parallel mode, stop worker threads and send QueryCancel to
663 : : * their connected backends. The main point of stopping the worker
664 : : * threads is to keep them from reporting the query cancels as errors,
665 : : * which would clutter the user's screen. We needn't stop the leader
666 : : * thread since it won't be doing much anyway. Do this before
667 : : * canceling the main transaction, else we might get invalid-snapshot
668 : : * errors reported before we can stop the workers. Ignore errors,
669 : : * there's not much we can do about them anyway.
670 : : */
671 : : if (signal_info.pstate != NULL)
672 : : {
673 : : for (i = 0; i < signal_info.pstate->numWorkers; i++)
674 : : {
675 : : ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
676 : : ArchiveHandle *AH = slot->AH;
677 : : HANDLE hThread = (HANDLE) slot->hThread;
678 : :
679 : : /*
680 : : * Using TerminateThread here may leave some resources leaked,
681 : : * but it doesn't matter since we're about to end the whole
682 : : * process.
683 : : */
684 : : if (hThread != INVALID_HANDLE_VALUE)
685 : : TerminateThread(hThread, 0);
686 : :
687 : : if (AH != NULL && AH->connCancel != NULL)
688 : : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
689 : : }
690 : : }
691 : :
692 : : /*
693 : : * Send QueryCancel to leader connection, if enabled. Ignore errors,
694 : : * there's not much we can do about them anyway.
695 : : */
696 : : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
697 : : (void) PQcancel(signal_info.myAH->connCancel,
698 : : errbuf, sizeof(errbuf));
699 : :
700 : : LeaveCriticalSection(&signal_info_lock);
701 : :
702 : : /*
703 : : * Report we're quitting, using nothing more complicated than
704 : : * write(2). (We might be able to get away with using pg_log_*()
705 : : * here, but since we terminated other threads uncleanly above, it
706 : : * seems better to assume as little as possible.)
707 : : */
708 : : if (progname)
709 : : {
710 : : write_stderr(progname);
711 : : write_stderr(": ");
712 : : }
713 : : write_stderr("terminated by user\n");
714 : : }
715 : :
716 : : /* Always return FALSE to allow signal handling to continue */
717 : : return FALSE;
718 : : }
719 : :
720 : : /*
721 : : * Enable cancel interrupt handler, if not already done.
722 : : */
723 : : static void
724 : : set_cancel_handler(void)
725 : : {
726 : : if (!signal_info.handler_set)
727 : : {
728 : : signal_info.handler_set = true;
729 : :
730 : : InitializeCriticalSection(&signal_info_lock);
731 : :
732 : : SetConsoleCtrlHandler(consoleHandler, TRUE);
733 : : }
734 : : }
735 : :
736 : : #endif /* WIN32 */
737 : :
738 : :
739 : : /*
740 : : * set_archive_cancel_info
741 : : *
742 : : * Fill AH->connCancel with cancellation info for the specified database
743 : : * connection; or clear it if conn is NULL.
744 : : */
745 : : void
746 : 771 : set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
747 : : {
748 : : PGcancel *oldConnCancel;
749 : :
750 : : /*
751 : : * Activate the interrupt handler if we didn't yet in this process. On
752 : : * Windows, this also initializes signal_info_lock; therefore it's
753 : : * important that this happen at least once before we fork off any
754 : : * threads.
755 : : */
767 dgustafsson@postgres 756 : 771 : set_cancel_handler();
757 : :
758 : : /*
759 : : * On Unix, we assume that storing a pointer value is atomic with respect
760 : : * to any possible signal interrupt. On Windows, use a critical section.
761 : : */
762 : :
763 : : #ifdef WIN32
764 : : EnterCriticalSection(&signal_info_lock);
765 : : #endif
766 : :
767 : : /* Free the old one if we have one */
3573 tgl@sss.pgh.pa.us 768 : 771 : oldConnCancel = AH->connCancel;
769 : : /* be sure interrupt handler doesn't use pointer while freeing */
770 : 771 : AH->connCancel = NULL;
771 : :
772 [ + + ]: 771 : if (oldConnCancel != NULL)
773 : 411 : PQfreeCancel(oldConnCancel);
774 : :
775 : : /* Set the new one if specified */
776 [ + + ]: 771 : if (conn)
777 : 413 : AH->connCancel = PQgetCancel(conn);
778 : :
779 : : /*
780 : : * On Unix, there's only ever one active ArchiveHandle per process, so we
781 : : * can just set signal_info.myAH unconditionally. On Windows, do that
782 : : * only in the main thread; worker threads have to make sure their
783 : : * ArchiveHandle appears in the pstate data, which is dealt with in
784 : : * RunWorker().
785 : : */
786 : : #ifndef WIN32
787 : 771 : signal_info.myAH = AH;
788 : : #else
789 : : if (mainThreadId == GetCurrentThreadId())
790 : : signal_info.myAH = AH;
791 : : #endif
792 : :
793 : : #ifdef WIN32
794 : : LeaveCriticalSection(&signal_info_lock);
795 : : #endif
796 : 771 : }
797 : :
798 : : /*
799 : : * set_cancel_pstate
800 : : *
801 : : * Set signal_info.pstate to point to the specified ParallelState, if any.
802 : : * We need this mainly to have an interlock against Windows signal thread.
803 : : */
804 : : static void
805 : 26 : set_cancel_pstate(ParallelState *pstate)
806 : : {
807 : : #ifdef WIN32
808 : : EnterCriticalSection(&signal_info_lock);
809 : : #endif
810 : :
811 : 26 : signal_info.pstate = pstate;
812 : :
813 : : #ifdef WIN32
814 : : LeaveCriticalSection(&signal_info_lock);
815 : : #endif
816 : 26 : }
817 : :
818 : : /*
819 : : * set_cancel_slot_archive
820 : : *
821 : : * Set ParallelSlot's AH field to point to the specified archive, if any.
822 : : * We need this mainly to have an interlock against Windows signal thread.
823 : : */
824 : : static void
825 : 56 : set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
826 : : {
827 : : #ifdef WIN32
828 : : EnterCriticalSection(&signal_info_lock);
829 : : #endif
830 : :
3456 831 : 56 : slot->AH = AH;
832 : :
833 : : #ifdef WIN32
834 : : LeaveCriticalSection(&signal_info_lock);
835 : : #endif
3573 836 : 56 : }
837 : :
838 : :
839 : : /*
840 : : * This function is called by both Unix and Windows variants to set up
841 : : * and run a worker process. Caller should exit the process (or thread)
842 : : * upon return.
843 : : */
844 : : static void
845 : 28 : RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
846 : : {
847 : : int pipefd[2];
848 : :
849 : : /* fetch child ends of pipes */
850 : 28 : pipefd[PIPE_READ] = slot->pipeRevRead;
851 : 28 : pipefd[PIPE_WRITE] = slot->pipeRevWrite;
852 : :
853 : : /*
854 : : * Clone the archive so that we have our own state to work with, and in
855 : : * particular our own database connection.
856 : : *
857 : : * We clone on Unix as well as Windows, even though technically we don't
858 : : * need to because fork() gives us a copy in our own address space
859 : : * already. But CloneArchive resets the state information and also clones
860 : : * the database connection which both seem kinda helpful.
861 : : */
862 : 28 : AH = CloneArchive(AH);
863 : :
864 : : /* Remember cloned archive where signal handler can find it */
865 : 28 : set_cancel_slot_archive(slot, AH);
866 : :
867 : : /*
868 : : * Call the setup worker function that's defined in the ArchiveHandle.
869 : : */
3714 870 : 28 : (AH->SetupWorkerPtr) ((Archive *) AH);
871 : :
872 : : /*
873 : : * Execute commands until done.
874 : : */
875 : 28 : WaitForCommands(AH, pipefd);
876 : :
877 : : /*
878 : : * Disconnect from database and clean up.
879 : : */
3573 880 : 28 : set_cancel_slot_archive(slot, NULL);
881 : 28 : DisconnectDatabase(&(AH->public));
882 : 28 : DeCloneArchive(AH);
4739 andrew@dunslane.net 883 : 28 : }
884 : :
/*
 * Start routine for a worker thread on Windows.
 *
 * "wi" is the transient WorkerInfo allocated by the thread's creator; it is
 * freed here once its contents have been copied out.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	ArchiveHandle *workerAH;
	ParallelSlot *workerSlot;

	/* Copy out the arguments; WorkerInfo is not needed after that */
	workerAH = wi->AH;
	workerSlot = wi->slot;
	free(wi);

	/* Do the actual work */
	RunWorker(workerAH, workerSlot);

	/* Exit the thread */
	_endthreadex(0);
	return 0;
}
#endif							/* WIN32 */
906 : :
907 : : /*
908 : : * This function starts a parallel dump or restore by spawning off the worker
909 : : * processes. For Windows, it creates a number of threads; on Unix the
910 : : * workers are created with fork().
911 : : */
912 : : ParallelState *
3714 tgl@sss.pgh.pa.us 913 : 70 : ParallelBackupStart(ArchiveHandle *AH)
914 : : {
915 : : ParallelState *pstate;
916 : : int i;
917 : :
4739 andrew@dunslane.net 918 [ - + ]: 70 : Assert(AH->public.numWorkers > 0);
919 : :
30 michael@paquier.xyz 920 :GNC 70 : pstate = pg_malloc_object(ParallelState);
921 : :
4739 andrew@dunslane.net 922 :CBC 70 : pstate->numWorkers = AH->public.numWorkers;
3456 tgl@sss.pgh.pa.us 923 : 70 : pstate->te = NULL;
4739 andrew@dunslane.net 924 : 70 : pstate->parallelSlot = NULL;
925 : :
926 [ + + ]: 70 : if (AH->public.numWorkers == 1)
927 : 57 : return pstate;
928 : :
929 : : /* Create status arrays, being sure to initialize all fields to 0 */
30 michael@paquier.xyz 930 :GNC 13 : pstate->te =
931 : 13 : pg_malloc0_array(TocEntry *, pstate->numWorkers);
932 : 13 : pstate->parallelSlot =
933 : 13 : pg_malloc0_array(ParallelSlot, pstate->numWorkers);
934 : :
935 : : #ifdef WIN32
936 : : /* Make fmtId() and fmtQualifiedId() use thread-local storage */
937 : : getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
938 : : #endif
939 : :
940 : : /*
941 : : * Set the pstate in shutdown_info, to tell the exit handler that it must
942 : : * clean up workers as well as the main database connection. But we don't
943 : : * set this in signal_info yet, because we don't want child processes to
944 : : * inherit non-NULL signal_info.pstate.
945 : : */
3573 tgl@sss.pgh.pa.us 946 :CBC 13 : shutdown_info.pstate = pstate;
947 : :
948 : : /*
949 : : * Temporarily disable query cancellation on the leader connection. This
950 : : * ensures that child processes won't inherit valid AH->connCancel
951 : : * settings and thus won't try to issue cancels against the leader's
952 : : * connection. No harm is done if we fail while it's disabled, because
953 : : * the leader connection is idle at this point anyway.
954 : : */
955 : 13 : set_archive_cancel_info(AH, NULL);
956 : :
957 : : /* Ensure stdio state is quiesced before forking */
958 : 13 : fflush(NULL);
959 : :
960 : : /* Create desired number of workers */
4739 andrew@dunslane.net 961 [ + + ]: 41 : for (i = 0; i < pstate->numWorkers; i++)
962 : : {
963 : : #ifdef WIN32
964 : : WorkerInfo *wi;
965 : : uintptr_t handle;
966 : : #else
967 : : pid_t pid;
968 : : #endif
3573 tgl@sss.pgh.pa.us 969 : 28 : ParallelSlot *slot = &(pstate->parallelSlot[i]);
970 : : int pipeMW[2],
971 : : pipeWM[2];
972 : :
973 : : /* Create communication pipes for this worker */
4739 andrew@dunslane.net 974 [ + - - + ]: 28 : if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
1437 tgl@sss.pgh.pa.us 975 :UBC 0 : pg_fatal("could not create communication channels: %m");
976 : :
977 : : /* leader's ends of the pipes */
3573 tgl@sss.pgh.pa.us 978 :CBC 28 : slot->pipeRead = pipeWM[PIPE_READ];
979 : 28 : slot->pipeWrite = pipeMW[PIPE_WRITE];
980 : : /* child's ends of the pipes */
981 : 28 : slot->pipeRevRead = pipeMW[PIPE_READ];
982 : 28 : slot->pipeRevWrite = pipeWM[PIPE_WRITE];
983 : :
984 : : #ifdef WIN32
985 : : /* Create transient structure to pass args to worker function */
986 : : wi = pg_malloc_object(WorkerInfo);
987 : :
988 : : wi->AH = AH;
989 : : wi->slot = slot;
990 : :
991 : : handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
992 : : wi, 0, &(slot->threadId));
993 : : slot->hThread = handle;
994 : : slot->workerStatus = WRKR_IDLE;
995 : : #else /* !WIN32 */
4739 andrew@dunslane.net 996 : 28 : pid = fork();
997 [ + + ]: 56 : if (pid == 0)
998 : : {
999 : : /* we are the worker */
1000 : : int j;
1001 : :
1002 : : /* this is needed for GetMyPSlot() */
3573 tgl@sss.pgh.pa.us 1003 : 28 : slot->pid = getpid();
1004 : :
1005 : : /* instruct signal handler that we're in a worker now */
1006 : 28 : signal_info.am_worker = true;
1007 : :
1008 : : /* close read end of Worker -> Leader */
4739 andrew@dunslane.net 1009 : 28 : closesocket(pipeWM[PIPE_READ]);
1010 : : /* close write end of Leader -> Worker */
1011 : 28 : closesocket(pipeMW[PIPE_WRITE]);
1012 : :
1013 : : /*
1014 : : * Close all inherited fds for communication of the leader with
1015 : : * previously-forked workers.
1016 : : */
1017 [ + + ]: 45 : for (j = 0; j < i; j++)
1018 : : {
1019 : 17 : closesocket(pstate->parallelSlot[j].pipeRead);
1020 : 17 : closesocket(pstate->parallelSlot[j].pipeWrite);
1021 : : }
1022 : :
1023 : : /* Run the worker ... */
3573 tgl@sss.pgh.pa.us 1024 : 28 : RunWorker(AH, slot);
1025 : :
1026 : : /* We can just exit(0) when done */
4739 andrew@dunslane.net 1027 : 28 : exit(0);
1028 : : }
1029 [ - + ]: 28 : else if (pid < 0)
1030 : : {
1031 : : /* fork failed */
1437 tgl@sss.pgh.pa.us 1032 :UBC 0 : pg_fatal("could not create worker process: %m");
1033 : : }
1034 : :
1035 : : /* In Leader after successful fork */
3573 tgl@sss.pgh.pa.us 1036 :CBC 28 : slot->pid = pid;
2235 1037 : 28 : slot->workerStatus = WRKR_IDLE;
1038 : :
1039 : : /* close read end of Leader -> Worker */
4739 andrew@dunslane.net 1040 : 28 : closesocket(pipeMW[PIPE_READ]);
1041 : : /* close write end of Worker -> Leader */
1042 : 28 : closesocket(pipeWM[PIPE_WRITE]);
1043 : : #endif /* WIN32 */
1044 : : }
1045 : :
1046 : : /*
1047 : : * Having forked off the workers, disable SIGPIPE so that leader isn't
1048 : : * killed if it tries to send a command to a dead worker. We don't want
1049 : : * the workers to inherit this setting, though.
1050 : : */
1051 : : #ifndef WIN32
3573 tgl@sss.pgh.pa.us 1052 : 13 : pqsignal(SIGPIPE, SIG_IGN);
1053 : : #endif
1054 : :
1055 : : /*
1056 : : * Re-establish query cancellation on the leader connection.
1057 : : */
1058 : 13 : set_archive_cancel_info(AH, AH->connection);
1059 : :
1060 : : /*
1061 : : * Tell the cancel signal handler to forward signals to worker processes,
1062 : : * too. (As with query cancel, we did not need this earlier because the
1063 : : * workers have not yet been given anything to do; if we die before this
1064 : : * point, any already-started workers will see EOF and quit promptly.)
1065 : : */
1066 : 13 : set_cancel_pstate(pstate);
1067 : :
4739 andrew@dunslane.net 1068 : 13 : return pstate;
1069 : : }
1070 : :
1071 : : /*
1072 : : * Close down a parallel dump or restore.
1073 : : */
1074 : : void
1075 : 70 : ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1076 : : {
1077 : : int i;
1078 : :
1079 : : /* No work if non-parallel */
1080 [ + + ]: 70 : if (pstate->numWorkers == 1)
1081 : 57 : return;
1082 : :
1083 : : /* There should not be any unfinished jobs */
1084 [ - + ]: 13 : Assert(IsEveryWorkerIdle(pstate));
1085 : :
1086 : : /* Close the sockets so that the workers know they can exit */
1087 [ + + ]: 41 : for (i = 0; i < pstate->numWorkers; i++)
1088 : : {
1089 : 28 : closesocket(pstate->parallelSlot[i].pipeRead);
1090 : 28 : closesocket(pstate->parallelSlot[i].pipeWrite);
1091 : : }
1092 : :
1093 : : /* Wait for them to exit */
1094 : 13 : WaitForTerminatingWorkers(pstate);
1095 : :
1096 : : /*
1097 : : * Unlink pstate from shutdown_info, so the exit handler will not try to
1098 : : * use it; and likewise unlink from signal_info.
1099 : : */
1100 : 13 : shutdown_info.pstate = NULL;
3573 tgl@sss.pgh.pa.us 1101 : 13 : set_cancel_pstate(NULL);
1102 : :
1103 : : /* Release state (mere neatnik-ism, since we're about to terminate) */
3456 1104 : 13 : free(pstate->te);
4739 andrew@dunslane.net 1105 : 13 : free(pstate->parallelSlot);
1106 : 13 : free(pstate);
1107 : : }
1108 : :
1109 : : /*
1110 : : * These next four functions handle construction and parsing of the command
1111 : : * strings and response strings for parallel workers.
1112 : : *
1113 : : * Currently, these can be the same regardless of which archive format we are
1114 : : * processing. In future, we might want to let format modules override these
1115 : : * functions to add format-specific data to a command or response.
1116 : : */
1117 : :
1118 : : /*
1119 : : * buildWorkerCommand: format a command string to send to a worker.
1120 : : *
1121 : : * The string is built in the caller-supplied buffer of size buflen.
1122 : : */
1123 : : static void
3456 tgl@sss.pgh.pa.us 1124 : 118 : buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1125 : : char *buf, int buflen)
1126 : : {
1127 [ + + ]: 118 : if (act == ACT_DUMP)
1128 : 72 : snprintf(buf, buflen, "DUMP %d", te->dumpId);
1129 [ + - ]: 46 : else if (act == ACT_RESTORE)
1130 : 46 : snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1131 : : else
3456 tgl@sss.pgh.pa.us 1132 :UBC 0 : Assert(false);
3456 tgl@sss.pgh.pa.us 1133 :CBC 118 : }
1134 : :
1135 : : /*
1136 : : * parseWorkerCommand: interpret a command string in a worker.
1137 : : */
1138 : : static void
1139 : 118 : parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1140 : : const char *msg)
1141 : : {
1142 : : DumpId dumpId;
1143 : : int nBytes;
1144 : :
1145 [ + + ]: 118 : if (messageStartsWith(msg, "DUMP "))
1146 : : {
1147 : 72 : *act = ACT_DUMP;
1148 : 72 : sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1149 [ - + ]: 72 : Assert(nBytes == strlen(msg));
1150 : 72 : *te = getTocEntryByDumpId(AH, dumpId);
1151 [ - + ]: 72 : Assert(*te != NULL);
1152 : : }
1153 [ + - ]: 46 : else if (messageStartsWith(msg, "RESTORE "))
1154 : : {
1155 : 46 : *act = ACT_RESTORE;
1156 : 46 : sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1157 [ - + ]: 46 : Assert(nBytes == strlen(msg));
1158 : 46 : *te = getTocEntryByDumpId(AH, dumpId);
1159 [ - + ]: 46 : Assert(*te != NULL);
1160 : : }
1161 : : else
1437 tgl@sss.pgh.pa.us 1162 :UBC 0 : pg_fatal("unrecognized command received from leader: \"%s\"",
1163 : : msg);
3456 tgl@sss.pgh.pa.us 1164 :CBC 118 : }
1165 : :
1166 : : /*
1167 : : * buildWorkerResponse: format a response string to send to the leader.
1168 : : *
1169 : : * The string is built in the caller-supplied buffer of size buflen.
1170 : : */
1171 : : static void
1172 : 118 : buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1173 : : char *buf, int buflen)
1174 : : {
1175 [ - + ]: 118 : snprintf(buf, buflen, "OK %d %d %d",
1176 : : te->dumpId,
1177 : : status,
1178 : : status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1179 : 118 : }
1180 : :
1181 : : /*
1182 : : * parseWorkerResponse: parse the status message returned by a worker.
1183 : : *
1184 : : * Returns the integer status code, and may update fields of AH and/or te.
1185 : : */
1186 : : static int
1187 : 118 : parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1188 : : const char *msg)
1189 : : {
1190 : : DumpId dumpId;
1191 : : int nBytes,
1192 : : n_errors;
1193 : 118 : int status = 0;
1194 : :
1195 [ + - ]: 118 : if (messageStartsWith(msg, "OK "))
1196 : : {
1197 : 118 : sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1198 : :
1199 [ - + ]: 118 : Assert(dumpId == te->dumpId);
1200 [ - + ]: 118 : Assert(nBytes == strlen(msg));
1201 : :
1202 : 118 : AH->public.n_errors += n_errors;
1203 : : }
1204 : : else
1437 tgl@sss.pgh.pa.us 1205 :UBC 0 : pg_fatal("invalid message received from worker: \"%s\"",
1206 : : msg);
1207 : :
3456 tgl@sss.pgh.pa.us 1208 :CBC 118 : return status;
1209 : : }
1210 : :
1211 : : /*
1212 : : * Dispatch a job to some free worker.
1213 : : *
1214 : : * te is the TocEntry to be processed, act is the action to be taken on it.
1215 : : * callback is the function to call on completion of the job.
1216 : : *
1217 : : * If no worker is currently available, this will block, and previously
1218 : : * registered callback functions may be called.
1219 : : */
1220 : : void
1221 : 118 : DispatchJobForTocEntry(ArchiveHandle *AH,
1222 : : ParallelState *pstate,
1223 : : TocEntry *te,
1224 : : T_Action act,
1225 : : ParallelCompletionPtr callback,
1226 : : void *callback_data)
1227 : : {
1228 : : int worker;
1229 : : char buf[256];
1230 : :
1231 : : /* Get a worker, waiting if none are idle */
1232 [ + + ]: 173 : while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1233 : 55 : WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1234 : :
1235 : : /* Construct and send command string */
1236 : 118 : buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1237 : :
1238 : 118 : sendMessageToWorker(pstate, worker, buf);
1239 : :
1240 : : /* Remember worker is busy, and which TocEntry it's working on */
4739 andrew@dunslane.net 1241 : 118 : pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
3456 tgl@sss.pgh.pa.us 1242 : 118 : pstate->parallelSlot[worker].callback = callback;
1243 : 118 : pstate->parallelSlot[worker].callback_data = callback_data;
1244 : 118 : pstate->te[worker] = te;
4739 andrew@dunslane.net 1245 : 118 : }
1246 : :
1247 : : /*
1248 : : * Find an idle worker and return its slot number.
1249 : : * Return NO_SLOT if none are idle.
1250 : : */
1251 : : static int
1252 : 270 : GetIdleWorker(ParallelState *pstate)
1253 : : {
1254 : : int i;
1255 : :
1256 [ + + ]: 679 : for (i = 0; i < pstate->numWorkers; i++)
1257 : : {
1258 [ + + ]: 538 : if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1259 : 129 : return i;
1260 : : }
1261 : 141 : return NO_SLOT;
1262 : : }
1263 : :
1264 : : /*
1265 : : * Return true iff no worker is running.
1266 : : */
1267 : : static bool
1268 : 41 : HasEveryWorkerTerminated(ParallelState *pstate)
1269 : : {
1270 : : int i;
1271 : :
1272 [ + + ]: 81 : for (i = 0; i < pstate->numWorkers; i++)
1273 : : {
2235 tgl@sss.pgh.pa.us 1274 [ + + - + ]: 68 : if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
4739 andrew@dunslane.net 1275 : 28 : return false;
1276 : : }
1277 : 13 : return true;
1278 : : }
1279 : :
1280 : : /*
1281 : : * Return true iff every worker is in the WRKR_IDLE state.
1282 : : */
1283 : : bool
1284 : 72 : IsEveryWorkerIdle(ParallelState *pstate)
1285 : : {
1286 : : int i;
1287 : :
1288 [ + + ]: 156 : for (i = 0; i < pstate->numWorkers; i++)
1289 : : {
1290 [ + + ]: 122 : if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1291 : 38 : return false;
1292 : : }
1293 : 34 : return true;
1294 : : }
1295 : :
1296 : : /*
1297 : : * Acquire lock on a table to be dumped by a worker process.
1298 : : *
1299 : : * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1300 : : * it's no problem for a worker to get one too, but if anything else besides
1301 : : * pg_dump is running, there's a possible deadlock:
1302 : : *
1303 : : * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1304 : : * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1305 : : * because the leader holds a conflicting ACCESS SHARE lock).
1306 : : * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1307 : : * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1308 : : * 4) Now we have a deadlock, since the leader is effectively waiting for
1309 : : * the worker. The server cannot detect that, however.
1310 : : *
1311 : : * To prevent an infinite wait, prior to touching a table in a worker, request
1312 : : * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1313 : : * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1314 : : * so we have a deadlock. We must fail the backup in that case.
1315 : : */
1316 : : static void
3578 tgl@sss.pgh.pa.us 1317 : 72 : lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1318 : : {
1319 : : const char *qualId;
1320 : : PQExpBuffer query;
1321 : : PGresult *res;
1322 : :
1323 : : /* Nothing to do for BLOBS */
1324 [ + + ]: 72 : if (strcmp(te->desc, "BLOBS") == 0)
1325 : 5 : return;
1326 : :
1327 : 67 : query = createPQExpBuffer();
1328 : :
2767 1329 : 67 : qualId = fmtQualifiedId(te->namespace, te->tag);
1330 : :
4739 andrew@dunslane.net 1331 : 67 : appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1332 : : qualId);
1333 : :
1334 : 67 : res = PQexec(AH->connection, query->data);
1335 : :
1336 [ + - - + ]: 67 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1437 tgl@sss.pgh.pa.us 1337 :UBC 0 : pg_fatal("could not obtain lock on relation \"%s\"\n"
1338 : : "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1339 : : "on the table after the pg_dump parent process had gotten the "
1340 : : "initial ACCESS SHARE lock on the table.", qualId);
1341 : :
4739 andrew@dunslane.net 1342 :CBC 67 : PQclear(res);
1343 : 67 : destroyPQExpBuffer(query);
1344 : : }
1345 : :
1346 : : /*
1347 : : * WaitForCommands: main routine for a worker process.
1348 : : *
1349 : : * Read and execute commands from the leader until we see EOF on the pipe.
1350 : : */
1351 : : static void
3714 tgl@sss.pgh.pa.us 1352 : 28 : WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1353 : : {
1354 : : char *command;
1355 : : TocEntry *te;
1356 : : T_Action act;
3456 1357 : 28 : int status = 0;
1358 : : char buf[256];
1359 : :
1360 : : for (;;)
1361 : : {
2100 andres@anarazel.de 1362 [ + + ]: 146 : if (!(command = getMessageFromLeader(pipefd)))
1363 : : {
1364 : : /* EOF, so done */
4739 andrew@dunslane.net 1365 : 28 : return;
1366 : : }
1367 : :
1368 : : /* Decode the command */
3456 tgl@sss.pgh.pa.us 1369 : 118 : parseWorkerCommand(AH, &te, &act, command);
1370 : :
1371 [ + + ]: 118 : if (act == ACT_DUMP)
1372 : : {
1373 : : /* Acquire lock on this table within the worker's session */
3578 1374 : 72 : lockTableForWorker(AH, te);
1375 : :
1376 : : /* Perform the dump command */
3456 1377 : 72 : status = (AH->WorkerJobDumpPtr) (AH, te);
1378 : : }
1379 [ + - ]: 46 : else if (act == ACT_RESTORE)
1380 : : {
1381 : : /* Perform the restore command */
1382 : 46 : status = (AH->WorkerJobRestorePtr) (AH, te);
1383 : : }
1384 : : else
3456 tgl@sss.pgh.pa.us 1385 :UBC 0 : Assert(false);
1386 : :
1387 : : /* Return status to leader */
3456 tgl@sss.pgh.pa.us 1388 :CBC 118 : buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1389 : :
2100 andres@anarazel.de 1390 : 118 : sendMessageToLeader(pipefd, buf);
1391 : :
1392 : : /* command was pg_malloc'd and we are responsible for free()ing it. */
4627 sfrost@snowman.net 1393 : 118 : free(command);
1394 : : }
1395 : : }
1396 : :
1397 : : /*
1398 : : * Check for status messages from workers.
1399 : : *
1400 : : * If do_wait is true, wait to get a status message; otherwise, just return
1401 : : * immediately if there is none available.
1402 : : *
1403 : : * When we get a status message, we pass the status code to the callback
1404 : : * function that was specified to DispatchJobForTocEntry, then reset the
1405 : : * worker status to IDLE.
1406 : : *
1407 : : * Returns true if we collected a status message, else false.
1408 : : *
1409 : : * XXX is it worth checking for more than one status message per call?
1410 : : * It seems somewhat unlikely that multiple workers would finish at exactly
1411 : : * the same time.
1412 : : */
1413 : : static bool
4739 andrew@dunslane.net 1414 : 223 : ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1415 : : {
1416 : : int worker;
1417 : : char *msg;
1418 : :
1419 : : /* Try to collect a status message */
1420 : 223 : msg = getMessageFromWorker(pstate, do_wait, &worker);
1421 : :
1422 [ + + ]: 223 : if (!msg)
1423 : : {
1424 : : /* If do_wait is true, we must have detected EOF on some socket */
1425 [ - + ]: 105 : if (do_wait)
1437 tgl@sss.pgh.pa.us 1426 :UBC 0 : pg_fatal("a worker process died unexpectedly");
3456 tgl@sss.pgh.pa.us 1427 :CBC 105 : return false;
1428 : : }
1429 : :
1430 : : /* Process it and update our idea of the worker's status */
4739 andrew@dunslane.net 1431 [ + - ]: 118 : if (messageStartsWith(msg, "OK "))
1432 : : {
3456 tgl@sss.pgh.pa.us 1433 : 118 : ParallelSlot *slot = &pstate->parallelSlot[worker];
1434 : 118 : TocEntry *te = pstate->te[worker];
1435 : : int status;
1436 : :
1437 : 118 : status = parseWorkerResponse(AH, te, msg);
1438 : 118 : slot->callback(AH, te, status, slot->callback_data);
1439 : 118 : slot->workerStatus = WRKR_IDLE;
1440 : 118 : pstate->te[worker] = NULL;
1441 : : }
1442 : : else
1437 tgl@sss.pgh.pa.us 1443 :UBC 0 : pg_fatal("invalid message received from worker: \"%s\"",
1444 : : msg);
1445 : :
1446 : : /* Free the string returned from getMessageFromWorker */
4739 andrew@dunslane.net 1447 :CBC 118 : free(msg);
1448 : :
3456 tgl@sss.pgh.pa.us 1449 : 118 : return true;
1450 : : }
1451 : :
1452 : : /*
1453 : : * Check for status results from workers, waiting if necessary.
1454 : : *
1455 : : * Available wait modes are:
1456 : : * WFW_NO_WAIT: reap any available status, but don't block
1457 : : * WFW_GOT_STATUS: wait for at least one more worker to finish
1458 : : * WFW_ONE_IDLE: wait for at least one worker to be idle
1459 : : * WFW_ALL_IDLE: wait for all workers to be idle
1460 : : *
1461 : : * Any received results are passed to the callback specified to
1462 : : * DispatchJobForTocEntry.
1463 : : *
1464 : : * This function is executed in the leader process.
1465 : : */
1466 : : void
1467 : 121 : WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1468 : : {
1469 : 121 : bool do_wait = false;
1470 : :
1471 : : /*
1472 : : * In GOT_STATUS mode, always block waiting for a message, since we can't
1473 : : * return till we get something. In other modes, we don't block the first
1474 : : * time through the loop.
1475 : : */
1476 [ + + ]: 121 : if (mode == WFW_GOT_STATUS)
1477 : : {
1478 : : /* Assert that caller knows what it's doing */
1479 [ - + ]: 11 : Assert(!IsEveryWorkerIdle(pstate));
1480 : 11 : do_wait = true;
1481 : : }
1482 : :
1483 : : for (;;)
1484 : : {
1485 : : /*
1486 : : * Check for status messages, even if we don't need to block. We do
1487 : : * not try very hard to reap all available messages, though, since
1488 : : * there's unlikely to be more than one.
1489 : : */
1490 [ + + ]: 223 : if (ListenToWorkers(AH, pstate, do_wait))
1491 : : {
1492 : : /*
1493 : : * If we got a message, we are done by definition for GOT_STATUS
1494 : : * mode, and we can also be certain that there's at least one idle
1495 : : * worker. So we're done in all but ALL_IDLE mode.
1496 : : */
1497 [ + + ]: 118 : if (mode != WFW_ALL_IDLE)
1498 : 101 : return;
1499 : : }
1500 : :
1501 : : /* Check whether we must wait for new status messages */
1502 [ - - + + : 122 : switch (mode)
- ]
1503 : : {
3456 tgl@sss.pgh.pa.us 1504 :UBC 0 : case WFW_NO_WAIT:
1505 : 0 : return; /* never wait */
1506 : 0 : case WFW_GOT_STATUS:
1507 : 0 : Assert(false); /* can't get here, because we waited */
1508 : : break;
3456 tgl@sss.pgh.pa.us 1509 :CBC 97 : case WFW_ONE_IDLE:
1510 [ + + ]: 97 : if (GetIdleWorker(pstate) != NO_SLOT)
1511 : 11 : return;
1512 : 86 : break;
1513 : 25 : case WFW_ALL_IDLE:
1514 [ + + ]: 25 : if (IsEveryWorkerIdle(pstate))
1515 : 9 : return;
1516 : 16 : break;
1517 : : }
1518 : :
1519 : : /* Loop back, and this time wait for something to happen */
1520 : 102 : do_wait = true;
1521 : : }
1522 : : }
1523 : :
1524 : : /*
1525 : : * Read one command message from the leader, blocking if necessary
1526 : : * until one is available, and return it as a malloc'd string.
1527 : : * On EOF, return NULL.
1528 : : *
1529 : : * This function is executed in worker processes.
1530 : : */
1531 : : static char *
2100 andres@anarazel.de 1532 : 146 : getMessageFromLeader(int pipefd[2])
1533 : : {
4739 andrew@dunslane.net 1534 : 146 : return readMessageFromPipe(pipefd[PIPE_READ]);
1535 : : }
1536 : :
1537 : : /*
1538 : : * Send a status message to the leader.
1539 : : *
1540 : : * This function is executed in worker processes.
1541 : : */
1542 : : static void
2100 andres@anarazel.de 1543 : 118 : sendMessageToLeader(int pipefd[2], const char *str)
1544 : : {
4739 andrew@dunslane.net 1545 : 118 : int len = strlen(str) + 1;
1546 : :
1547 [ - + ]: 118 : if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1437 tgl@sss.pgh.pa.us 1548 :UBC 0 : pg_fatal("could not write to the communication channel: %m");
4739 andrew@dunslane.net 1549 :CBC 118 : }
1550 : :
/*
 * Block until at least one descriptor in "workerset" is readable, retrying
 * the select() call whenever it is interrupted.
 *
 * maxFd is the highest descriptor in the set.  On return, *workerset holds
 * the result set from the final select() call.  Returns -1 on error,
 * otherwise the number of readable descriptors.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	const fd_set wanted = *workerset;
	int			nready;

	for (;;)
	{
		bool		interrupted;

		/* select() overwrites the set, so start from the saved copy */
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);

#ifndef WIN32
		interrupted = (nready < 0 && errno == EINTR);
#else
		interrupted = (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif

		if (!interrupted)
			return nready;
	}
}
1578 : :
1579 : :
1580 : : /*
1581 : : * Check for messages from worker processes.
1582 : : *
1583 : : * If a message is available, return it as a malloc'd string, and put the
1584 : : * index of the sending worker in *worker.
1585 : : *
1586 : : * If nothing is available, wait if "do_wait" is true, else return NULL.
1587 : : *
1588 : : * If we detect EOF on any socket, we'll return NULL. It's not great that
1589 : : * that's hard to distinguish from the no-data-available case, but for now
1590 : : * our one caller is okay with that.
1591 : : *
1592 : : * This function is executed in the leader process.
1593 : : */
1594 : : static char *
1595 : 223 : getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1596 : : {
1597 : : int i;
1598 : : fd_set workerset;
1599 : 223 : int maxFd = -1;
1600 : 223 : struct timeval nowait = {0, 0};
1601 : :
1602 : : /* construct bitmap of socket descriptors for select() */
1603 [ + + ]: 3791 : FD_ZERO(&workerset);
1604 [ + + ]: 756 : for (i = 0; i < pstate->numWorkers; i++)
1605 : : {
2235 tgl@sss.pgh.pa.us 1606 [ + + - + ]: 533 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
4739 andrew@dunslane.net 1607 :UBC 0 : continue;
4739 andrew@dunslane.net 1608 :CBC 533 : FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1609 [ + - ]: 533 : if (pstate->parallelSlot[i].pipeRead > maxFd)
1610 : 533 : maxFd = pstate->parallelSlot[i].pipeRead;
1611 : : }
1612 : :
1613 [ + + ]: 223 : if (do_wait)
1614 : : {
1615 : 113 : i = select_loop(maxFd, &workerset);
1616 [ - + ]: 113 : Assert(i != 0);
1617 : : }
1618 : : else
1619 : : {
1620 [ + + ]: 110 : if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1621 : 105 : return NULL;
1622 : : }
1623 : :
1624 [ - + ]: 118 : if (i < 0)
1437 tgl@sss.pgh.pa.us 1625 :UBC 0 : pg_fatal("%s() failed: %m", "select");
1626 : :
4739 andrew@dunslane.net 1627 [ + - ]:CBC 195 : for (i = 0; i < pstate->numWorkers; i++)
1628 : : {
1629 : : char *msg;
1630 : :
2235 tgl@sss.pgh.pa.us 1631 [ + + - + ]: 195 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
2235 tgl@sss.pgh.pa.us 1632 :UBC 0 : continue;
4739 andrew@dunslane.net 1633 [ + + ]:CBC 195 : if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1634 : 77 : continue;
1635 : :
1636 : : /*
1637 : : * Read the message if any. If the socket is ready because of EOF,
1638 : : * we'll return NULL instead (and the socket will stay ready, so the
1639 : : * condition will persist).
1640 : : *
1641 : : * Note: because this is a blocking read, we'll wait if only part of
1642 : : * the message is available. Waiting a long time would be bad, but
1643 : : * since worker status messages are short and are always sent in one
1644 : : * operation, it shouldn't be a problem in practice.
1645 : : */
1646 : 118 : msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1647 : 118 : *worker = i;
1648 : 118 : return msg;
1649 : : }
4739 andrew@dunslane.net 1650 :UBC 0 : Assert(false);
1651 : : return NULL;
1652 : : }
1653 : :
1654 : : /*
1655 : : * Send a command message to the specified worker process.
1656 : : *
1657 : : * This function is executed in the leader process.
1658 : : */
1659 : : static void
4739 andrew@dunslane.net 1660 :CBC 118 : sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1661 : : {
1662 : 118 : int len = strlen(str) + 1;
1663 : :
1664 [ - + ]: 118 : if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1665 : : {
1437 tgl@sss.pgh.pa.us 1666 :UBC 0 : pg_fatal("could not write to the communication channel: %m");
1667 : : }
4739 andrew@dunslane.net 1668 :CBC 118 : }
1669 : :
1670 : : /*
1671 : : * Read one message from the specified pipe (fd), blocking if necessary
1672 : : * until one is available, and return it as a malloc'd string.
1673 : : * On EOF, return NULL.
1674 : : *
1675 : : * A "message" on the channel is just a null-terminated string.
1676 : : */
1677 : : static char *
1678 : 264 : readMessageFromPipe(int fd)
1679 : : {
1680 : : char *msg;
1681 : : int msgsize,
1682 : : bufsize;
1683 : : int ret;
1684 : :
1685 : : /*
1686 : : * In theory, if we let piperead() read multiple bytes, it might give us
1687 : : * back fragments of multiple messages. (That can't actually occur, since
1688 : : * neither leader nor workers send more than one message without waiting
1689 : : * for a reply, but we don't wish to assume that here.) For simplicity,
1690 : : * read a byte at a time until we get the terminating '\0'. This method
1691 : : * is a bit inefficient, but since this is only used for relatively short
1692 : : * command and status strings, it shouldn't matter.
1693 : : */
1694 : 264 : bufsize = 64; /* could be any number */
1695 : 264 : msg = (char *) pg_malloc(bufsize);
1696 : 264 : msgsize = 0;
1697 : : for (;;)
1698 : : {
3578 tgl@sss.pgh.pa.us 1699 [ - + ]: 2762 : Assert(msgsize < bufsize);
4739 andrew@dunslane.net 1700 : 2762 : ret = piperead(fd, msg + msgsize, 1);
1701 [ + + ]: 2762 : if (ret <= 0)
3578 tgl@sss.pgh.pa.us 1702 : 28 : break; /* error or connection closure */
1703 : :
4739 andrew@dunslane.net 1704 [ - + ]: 2734 : Assert(ret == 1);
1705 : :
1706 [ + + ]: 2734 : if (msg[msgsize] == '\0')
3578 tgl@sss.pgh.pa.us 1707 : 236 : return msg; /* collected whole message */
1708 : :
4739 andrew@dunslane.net 1709 : 2498 : msgsize++;
3578 tgl@sss.pgh.pa.us 1710 [ - + ]: 2498 : if (msgsize == bufsize) /* enlarge buffer if needed */
1711 : : {
3578 tgl@sss.pgh.pa.us 1712 :UBC 0 : bufsize += 16; /* could be any number */
4135 1713 : 0 : msg = (char *) pg_realloc(msg, bufsize);
1714 : : }
1715 : : }
1716 : :
1717 : : /* Other end has closed the connection */
4135 tgl@sss.pgh.pa.us 1718 :CBC 28 : pg_free(msg);
4433 sfrost@snowman.net 1719 : 28 : return NULL;
1720 : : }
1721 : :
#ifdef WIN32

/*
 * Windows substitute for pipe(2), producing handles that can be passed to
 * select().
 *
 * All reads and writes on the result must use piperead()/pipewrite().
 *
 * On success, 0 is returned with handles[0] set to the accepted socket and
 * handles[1] to the connecting socket.  On failure, an error is logged, -1
 * is returned, and both handles are left as -1.
 *
 * To stay consistent with Unix the handles are declared as "int".  That is
 * fine even on WIN64, since system handles are no wider than 32 bits, but
 * it does require some casting.
 */
static int
pgpipe(int handles[2])
{
	pgsocket	listener;
	pgsocket	conn;
	struct sockaddr_in addr;
	int			addrlen = sizeof(addr);

	/* We have to use the Unix socket invalid file descriptor value here. */
	handles[0] = handles[1] = -1;

	/*
	 * Create a listening socket bound to the loopback address with port 0,
	 * then find out which address it actually got.
	 */
	listener = socket(AF_INET, SOCK_STREAM, 0);
	if (listener == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create socket: error code %d",
					 WSAGetLastError());
		return -1;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = pg_hton16(0);
	addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
	if (bind(listener, (SOCKADDR *) &addr, addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not bind: error code %d",
					 WSAGetLastError());
		goto fail;
	}
	if (listen(listener, 1) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not listen: error code %d",
					 WSAGetLastError());
		goto fail;
	}
	if (getsockname(listener, (SOCKADDR *) &addr, &addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
					 WSAGetLastError());
		goto fail;
	}

	/*
	 * Create the two connected sockets that serve as the pipe handles.
	 */
	conn = socket(AF_INET, SOCK_STREAM, 0);
	if (conn == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create second socket: error code %d",
					 WSAGetLastError());
		goto fail;
	}
	handles[1] = (int) conn;

	if (connect(handles[1], (SOCKADDR *) &addr, addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not connect socket: error code %d",
					 WSAGetLastError());
		goto fail;
	}

	conn = accept(listener, (SOCKADDR *) &addr, &addrlen);
	if (conn == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not accept connection: error code %d",
					 WSAGetLastError());
		goto fail;
	}
	handles[0] = (int) conn;

	/* The listening socket has served its purpose */
	closesocket(listener);
	return 0;

fail:

	/*
	 * Common failure exit, reached only after the error has been logged.
	 * Only handles[1] can have been set at this point; release it if so,
	 * then the listening socket.
	 */
	if (handles[1] != -1)
	{
		closesocket(handles[1]);
		handles[1] = -1;
	}
	closesocket(listener);
	return -1;
}

#endif							/* WIN32 */
|