Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * parallel.c
4 : : *
5 : : * Parallel support for pg_dump and pg_restore
6 : : *
7 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : : * Portions Copyright (c) 1994, Regents of the University of California
9 : : *
10 : : * IDENTIFICATION
11 : : * src/bin/pg_dump/parallel.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : :
16 : : /*
17 : : * Parallel operation works like this:
18 : : *
19 : : * The original, leader process calls ParallelBackupStart(), which forks off
20 : : * the desired number of worker processes, which each enter WaitForCommands().
21 : : *
22 : : * The leader process dispatches an individual work item to one of the worker
23 : : * processes in DispatchJobForTocEntry(). We send a command string such as
24 : : * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 : : * The worker process receives and decodes the command and passes it to the
26 : : * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 : : * which are routines of the current archive format. That routine performs
28 : : * the required action (dump or restore) and returns an integer status code.
29 : : * This is passed back to the leader where we pass it to the
30 : : * ParallelCompletionPtr callback function that was passed to
31 : : * DispatchJobForTocEntry(). The callback function does state updating
32 : : * for the leader control logic in pg_backup_archiver.c.
33 : : *
34 : : * In principle additional archive-format-specific information might be needed
35 : : * in commands or worker status responses, but so far that hasn't proved
36 : : * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 : : * data structures. Remember that we have forked off the workers only after
38 : : * we have read in the catalog. That's why our worker processes can also
39 : : * access the catalog information. (In the Windows case, the workers are
40 : : * threads in the same process. To avoid problems, they work with cloned
41 : : * copies of the Archive data structure; see RunWorker().)
42 : : *
43 : : * In the leader process, the workerStatus field for each worker has one of
44 : : * the following values:
45 : : * WRKR_NOT_STARTED: we've not yet forked this worker
46 : : * WRKR_IDLE: it's waiting for a command
47 : : * WRKR_WORKING: it's working on a command
48 : : * WRKR_TERMINATED: process ended
49 : : * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50 : : * state, and must be NULL in other states.
51 : : */
52 : :
53 : : #include "postgres_fe.h"
54 : :
55 : : #ifndef WIN32
56 : : #include <sys/select.h>
57 : : #include <sys/wait.h>
58 : : #include <signal.h>
59 : : #include <unistd.h>
60 : : #include <fcntl.h>
61 : : #endif
62 : :
63 : : #include "fe_utils/string_utils.h"
64 : : #include "parallel.h"
65 : : #include "pg_backup_utils.h"
66 : : #ifdef WIN32
67 : : #include "port/pg_bswap.h"
68 : : #endif
69 : :
70 : : /* Mnemonic macros for indexing the fd array returned by pipe(2) */
71 : : #define PIPE_READ 0
72 : : #define PIPE_WRITE 1
73 : :
74 : : #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
75 : :
76 : : /* Worker process statuses */
77 : : typedef enum
78 : : {
79 : : WRKR_NOT_STARTED = 0,
80 : : WRKR_IDLE,
81 : : WRKR_WORKING,
82 : : WRKR_TERMINATED,
83 : : } T_WorkerStatus;
84 : :
85 : : #define WORKER_IS_RUNNING(workerStatus) \
86 : : ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
87 : :
88 : : /*
89 : : * Private per-parallel-worker state (typedef for this is in parallel.h).
90 : : *
91 : : * Much of this is valid only in the leader process (or, on Windows, should
92 : : * be touched only by the leader thread). But the AH field should be touched
93 : : * only by workers. The pipe descriptors are valid everywhere.
94 : : */
95 : : struct ParallelSlot
96 : : {
97 : : T_WorkerStatus workerStatus; /* see enum above */
98 : :
99 : : /* These fields are valid if workerStatus == WRKR_WORKING: */
100 : : ParallelCompletionPtr callback; /* function to call on completion */
101 : : void *callback_data; /* passthrough data for it */
102 : :
103 : : ArchiveHandle *AH; /* Archive data worker is using */
104 : :
105 : : int pipeRead; /* leader's end of the pipes */
106 : : int pipeWrite;
107 : : int pipeRevRead; /* child's end of the pipes */
108 : : int pipeRevWrite;
109 : :
110 : : /* Child process/thread identity info: */
111 : : #ifdef WIN32
112 : : uintptr_t hThread;
113 : : unsigned int threadId;
114 : : #else
115 : : pid_t pid;
116 : : #endif
117 : : };
118 : :
119 : : #ifdef WIN32
120 : :
121 : : /*
122 : : * Structure to hold info passed by _beginthreadex() to the function it calls
123 : : * via its single allowed argument.
124 : : */
125 : : typedef struct
126 : : {
127 : : ArchiveHandle *AH; /* leader database connection */
128 : : ParallelSlot *slot; /* this worker's parallel slot */
129 : : } WorkerInfo;
130 : :
131 : : /* Windows implementation of pipe access */
132 : : static int pgpipe(int handles[2]);
133 : : #define piperead(a,b,c) recv(a,b,c,0)
134 : : #define pipewrite(a,b,c) send(a,b,c,0)
135 : :
136 : : #else /* !WIN32 */
137 : :
138 : : /* Non-Windows implementation of pipe access */
139 : : #define pgpipe(a) pipe(a)
140 : : #define piperead(a,b,c) read(a,b,c)
141 : : #define pipewrite(a,b,c) write(a,b,c)
142 : :
143 : : #endif /* WIN32 */
144 : :
145 : : /*
146 : : * State info for archive_close_connection() shutdown callback.
147 : : */
148 : : typedef struct ShutdownInformation
149 : : {
150 : : ParallelState *pstate;
151 : : Archive *AHX;
152 : : } ShutdownInformation;
153 : :
154 : : static ShutdownInformation shutdown_info;
155 : :
156 : : /*
157 : : * State info for signal handling.
158 : : * We assume signal_info initializes to zeroes.
159 : : *
160 : : * On Unix, myAH is the leader DB connection in the leader process, and the
161 : : * worker's own connection in worker processes. On Windows, we have only one
162 : : * instance of signal_info, so myAH is the leader connection and the worker
163 : : * connections must be dug out of pstate->parallelSlot[].
164 : : */
165 : : typedef struct DumpSignalInformation
166 : : {
167 : : ArchiveHandle *myAH; /* database connection to issue cancel for */
168 : : ParallelState *pstate; /* parallel state, if any */
169 : : bool handler_set; /* signal handler set up in this process? */
170 : : #ifndef WIN32
171 : : bool am_worker; /* am I a worker process? */
172 : : #endif
173 : : } DumpSignalInformation;
174 : :
175 : : static volatile DumpSignalInformation signal_info;
176 : :
177 : : #ifdef WIN32
178 : : static CRITICAL_SECTION signal_info_lock;
179 : : #endif
180 : :
181 : : /*
182 : : * Write a simple string to stderr --- must be safe in a signal handler.
183 : : * We ignore the write() result since there's not much we could do about it.
184 : : * Certain compilers make that harder than it ought to be.
185 : : */
186 : : #define write_stderr(str) \
187 : : do { \
188 : : const char *str_ = (str); \
189 : : int rc_; \
190 : : rc_ = write(fileno(stderr), str_, strlen(str_)); \
191 : : (void) rc_; \
192 : : } while (0)
193 : :
194 : :
195 : : #ifdef WIN32
196 : : /* file-scope variables */
197 : : static DWORD tls_index;
198 : :
199 : : /* globally visible variables (needed by exit_nicely) */
200 : : bool parallel_init_done = false;
201 : : DWORD mainThreadId;
202 : : #endif /* WIN32 */
203 : :
204 : : /* Local function prototypes */
205 : : static ParallelSlot *GetMyPSlot(ParallelState *pstate);
206 : : static void archive_close_connection(int code, void *arg);
207 : : static void ShutdownWorkersHard(ParallelState *pstate);
208 : : static void WaitForTerminatingWorkers(ParallelState *pstate);
209 : : static void set_cancel_handler(void);
210 : : static void set_cancel_pstate(ParallelState *pstate);
211 : : static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
212 : : static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
213 : : static int GetIdleWorker(ParallelState *pstate);
214 : : static bool HasEveryWorkerTerminated(ParallelState *pstate);
215 : : static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
216 : : static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
217 : : static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
218 : : bool do_wait);
219 : : static char *getMessageFromLeader(int pipefd[2]);
220 : : static void sendMessageToLeader(int pipefd[2], const char *str);
221 : : static int select_loop(int maxFd, fd_set *workerset);
222 : : static char *getMessageFromWorker(ParallelState *pstate,
223 : : bool do_wait, int *worker);
224 : : static void sendMessageToWorker(ParallelState *pstate,
225 : : int worker, const char *str);
226 : : static char *readMessageFromPipe(int fd);
227 : :
228 : : #define messageStartsWith(msg, prefix) \
229 : : (strncmp(msg, prefix, strlen(prefix)) == 0)
230 : :
231 : :
232 : : /*
233 : : * Initialize parallel dump support --- should be called early in process
234 : : * startup. (Currently, this is called whether or not we intend parallel
235 : : * activity.)
236 : : */
237 : : void
4546 heikki.linnakangas@i 238 :GIC 386 : init_parallel_dump_utils(void)
239 : : {
240 : : #ifdef WIN32
241 : : if (!parallel_init_done)
242 : : {
243 : : WSADATA wsaData;
244 : : int err;
245 : :
246 : : /* Prepare for threaded operation */
247 : : tls_index = TlsAlloc();
248 : : mainThreadId = GetCurrentThreadId();
249 : :
250 : : /* Initialize socket access */
251 : : err = WSAStartup(MAKEWORD(2, 2), &wsaData);
252 : : if (err != 0)
253 : : pg_fatal("%s() failed: error code %d", "WSAStartup", err);
254 : :
255 : : parallel_init_done = true;
256 : : }
257 : : #endif
258 : 386 : }
259 : :
260 : : /*
261 : : * Find the ParallelSlot for the current worker process or thread.
262 : : *
263 : : * Returns NULL if no matching slot is found (this implies we're the leader).
264 : : */
265 : : static ParallelSlot *
4549 andrew@dunslane.net 266 :UIC 0 : GetMyPSlot(ParallelState *pstate)
267 : : {
268 : : int i;
269 : :
270 [ # # ]: 0 : for (i = 0; i < pstate->numWorkers; i++)
271 : : {
272 : : #ifdef WIN32
273 : : if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
274 : : #else
275 [ # # ]: 0 : if (pstate->parallelSlot[i].pid == getpid())
276 : : #endif
277 : 0 : return &(pstate->parallelSlot[i]);
278 : : }
279 : :
280 : 0 : return NULL;
281 : : }
282 : :
283 : : /*
284 : : * A thread-local version of getLocalPQExpBuffer().
285 : : *
286 : : * Non-reentrant but reduces memory leakage: we'll consume one buffer per
287 : : * thread, which is much better than one per fmtId/fmtQualifiedId call.
288 : : */
289 : : #ifdef WIN32
290 : : static PQExpBuffer
291 : : getThreadLocalPQExpBuffer(void)
292 : : {
293 : : /*
294 : : * The Tls code goes awry if we use a static var, so we provide for both
295 : : * static and auto, and omit any use of the static var when using Tls. We
296 : : * rely on TlsGetValue() to return 0 if the value is not yet set.
297 : : */
298 : : static PQExpBuffer s_id_return = NULL;
299 : : PQExpBuffer id_return;
300 : :
301 : : if (parallel_init_done)
302 : : id_return = (PQExpBuffer) TlsGetValue(tls_index);
303 : : else
304 : : id_return = s_id_return;
305 : :
306 : : if (id_return) /* first time through? */
307 : : {
308 : : /* same buffer, just wipe contents */
309 : : resetPQExpBuffer(id_return);
310 : : }
311 : : else
312 : : {
313 : : /* new buffer */
314 : : id_return = createPQExpBuffer();
315 : : if (parallel_init_done)
316 : : TlsSetValue(tls_index, id_return);
317 : : else
318 : : s_id_return = id_return;
319 : : }
320 : :
321 : : return id_return;
322 : : }
323 : : #endif /* WIN32 */
324 : :
325 : : /*
326 : : * pg_dump and pg_restore call this to register the cleanup handler
327 : : * as soon as they've created the ArchiveHandle.
328 : : */
329 : : void
4549 andrew@dunslane.net 330 :GIC 263 : on_exit_close_archive(Archive *AHX)
331 : : {
332 : 263 : shutdown_info.AHX = AHX;
333 : 263 : on_exit_nicely(archive_close_connection, &shutdown_info);
334 : 263 : }
335 : :
336 : : /*
337 : : * on_exit_nicely handler for shutting down database connections and
338 : : * worker processes cleanly.
339 : : */
340 : : static void
341 : 202 : archive_close_connection(int code, void *arg)
342 : : {
343 : 202 : ShutdownInformation *si = (ShutdownInformation *) arg;
344 : :
345 [ - + ]: 202 : if (si->pstate)
346 : : {
347 : : /* In parallel mode, must figure out who we are */
4549 andrew@dunslane.net 348 :UIC 0 : ParallelSlot *slot = GetMyPSlot(si->pstate);
349 : :
350 [ # # ]: 0 : if (!slot)
351 : : {
352 : : /*
353 : : * We're the leader. Forcibly shut down workers, then close our
354 : : * own database connection, if any.
355 : : */
3383 tgl@sss.pgh.pa.us 356 : 0 : ShutdownWorkersHard(si->pstate);
357 : :
3391 358 [ # # ]: 0 : if (si->AHX)
359 : 0 : DisconnectDatabase(si->AHX);
360 : : }
361 : : else
362 : : {
363 : : /*
364 : : * We're a worker. Shut down our own DB connection if any. On
365 : : * Windows, we also have to close our communication sockets, to
366 : : * emulate what will happen on Unix when the worker process exits.
367 : : * (Without this, if this is a premature exit, the leader would
368 : : * fail to detect it because there would be no EOF condition on
369 : : * the other end of the pipe.)
370 : : */
3266 371 [ # # ]: 0 : if (slot->AH)
372 : 0 : DisconnectDatabase(&(slot->AH->public));
373 : :
374 : : #ifdef WIN32
375 : : closesocket(slot->pipeRevRead);
376 : : closesocket(slot->pipeRevWrite);
377 : : #endif
378 : : }
379 : : }
380 : : else
381 : : {
382 : : /* Non-parallel operation: just kill the leader DB connection */
3391 tgl@sss.pgh.pa.us 383 [ + - ]:GIC 202 : if (si->AHX)
384 : 202 : DisconnectDatabase(si->AHX);
385 : : }
4549 andrew@dunslane.net 386 : 202 : }
387 : :
388 : : /*
389 : : * Forcibly shut down any remaining workers, waiting for them to finish.
390 : : *
391 : : * Note that we don't expect to come here during normal exit (the workers
392 : : * should be long gone, and the ParallelState too). We're only here in a
393 : : * pg_fatal() situation, so intervening to cancel active commands is
394 : : * appropriate.
395 : : */
396 : : static void
4549 andrew@dunslane.net 397 :UIC 0 : ShutdownWorkersHard(ParallelState *pstate)
398 : : {
399 : : int i;
400 : :
401 : : /*
402 : : * Close our write end of the sockets so that any workers waiting for
403 : : * commands know they can exit. (Note: some of the pipeWrite fields might
404 : : * still be zero, if we failed to initialize all the workers. Hence, just
405 : : * ignore errors here.)
406 : : */
407 [ # # ]: 0 : for (i = 0; i < pstate->numWorkers; i++)
408 : 0 : closesocket(pstate->parallelSlot[i].pipeWrite);
409 : :
410 : : /*
411 : : * Force early termination of any commands currently in progress.
412 : : */
413 : : #ifndef WIN32
414 : : /* On non-Windows, send SIGTERM to each worker process. */
415 [ # # ]: 0 : for (i = 0; i < pstate->numWorkers; i++)
416 : : {
3383 tgl@sss.pgh.pa.us 417 : 0 : pid_t pid = pstate->parallelSlot[i].pid;
418 : :
419 [ # # ]: 0 : if (pid != 0)
420 : 0 : kill(pid, SIGTERM);
421 : : }
422 : : #else
423 : :
424 : : /*
425 : : * On Windows, send query cancels directly to the workers' backends. Use
426 : : * a critical section to ensure worker threads don't change state.
427 : : */
428 : : EnterCriticalSection(&signal_info_lock);
429 : : for (i = 0; i < pstate->numWorkers; i++)
430 : : {
431 : : ArchiveHandle *AH = pstate->parallelSlot[i].AH;
432 : : char errbuf[1];
433 : :
434 : : if (AH != NULL && AH->connCancel != NULL)
435 : : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
436 : : }
437 : : LeaveCriticalSection(&signal_info_lock);
438 : : #endif
439 : :
440 : : /* Now wait for them to terminate. */
4549 andrew@dunslane.net 441 : 0 : WaitForTerminatingWorkers(pstate);
442 : 0 : }
443 : :
444 : : /*
445 : : * Wait for all workers to terminate.
446 : : */
447 : : static void
4549 andrew@dunslane.net 448 :GIC 13 : WaitForTerminatingWorkers(ParallelState *pstate)
449 : : {
450 [ + + ]: 41 : while (!HasEveryWorkerTerminated(pstate))
451 : : {
452 : 28 : ParallelSlot *slot = NULL;
453 : : int j;
454 : :
455 : : #ifndef WIN32
456 : : /* On non-Windows, use wait() to wait for next worker to end */
457 : : int status;
458 : 28 : pid_t pid = wait(&status);
459 : :
460 : : /* Find dead worker's slot, and clear the PID field */
461 [ + - ]: 45 : for (j = 0; j < pstate->numWorkers; j++)
462 : : {
3389 tgl@sss.pgh.pa.us 463 : 45 : slot = &(pstate->parallelSlot[j]);
464 [ + + ]: 45 : if (slot->pid == pid)
465 : : {
466 : 28 : slot->pid = 0;
467 : 28 : break;
468 : : }
469 : : }
470 : : #else /* WIN32 */
471 : : /* On Windows, we must use WaitForMultipleObjects() */
472 : : HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
473 : : int nrun = 0;
474 : : DWORD ret;
475 : : uintptr_t hThread;
476 : :
477 : : for (j = 0; j < pstate->numWorkers; j++)
478 : : {
479 : : if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
480 : : {
481 : : lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
482 : : nrun++;
483 : : }
484 : : }
485 : : ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
486 : : Assert(ret != WAIT_FAILED);
487 : : hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
488 : : free(lpHandles);
489 : :
490 : : /* Find dead worker's slot, and clear the hThread field */
491 : : for (j = 0; j < pstate->numWorkers; j++)
492 : : {
493 : : slot = &(pstate->parallelSlot[j]);
494 : : if (slot->hThread == hThread)
495 : : {
496 : : /* For cleanliness, close handles for dead threads */
497 : : CloseHandle((HANDLE) slot->hThread);
498 : : slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
499 : : break;
500 : : }
501 : : }
502 : : #endif /* WIN32 */
503 : :
504 : : /* On all platforms, update workerStatus and te[] as well */
505 [ - + ]: 28 : Assert(j < pstate->numWorkers);
4549 andrew@dunslane.net 506 : 28 : slot->workerStatus = WRKR_TERMINATED;
3266 tgl@sss.pgh.pa.us 507 : 28 : pstate->te[j] = NULL;
508 : : }
4549 andrew@dunslane.net 509 : 13 : }
510 : :
511 : :
512 : : /*
513 : : * Code for responding to cancel interrupts (SIGINT, control-C, etc)
514 : : *
515 : : * This doesn't quite belong in this module, but it needs access to the
516 : : * ParallelState data, so there's not really a better place either.
517 : : *
518 : : * When we get a cancel interrupt, we could just die, but in pg_restore that
519 : : * could leave a SQL command (e.g., CREATE INDEX on a large table) running
520 : : * for a long time. Instead, we try to send a cancel request and then die.
521 : : * pg_dump probably doesn't really need this, but we might as well use it
522 : : * there too. Note that sending the cancel directly from the signal handler
523 : : * is safe because PQcancel() is written to make it so.
524 : : *
525 : : * In parallel operation on Unix, each process is responsible for canceling
526 : : * its own connection (this must be so because nobody else has access to it).
527 : : * Furthermore, the leader process should attempt to forward its signal to
528 : : * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
529 : : * needed because typing control-C at the console would deliver SIGINT to
530 : : * every member of the terminal process group --- but in other scenarios it
531 : : * might be that only the leader gets signaled.
532 : : *
533 : : * On Windows, the cancel handler runs in a separate thread, because that's
534 : : * how SetConsoleCtrlHandler works. We make it stop worker threads, send
535 : : * cancels on all active connections, and then return FALSE, which will allow
536 : : * the process to die. For safety's sake, we use a critical section to
537 : : * protect the PGcancel structures against being changed while the signal
538 : : * thread runs.
539 : : */
540 : :
541 : : #ifndef WIN32
542 : :
543 : : /*
544 : : * Signal handler (Unix only)
545 : : */
546 : : static void
3388 tgl@sss.pgh.pa.us 547 :UIC 0 : sigTermHandler(SIGNAL_ARGS)
548 : : {
549 : : int i;
550 : : char errbuf[1];
551 : :
552 : : /*
553 : : * Some platforms allow delivery of new signals to interrupt an active
554 : : * signal handler. That could muck up our attempt to send PQcancel, so
555 : : * disable the signals that set_cancel_handler enabled.
556 : : */
3383 557 : 0 : pqsignal(SIGINT, SIG_IGN);
558 : 0 : pqsignal(SIGTERM, SIG_IGN);
559 : 0 : pqsignal(SIGQUIT, SIG_IGN);
560 : :
561 : : /*
562 : : * If we're in the leader, forward signal to all workers. (It seems best
563 : : * to do this before PQcancel; killing the leader transaction will result
564 : : * in invalid-snapshot errors from active workers, which maybe we can
565 : : * quiet by killing workers first.) Ignore any errors.
566 : : */
567 [ # # ]: 0 : if (signal_info.pstate != NULL)
568 : : {
569 [ # # ]: 0 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
570 : : {
571 : 0 : pid_t pid = signal_info.pstate->parallelSlot[i].pid;
572 : :
573 [ # # ]: 0 : if (pid != 0)
574 : 0 : kill(pid, SIGTERM);
575 : : }
576 : : }
577 : :
578 : : /*
579 : : * Send QueryCancel if we have a connection to send to. Ignore errors,
580 : : * there's not much we can do about them anyway.
581 : : */
582 [ # # # # ]: 0 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
583 : 0 : (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
584 : :
585 : : /*
586 : : * Report we're quitting, using nothing more complicated than write(2).
587 : : * When in parallel operation, only the leader process should do this.
588 : : */
589 [ # # ]: 0 : if (!signal_info.am_worker)
590 : : {
591 [ # # ]: 0 : if (progname)
592 : : {
593 : 0 : write_stderr(progname);
594 : 0 : write_stderr(": ");
595 : : }
596 : 0 : write_stderr("terminated by user\n");
597 : : }
598 : :
599 : : /*
600 : : * And die, using _exit() not exit() because the latter will invoke atexit
601 : : * handlers that can fail if we interrupted related code.
602 : : */
2056 603 : 0 : _exit(1);
604 : : }
605 : :
606 : : /*
607 : : * Enable cancel interrupt handler, if not already done.
608 : : */
609 : : static void
577 dgustafsson@postgres 610 :GIC 613 : set_cancel_handler(void)
611 : : {
612 : : /*
613 : : * When forking, signal_info.handler_set will propagate into the new
614 : : * process, but that's fine because the signal handler state does too.
615 : : */
3383 tgl@sss.pgh.pa.us 616 [ + + ]: 613 : if (!signal_info.handler_set)
617 : : {
618 : 234 : signal_info.handler_set = true;
619 : :
620 : 234 : pqsignal(SIGINT, sigTermHandler);
621 : 234 : pqsignal(SIGTERM, sigTermHandler);
622 : 234 : pqsignal(SIGQUIT, sigTermHandler);
623 : : }
624 : 613 : }
625 : :
626 : : #else /* WIN32 */
627 : :
628 : : /*
629 : : * Console interrupt handler --- runs in a newly-started thread.
630 : : *
631 : : * After stopping other threads and sending cancel requests on all open
632 : : * connections, we return FALSE which will allow the default ExitProcess()
633 : : * action to be taken.
634 : : */
635 : : static BOOL WINAPI
636 : : consoleHandler(DWORD dwCtrlType)
637 : : {
638 : : int i;
639 : : char errbuf[1];
640 : :
641 : : if (dwCtrlType == CTRL_C_EVENT ||
642 : : dwCtrlType == CTRL_BREAK_EVENT)
643 : : {
644 : : /* Critical section prevents changing data we look at here */
645 : : EnterCriticalSection(&signal_info_lock);
646 : :
647 : : /*
648 : : * If in parallel mode, stop worker threads and send QueryCancel to
649 : : * their connected backends. The main point of stopping the worker
650 : : * threads is to keep them from reporting the query cancels as errors,
651 : : * which would clutter the user's screen. We needn't stop the leader
652 : : * thread since it won't be doing much anyway. Do this before
653 : : * canceling the main transaction, else we might get invalid-snapshot
654 : : * errors reported before we can stop the workers. Ignore errors,
655 : : * there's not much we can do about them anyway.
656 : : */
657 : : if (signal_info.pstate != NULL)
658 : : {
659 : : for (i = 0; i < signal_info.pstate->numWorkers; i++)
660 : : {
661 : : ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
662 : : ArchiveHandle *AH = slot->AH;
663 : : HANDLE hThread = (HANDLE) slot->hThread;
664 : :
665 : : /*
666 : : * Using TerminateThread here may leave some resources leaked,
667 : : * but it doesn't matter since we're about to end the whole
668 : : * process.
669 : : */
670 : : if (hThread != INVALID_HANDLE_VALUE)
671 : : TerminateThread(hThread, 0);
672 : :
673 : : if (AH != NULL && AH->connCancel != NULL)
674 : : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
675 : : }
676 : : }
677 : :
678 : : /*
679 : : * Send QueryCancel to leader connection, if enabled. Ignore errors,
680 : : * there's not much we can do about them anyway.
681 : : */
682 : : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
683 : : (void) PQcancel(signal_info.myAH->connCancel,
684 : : errbuf, sizeof(errbuf));
685 : :
686 : : LeaveCriticalSection(&signal_info_lock);
687 : :
688 : : /*
689 : : * Report we're quitting, using nothing more complicated than
690 : : * write(2). (We might be able to get away with using pg_log_*()
691 : : * here, but since we terminated other threads uncleanly above, it
692 : : * seems better to assume as little as possible.)
693 : : */
694 : : if (progname)
695 : : {
696 : : write_stderr(progname);
697 : : write_stderr(": ");
698 : : }
699 : : write_stderr("terminated by user\n");
700 : : }
701 : :
702 : : /* Always return FALSE to allow signal handling to continue */
703 : : return FALSE;
704 : : }
705 : :
706 : : /*
707 : : * Enable cancel interrupt handler, if not already done.
708 : : */
709 : : static void
710 : : set_cancel_handler(void)
711 : : {
712 : : if (!signal_info.handler_set)
713 : : {
714 : : signal_info.handler_set = true;
715 : :
716 : : InitializeCriticalSection(&signal_info_lock);
717 : :
718 : : SetConsoleCtrlHandler(consoleHandler, TRUE);
719 : : }
720 : : }
721 : :
722 : : #endif /* WIN32 */
723 : :
724 : :
725 : : /*
726 : : * set_archive_cancel_info
727 : : *
728 : : * Fill AH->connCancel with cancellation info for the specified database
729 : : * connection; or clear it if conn is NULL.
730 : : */
731 : : void
732 : 613 : set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
733 : : {
734 : : PGcancel *oldConnCancel;
735 : :
736 : : /*
737 : : * Activate the interrupt handler if we didn't yet in this process. On
738 : : * Windows, this also initializes signal_info_lock; therefore it's
739 : : * important that this happen at least once before we fork off any
740 : : * threads.
741 : : */
577 dgustafsson@postgres 742 : 613 : set_cancel_handler();
743 : :
744 : : /*
745 : : * On Unix, we assume that storing a pointer value is atomic with respect
746 : : * to any possible signal interrupt. On Windows, use a critical section.
747 : : */
748 : :
749 : : #ifdef WIN32
750 : : EnterCriticalSection(&signal_info_lock);
751 : : #endif
752 : :
753 : : /* Free the old one if we have one */
3383 tgl@sss.pgh.pa.us 754 : 613 : oldConnCancel = AH->connCancel;
755 : : /* be sure interrupt handler doesn't use pointer while freeing */
756 : 613 : AH->connCancel = NULL;
757 : :
758 [ + + ]: 613 : if (oldConnCancel != NULL)
759 : 330 : PQfreeCancel(oldConnCancel);
760 : :
761 : : /* Set the new one if specified */
762 [ + + ]: 613 : if (conn)
763 : 332 : AH->connCancel = PQgetCancel(conn);
764 : :
765 : : /*
766 : : * On Unix, there's only ever one active ArchiveHandle per process, so we
767 : : * can just set signal_info.myAH unconditionally. On Windows, do that
768 : : * only in the main thread; worker threads have to make sure their
769 : : * ArchiveHandle appears in the pstate data, which is dealt with in
770 : : * RunWorker().
771 : : */
772 : : #ifndef WIN32
773 : 613 : signal_info.myAH = AH;
774 : : #else
775 : : if (mainThreadId == GetCurrentThreadId())
776 : : signal_info.myAH = AH;
777 : : #endif
778 : :
779 : : #ifdef WIN32
780 : : LeaveCriticalSection(&signal_info_lock);
781 : : #endif
782 : 613 : }
783 : :
784 : : /*
785 : : * set_cancel_pstate
786 : : *
787 : : * Set signal_info.pstate to point to the specified ParallelState, if any.
788 : : * We need this mainly to have an interlock against Windows signal thread.
789 : : */
790 : : static void
791 : 26 : set_cancel_pstate(ParallelState *pstate)
792 : : {
793 : : #ifdef WIN32
794 : : EnterCriticalSection(&signal_info_lock);
795 : : #endif
796 : :
797 : 26 : signal_info.pstate = pstate;
798 : :
799 : : #ifdef WIN32
800 : : LeaveCriticalSection(&signal_info_lock);
801 : : #endif
802 : 26 : }
803 : :
804 : : /*
805 : : * set_cancel_slot_archive
806 : : *
807 : : * Set ParallelSlot's AH field to point to the specified archive, if any.
808 : : * We need this mainly to have an interlock against Windows signal thread.
809 : : */
810 : : static void
811 : 56 : set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
812 : : {
813 : : #ifdef WIN32
814 : : EnterCriticalSection(&signal_info_lock);
815 : : #endif
816 : :
3266 817 : 56 : slot->AH = AH;
818 : :
819 : : #ifdef WIN32
820 : : LeaveCriticalSection(&signal_info_lock);
821 : : #endif
3383 822 : 56 : }
823 : :
824 : :
825 : : /*
826 : : * This function is called by both Unix and Windows variants to set up
827 : : * and run a worker process. Caller should exit the process (or thread)
828 : : * upon return.
829 : : */
830 : : static void
831 : 28 : RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
832 : : {
833 : : int pipefd[2];
834 : :
835 : : /* fetch child ends of pipes */
836 : 28 : pipefd[PIPE_READ] = slot->pipeRevRead;
837 : 28 : pipefd[PIPE_WRITE] = slot->pipeRevWrite;
838 : :
839 : : /*
840 : : * Clone the archive so that we have our own state to work with, and in
841 : : * particular our own database connection.
842 : : *
843 : : * We clone on Unix as well as Windows, even though technically we don't
844 : : * need to because fork() gives us a copy in our own address space
845 : : * already. But CloneArchive resets the state information and also clones
846 : : * the database connection which both seem kinda helpful.
847 : : */
848 : 28 : AH = CloneArchive(AH);
849 : :
850 : : /* Remember cloned archive where signal handler can find it */
851 : 28 : set_cancel_slot_archive(slot, AH);
852 : :
853 : : /*
854 : : * Call the setup worker function that's defined in the ArchiveHandle.
855 : : */
3524 856 : 28 : (AH->SetupWorkerPtr) ((Archive *) AH);
857 : :
858 : : /*
859 : : * Execute commands until done.
860 : : */
861 : 28 : WaitForCommands(AH, pipefd);
862 : :
863 : : /*
864 : : * Disconnect from database and clean up.
865 : : */
3383 866 : 28 : set_cancel_slot_archive(slot, NULL);
867 : 28 : DisconnectDatabase(&(AH->public));
868 : 28 : DeCloneArchive(AH);
4549 andrew@dunslane.net 869 : 28 : }
870 : :
871 : : /*
872 : : * Thread base function for Windows
873 : : */
874 : : #ifdef WIN32
875 : : static unsigned __stdcall
876 : : init_spawned_worker_win32(WorkerInfo *wi)
877 : : {
878 : : ArchiveHandle *AH = wi->AH;
879 : : ParallelSlot *slot = wi->slot;
880 : :
881 : : /* Don't need WorkerInfo anymore */
882 : : free(wi);
883 : :
884 : : /* Run the worker ... */
885 : : RunWorker(AH, slot);
886 : :
887 : : /* Exit the thread */
888 : : _endthreadex(0);
889 : : return 0;
890 : : }
891 : : #endif /* WIN32 */
892 : :
893 : : /*
894 : : * This function starts a parallel dump or restore by spawning off the worker
895 : : * processes. For Windows, it creates a number of threads; on Unix the
896 : : * workers are created with fork().
897 : : */
898 : : ParallelState *
3524 tgl@sss.pgh.pa.us 899 : 15 : ParallelBackupStart(ArchiveHandle *AH)
900 : : {
901 : : ParallelState *pstate;
902 : : int i;
903 : :
4549 andrew@dunslane.net 904 [ - + ]: 15 : Assert(AH->public.numWorkers > 0);
905 : :
906 : 15 : pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
907 : :
908 : 15 : pstate->numWorkers = AH->public.numWorkers;
3266 tgl@sss.pgh.pa.us 909 : 15 : pstate->te = NULL;
4549 andrew@dunslane.net 910 : 15 : pstate->parallelSlot = NULL;
911 : :
912 [ + + ]: 15 : if (AH->public.numWorkers == 1)
913 : 2 : return pstate;
914 : :
915 : : /* Create status arrays, being sure to initialize all fields to 0 */
3266 tgl@sss.pgh.pa.us 916 : 13 : pstate->te = (TocEntry **)
917 : 13 : pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
918 : 13 : pstate->parallelSlot = (ParallelSlot *)
919 : 13 : pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
920 : :
921 : : #ifdef WIN32
922 : : /* Make fmtId() and fmtQualifiedId() use thread-local storage */
923 : : getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
924 : : #endif
925 : :
926 : : /*
927 : : * Set the pstate in shutdown_info, to tell the exit handler that it must
928 : : * clean up workers as well as the main database connection. But we don't
929 : : * set this in signal_info yet, because we don't want child processes to
930 : : * inherit non-NULL signal_info.pstate.
931 : : */
3383 932 : 13 : shutdown_info.pstate = pstate;
933 : :
934 : : /*
935 : : * Temporarily disable query cancellation on the leader connection. This
936 : : * ensures that child processes won't inherit valid AH->connCancel
937 : : * settings and thus won't try to issue cancels against the leader's
938 : : * connection. No harm is done if we fail while it's disabled, because
939 : : * the leader connection is idle at this point anyway.
940 : : */
941 : 13 : set_archive_cancel_info(AH, NULL);
942 : :
943 : : /* Ensure stdio state is quiesced before forking */
944 : 13 : fflush(NULL);
945 : :
946 : : /* Create desired number of workers */
4549 andrew@dunslane.net 947 [ + + ]: 41 : for (i = 0; i < pstate->numWorkers; i++)
948 : : {
949 : : #ifdef WIN32
950 : : WorkerInfo *wi;
951 : : uintptr_t handle;
952 : : #else
953 : : pid_t pid;
954 : : #endif
3383 tgl@sss.pgh.pa.us 955 : 28 : ParallelSlot *slot = &(pstate->parallelSlot[i]);
956 : : int pipeMW[2],
957 : : pipeWM[2];
958 : :
959 : : /* Create communication pipes for this worker */
4549 andrew@dunslane.net 960 [ + - - + ]: 28 : if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
1247 tgl@sss.pgh.pa.us 961 :UIC 0 : pg_fatal("could not create communication channels: %m");
962 : :
963 : : /* leader's ends of the pipes */
3383 tgl@sss.pgh.pa.us 964 :GIC 28 : slot->pipeRead = pipeWM[PIPE_READ];
965 : 28 : slot->pipeWrite = pipeMW[PIPE_WRITE];
966 : : /* child's ends of the pipes */
967 : 28 : slot->pipeRevRead = pipeMW[PIPE_READ];
968 : 28 : slot->pipeRevWrite = pipeWM[PIPE_WRITE];
969 : :
970 : : #ifdef WIN32
971 : : /* Create transient structure to pass args to worker function */
972 : : wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
973 : :
974 : : wi->AH = AH;
975 : : wi->slot = slot;
976 : :
977 : : handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
978 : : wi, 0, &(slot->threadId));
979 : : slot->hThread = handle;
980 : : slot->workerStatus = WRKR_IDLE;
981 : : #else /* !WIN32 */
4549 andrew@dunslane.net 982 : 28 : pid = fork();
983 [ + + ]: 56 : if (pid == 0)
984 : : {
985 : : /* we are the worker */
986 : : int j;
987 : :
988 : : /* this is needed for GetMyPSlot() */
3383 tgl@sss.pgh.pa.us 989 : 28 : slot->pid = getpid();
990 : :
991 : : /* instruct signal handler that we're in a worker now */
992 : 28 : signal_info.am_worker = true;
993 : :
994 : : /* close read end of Worker -> Leader */
4549 andrew@dunslane.net 995 : 28 : closesocket(pipeWM[PIPE_READ]);
996 : : /* close write end of Leader -> Worker */
997 : 28 : closesocket(pipeMW[PIPE_WRITE]);
998 : :
999 : : /*
1000 : : * Close all inherited fds for communication of the leader with
1001 : : * previously-forked workers.
1002 : : */
1003 [ + + ]: 45 : for (j = 0; j < i; j++)
1004 : : {
1005 : 17 : closesocket(pstate->parallelSlot[j].pipeRead);
1006 : 17 : closesocket(pstate->parallelSlot[j].pipeWrite);
1007 : : }
1008 : :
1009 : : /* Run the worker ... */
3383 tgl@sss.pgh.pa.us 1010 : 28 : RunWorker(AH, slot);
1011 : :
1012 : : /* We can just exit(0) when done */
4549 andrew@dunslane.net 1013 : 28 : exit(0);
1014 : : }
1015 [ - + ]: 28 : else if (pid < 0)
1016 : : {
1017 : : /* fork failed */
1247 tgl@sss.pgh.pa.us 1018 :UIC 0 : pg_fatal("could not create worker process: %m");
1019 : : }
1020 : :
1021 : : /* In Leader after successful fork */
3383 tgl@sss.pgh.pa.us 1022 :GIC 28 : slot->pid = pid;
2045 1023 : 28 : slot->workerStatus = WRKR_IDLE;
1024 : :
1025 : : /* close read end of Leader -> Worker */
4549 andrew@dunslane.net 1026 : 28 : closesocket(pipeMW[PIPE_READ]);
1027 : : /* close write end of Worker -> Leader */
1028 : 28 : closesocket(pipeWM[PIPE_WRITE]);
1029 : : #endif /* WIN32 */
1030 : : }
1031 : :
1032 : : /*
1033 : : * Having forked off the workers, disable SIGPIPE so that leader isn't
1034 : : * killed if it tries to send a command to a dead worker. We don't want
1035 : : * the workers to inherit this setting, though.
1036 : : */
1037 : : #ifndef WIN32
3383 tgl@sss.pgh.pa.us 1038 : 13 : pqsignal(SIGPIPE, SIG_IGN);
1039 : : #endif
1040 : :
1041 : : /*
1042 : : * Re-establish query cancellation on the leader connection.
1043 : : */
1044 : 13 : set_archive_cancel_info(AH, AH->connection);
1045 : :
1046 : : /*
1047 : : * Tell the cancel signal handler to forward signals to worker processes,
1048 : : * too. (As with query cancel, we did not need this earlier because the
1049 : : * workers have not yet been given anything to do; if we die before this
1050 : : * point, any already-started workers will see EOF and quit promptly.)
1051 : : */
1052 : 13 : set_cancel_pstate(pstate);
1053 : :
4549 andrew@dunslane.net 1054 : 13 : return pstate;
1055 : : }
1056 : :
1057 : : /*
1058 : : * Close down a parallel dump or restore.
1059 : : */
1060 : : void
1061 : 15 : ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1062 : : {
1063 : : int i;
1064 : :
1065 : : /* No work if non-parallel */
1066 [ + + ]: 15 : if (pstate->numWorkers == 1)
1067 : 2 : return;
1068 : :
1069 : : /* There should not be any unfinished jobs */
1070 [ - + ]: 13 : Assert(IsEveryWorkerIdle(pstate));
1071 : :
1072 : : /* Close the sockets so that the workers know they can exit */
1073 [ + + ]: 41 : for (i = 0; i < pstate->numWorkers; i++)
1074 : : {
1075 : 28 : closesocket(pstate->parallelSlot[i].pipeRead);
1076 : 28 : closesocket(pstate->parallelSlot[i].pipeWrite);
1077 : : }
1078 : :
1079 : : /* Wait for them to exit */
1080 : 13 : WaitForTerminatingWorkers(pstate);
1081 : :
1082 : : /*
1083 : : * Unlink pstate from shutdown_info, so the exit handler will not try to
1084 : : * use it; and likewise unlink from signal_info.
1085 : : */
1086 : 13 : shutdown_info.pstate = NULL;
3383 tgl@sss.pgh.pa.us 1087 : 13 : set_cancel_pstate(NULL);
1088 : :
1089 : : /* Release state (mere neatnik-ism, since we're about to terminate) */
3266 1090 : 13 : free(pstate->te);
4549 andrew@dunslane.net 1091 : 13 : free(pstate->parallelSlot);
1092 : 13 : free(pstate);
1093 : : }
1094 : :
1095 : : /*
1096 : : * These next four functions handle construction and parsing of the command
1097 : : * strings and response strings for parallel workers.
1098 : : *
1099 : : * Currently, these can be the same regardless of which archive format we are
1100 : : * processing. In future, we might want to let format modules override these
1101 : : * functions to add format-specific data to a command or response.
1102 : : */
1103 : :
1104 : : /*
1105 : : * buildWorkerCommand: format a command string to send to a worker.
1106 : : *
1107 : : * The string is built in the caller-supplied buffer of size buflen.
1108 : : */
1109 : : static void
3266 tgl@sss.pgh.pa.us 1110 : 228 : buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1111 : : char *buf, int buflen)
1112 : : {
1113 [ + + ]: 228 : if (act == ACT_DUMP)
1114 : 182 : snprintf(buf, buflen, "DUMP %d", te->dumpId);
1115 [ + - ]: 46 : else if (act == ACT_RESTORE)
1116 : 46 : snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1117 : : else
3266 tgl@sss.pgh.pa.us 1118 :UIC 0 : Assert(false);
3266 tgl@sss.pgh.pa.us 1119 :GIC 228 : }
1120 : :
1121 : : /*
1122 : : * parseWorkerCommand: interpret a command string in a worker.
1123 : : */
1124 : : static void
1125 : 228 : parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1126 : : const char *msg)
1127 : : {
1128 : : DumpId dumpId;
1129 : : int nBytes;
1130 : :
1131 [ + + ]: 228 : if (messageStartsWith(msg, "DUMP "))
1132 : : {
1133 : 182 : *act = ACT_DUMP;
1134 : 182 : sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1135 [ - + ]: 182 : Assert(nBytes == strlen(msg));
1136 : 182 : *te = getTocEntryByDumpId(AH, dumpId);
1137 [ - + ]: 182 : Assert(*te != NULL);
1138 : : }
1139 [ + - ]: 46 : else if (messageStartsWith(msg, "RESTORE "))
1140 : : {
1141 : 46 : *act = ACT_RESTORE;
1142 : 46 : sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1143 [ - + ]: 46 : Assert(nBytes == strlen(msg));
1144 : 46 : *te = getTocEntryByDumpId(AH, dumpId);
1145 [ - + ]: 46 : Assert(*te != NULL);
1146 : : }
1147 : : else
1247 tgl@sss.pgh.pa.us 1148 :UIC 0 : pg_fatal("unrecognized command received from leader: \"%s\"",
1149 : : msg);
3266 tgl@sss.pgh.pa.us 1150 :GIC 228 : }
1151 : :
1152 : : /*
1153 : : * buildWorkerResponse: format a response string to send to the leader.
1154 : : *
1155 : : * The string is built in the caller-supplied buffer of size buflen.
1156 : : */
1157 : : static void
1158 : 228 : buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1159 : : char *buf, int buflen)
1160 : : {
1161 [ - + ]: 228 : snprintf(buf, buflen, "OK %d %d %d",
1162 : : te->dumpId,
1163 : : status,
1164 : : status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1165 : 228 : }
1166 : :
1167 : : /*
1168 : : * parseWorkerResponse: parse the status message returned by a worker.
1169 : : *
1170 : : * Returns the integer status code, and may update fields of AH and/or te.
1171 : : */
1172 : : static int
1173 : 228 : parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1174 : : const char *msg)
1175 : : {
1176 : : DumpId dumpId;
1177 : : int nBytes,
1178 : : n_errors;
1179 : 228 : int status = 0;
1180 : :
1181 [ + - ]: 228 : if (messageStartsWith(msg, "OK "))
1182 : : {
1183 : 228 : sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1184 : :
1185 [ - + ]: 228 : Assert(dumpId == te->dumpId);
1186 [ - + ]: 228 : Assert(nBytes == strlen(msg));
1187 : :
1188 : 228 : AH->public.n_errors += n_errors;
1189 : : }
1190 : : else
1247 tgl@sss.pgh.pa.us 1191 :UIC 0 : pg_fatal("invalid message received from worker: \"%s\"",
1192 : : msg);
1193 : :
3266 tgl@sss.pgh.pa.us 1194 :GIC 228 : return status;
1195 : : }
1196 : :
1197 : : /*
1198 : : * Dispatch a job to some free worker.
1199 : : *
1200 : : * te is the TocEntry to be processed, act is the action to be taken on it.
1201 : : * callback is the function to call on completion of the job.
1202 : : *
1203 : : * If no worker is currently available, this will block, and previously
1204 : : * registered callback functions may be called.
1205 : : */
1206 : : void
1207 : 228 : DispatchJobForTocEntry(ArchiveHandle *AH,
1208 : : ParallelState *pstate,
1209 : : TocEntry *te,
1210 : : T_Action act,
1211 : : ParallelCompletionPtr callback,
1212 : : void *callback_data)
1213 : : {
1214 : : int worker;
1215 : : char buf[256];
1216 : :
1217 : : /* Get a worker, waiting if none are idle */
1218 [ + + ]: 393 : while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1219 : 165 : WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1220 : :
1221 : : /* Construct and send command string */
1222 : 228 : buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1223 : :
1224 : 228 : sendMessageToWorker(pstate, worker, buf);
1225 : :
1226 : : /* Remember worker is busy, and which TocEntry it's working on */
4549 andrew@dunslane.net 1227 : 228 : pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
3266 tgl@sss.pgh.pa.us 1228 : 228 : pstate->parallelSlot[worker].callback = callback;
1229 : 228 : pstate->parallelSlot[worker].callback_data = callback_data;
1230 : 228 : pstate->te[worker] = te;
4549 andrew@dunslane.net 1231 : 228 : }
1232 : :
1233 : : /*
1234 : : * Find an idle worker and return its slot number.
1235 : : * Return NO_SLOT if none are idle.
1236 : : */
1237 : : static int
1238 : 596 : GetIdleWorker(ParallelState *pstate)
1239 : : {
1240 : : int i;
1241 : :
1242 [ + + ]: 1478 : for (i = 0; i < pstate->numWorkers; i++)
1243 : : {
1244 [ + + ]: 1121 : if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1245 : 239 : return i;
1246 : : }
1247 : 357 : return NO_SLOT;
1248 : : }
1249 : :
1250 : : /*
1251 : : * Return true iff no worker is running.
1252 : : */
1253 : : static bool
1254 : 41 : HasEveryWorkerTerminated(ParallelState *pstate)
1255 : : {
1256 : : int i;
1257 : :
1258 [ + + ]: 77 : for (i = 0; i < pstate->numWorkers; i++)
1259 : : {
2045 tgl@sss.pgh.pa.us 1260 [ + + - + ]: 64 : if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
4549 andrew@dunslane.net 1261 : 28 : return false;
1262 : : }
1263 : 13 : return true;
1264 : : }
1265 : :
1266 : : /*
1267 : : * Return true iff every worker is in the WRKR_IDLE state.
1268 : : */
1269 : : bool
1270 : 71 : IsEveryWorkerIdle(ParallelState *pstate)
1271 : : {
1272 : : int i;
1273 : :
1274 [ + + ]: 154 : for (i = 0; i < pstate->numWorkers; i++)
1275 : : {
1276 [ + + ]: 120 : if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1277 : 37 : return false;
1278 : : }
1279 : 34 : return true;
1280 : : }
1281 : :
1282 : : /*
1283 : : * Acquire lock on a table to be dumped by a worker process.
1284 : : *
1285 : : * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1286 : : * it's no problem for a worker to get one too, but if anything else besides
1287 : : * pg_dump is running, there's a possible deadlock:
1288 : : *
1289 : : * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1290 : : * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1291 : : * because the leader holds a conflicting ACCESS SHARE lock).
1292 : : * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1293 : : * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1294 : : * 4) Now we have a deadlock, since the leader is effectively waiting for
1295 : : * the worker. The server cannot detect that, however.
1296 : : *
1297 : : * To prevent an infinite wait, prior to touching a table in a worker, request
1298 : : * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1299 : : * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1300 : : * so we have a deadlock. We must fail the backup in that case.
1301 : : */
1302 : : static void
3388 tgl@sss.pgh.pa.us 1303 : 182 : lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1304 : : {
1305 : : const char *qualId;
1306 : : PQExpBuffer query;
1307 : : PGresult *res;
1308 : :
1309 : : /* Nothing to do for BLOBS */
1310 [ + + ]: 182 : if (strcmp(te->desc, "BLOBS") == 0)
1311 : 8 : return;
1312 : :
1313 : 174 : query = createPQExpBuffer();
1314 : :
2577 1315 : 174 : qualId = fmtQualifiedId(te->namespace, te->tag);
1316 : :
4549 andrew@dunslane.net 1317 : 174 : appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1318 : : qualId);
1319 : :
1320 : 174 : res = PQexec(AH->connection, query->data);
1321 : :
1322 [ + - - + ]: 174 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1247 tgl@sss.pgh.pa.us 1323 :UIC 0 : pg_fatal("could not obtain lock on relation \"%s\"\n"
1324 : : "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1325 : : "on the table after the pg_dump parent process had gotten the "
1326 : : "initial ACCESS SHARE lock on the table.", qualId);
1327 : :
4549 andrew@dunslane.net 1328 :GIC 174 : PQclear(res);
1329 : 174 : destroyPQExpBuffer(query);
1330 : : }
1331 : :
1332 : : /*
1333 : : * WaitForCommands: main routine for a worker process.
1334 : : *
1335 : : * Read and execute commands from the leader until we see EOF on the pipe.
1336 : : */
1337 : : static void
3524 tgl@sss.pgh.pa.us 1338 : 28 : WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1339 : : {
1340 : : char *command;
1341 : : TocEntry *te;
1342 : : T_Action act;
3266 1343 : 28 : int status = 0;
1344 : : char buf[256];
1345 : :
1346 : : for (;;)
1347 : : {
1910 andres@anarazel.de 1348 [ + + ]: 256 : if (!(command = getMessageFromLeader(pipefd)))
1349 : : {
1350 : : /* EOF, so done */
4549 andrew@dunslane.net 1351 : 28 : return;
1352 : : }
1353 : :
1354 : : /* Decode the command */
3266 tgl@sss.pgh.pa.us 1355 : 228 : parseWorkerCommand(AH, &te, &act, command);
1356 : :
1357 [ + + ]: 228 : if (act == ACT_DUMP)
1358 : : {
1359 : : /* Acquire lock on this table within the worker's session */
3388 1360 : 182 : lockTableForWorker(AH, te);
1361 : :
1362 : : /* Perform the dump command */
3266 1363 : 182 : status = (AH->WorkerJobDumpPtr) (AH, te);
1364 : : }
1365 [ + - ]: 46 : else if (act == ACT_RESTORE)
1366 : : {
1367 : : /* Perform the restore command */
1368 : 46 : status = (AH->WorkerJobRestorePtr) (AH, te);
1369 : : }
1370 : : else
3266 tgl@sss.pgh.pa.us 1371 :UIC 0 : Assert(false);
1372 : :
1373 : : /* Return status to leader */
3266 tgl@sss.pgh.pa.us 1374 :GIC 228 : buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1375 : :
1910 andres@anarazel.de 1376 : 228 : sendMessageToLeader(pipefd, buf);
1377 : :
1378 : : /* command was pg_malloc'd and we are responsible for free()ing it. */
4437 sfrost@snowman.net 1379 : 228 : free(command);
1380 : : }
1381 : : }
1382 : :
1383 : : /*
1384 : : * Check for status messages from workers.
1385 : : *
1386 : : * If do_wait is true, wait to get a status message; otherwise, just return
1387 : : * immediately if there is none available.
1388 : : *
1389 : : * When we get a status message, we pass the status code to the callback
1390 : : * function that was specified to DispatchJobForTocEntry, then reset the
1391 : : * worker status to IDLE.
1392 : : *
1393 : : * Returns true if we collected a status message, else false.
1394 : : *
1395 : : * XXX is it worth checking for more than one status message per call?
1396 : : * It seems somewhat unlikely that multiple workers would finish at exactly
1397 : : * the same time.
1398 : : */
1399 : : static bool
4549 andrew@dunslane.net 1400 : 438 : ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1401 : : {
1402 : : int worker;
1403 : : char *msg;
1404 : :
1405 : : /* Try to collect a status message */
1406 : 438 : msg = getMessageFromWorker(pstate, do_wait, &worker);
1407 : :
1408 [ + + ]: 438 : if (!msg)
1409 : : {
1410 : : /* If do_wait is true, we must have detected EOF on some socket */
1411 [ - + ]: 210 : if (do_wait)
1247 tgl@sss.pgh.pa.us 1412 :UIC 0 : pg_fatal("a worker process died unexpectedly");
3266 tgl@sss.pgh.pa.us 1413 :GIC 210 : return false;
1414 : : }
1415 : :
1416 : : /* Process it and update our idea of the worker's status */
4549 andrew@dunslane.net 1417 [ + - ]: 228 : if (messageStartsWith(msg, "OK "))
1418 : : {
3266 tgl@sss.pgh.pa.us 1419 : 228 : ParallelSlot *slot = &pstate->parallelSlot[worker];
1420 : 228 : TocEntry *te = pstate->te[worker];
1421 : : int status;
1422 : :
1423 : 228 : status = parseWorkerResponse(AH, te, msg);
1424 : 228 : slot->callback(AH, te, status, slot->callback_data);
1425 : 228 : slot->workerStatus = WRKR_IDLE;
1426 : 228 : pstate->te[worker] = NULL;
1427 : : }
1428 : : else
1247 tgl@sss.pgh.pa.us 1429 :UIC 0 : pg_fatal("invalid message received from worker: \"%s\"",
1430 : : msg);
1431 : :
1432 : : /* Free the string returned from getMessageFromWorker */
4549 andrew@dunslane.net 1433 :GIC 228 : free(msg);
1434 : :
3266 tgl@sss.pgh.pa.us 1435 : 228 : return true;
1436 : : }
1437 : :
1438 : : /*
1439 : : * Check for status results from workers, waiting if necessary.
1440 : : *
1441 : : * Available wait modes are:
1442 : : * WFW_NO_WAIT: reap any available status, but don't block
1443 : : * WFW_GOT_STATUS: wait for at least one more worker to finish
1444 : : * WFW_ONE_IDLE: wait for at least one worker to be idle
1445 : : * WFW_ALL_IDLE: wait for all workers to be idle
1446 : : *
1447 : : * Any received results are passed to the callback specified to
1448 : : * DispatchJobForTocEntry.
1449 : : *
1450 : : * This function is executed in the leader process.
1451 : : */
1452 : : void
1453 : 231 : WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1454 : : {
1455 : 231 : bool do_wait = false;
1456 : :
1457 : : /*
1458 : : * In GOT_STATUS mode, always block waiting for a message, since we can't
1459 : : * return till we get something. In other modes, we don't block the first
1460 : : * time through the loop.
1461 : : */
1462 [ + + ]: 231 : if (mode == WFW_GOT_STATUS)
1463 : : {
1464 : : /* Assert that caller knows what it's doing */
1465 [ - + ]: 11 : Assert(!IsEveryWorkerIdle(pstate));
1466 : 11 : do_wait = true;
1467 : : }
1468 : :
1469 : : for (;;)
1470 : : {
1471 : : /*
1472 : : * Check for status messages, even if we don't need to block. We do
1473 : : * not try very hard to reap all available messages, though, since
1474 : : * there's unlikely to be more than one.
1475 : : */
1476 [ + + ]: 438 : if (ListenToWorkers(AH, pstate, do_wait))
1477 : : {
1478 : : /*
1479 : : * If we got a message, we are done by definition for GOT_STATUS
1480 : : * mode, and we can also be certain that there's at least one idle
1481 : : * worker. So we're done in all but ALL_IDLE mode.
1482 : : */
1483 [ + + ]: 228 : if (mode != WFW_ALL_IDLE)
1484 : 211 : return;
1485 : : }
1486 : :
1487 : : /* Check whether we must wait for new status messages */
1488 [ - - + + : 227 : switch (mode)
- ]
1489 : : {
3266 tgl@sss.pgh.pa.us 1490 :UIC 0 : case WFW_NO_WAIT:
1491 : 0 : return; /* never wait */
1492 : 0 : case WFW_GOT_STATUS:
1493 : 0 : Assert(false); /* can't get here, because we waited */
1494 : : break;
3266 tgl@sss.pgh.pa.us 1495 :GIC 203 : case WFW_ONE_IDLE:
1496 [ + + ]: 203 : if (GetIdleWorker(pstate) != NO_SLOT)
1497 : 11 : return;
1498 : 192 : break;
1499 : 24 : case WFW_ALL_IDLE:
1500 [ + + ]: 24 : if (IsEveryWorkerIdle(pstate))
1501 : 9 : return;
1502 : 15 : break;
1503 : : }
1504 : :
1505 : : /* Loop back, and this time wait for something to happen */
1506 : 207 : do_wait = true;
1507 : : }
1508 : : }
1509 : :
1510 : : /*
1511 : : * Read one command message from the leader, blocking if necessary
1512 : : * until one is available, and return it as a malloc'd string.
1513 : : * On EOF, return NULL.
1514 : : *
1515 : : * This function is executed in worker processes.
1516 : : */
1517 : : static char *
1910 andres@anarazel.de 1518 : 256 : getMessageFromLeader(int pipefd[2])
1519 : : {
4549 andrew@dunslane.net 1520 : 256 : return readMessageFromPipe(pipefd[PIPE_READ]);
1521 : : }
1522 : :
1523 : : /*
1524 : : * Send a status message to the leader.
1525 : : *
1526 : : * This function is executed in worker processes.
1527 : : */
1528 : : static void
1910 andres@anarazel.de 1529 : 228 : sendMessageToLeader(int pipefd[2], const char *str)
1530 : : {
4549 andrew@dunslane.net 1531 : 228 : int len = strlen(str) + 1;
1532 : :
1533 [ - + ]: 228 : if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1247 tgl@sss.pgh.pa.us 1534 :UIC 0 : pg_fatal("could not write to the communication channel: %m");
4549 andrew@dunslane.net 1535 :GIC 228 : }
1536 : :
1537 : : /*
1538 : : * Wait until some descriptor in "workerset" becomes readable.
1539 : : * Returns -1 on error, else the number of readable descriptors.
1540 : : */
1541 : : static int
1542 : 218 : select_loop(int maxFd, fd_set *workerset)
1543 : : {
1544 : : int i;
1545 : 218 : fd_set saveSet = *workerset;
1546 : :
1547 : : for (;;)
1548 : : {
1549 : 218 : *workerset = saveSet;
1550 : 218 : i = select(maxFd + 1, workerset, NULL, NULL, NULL);
1551 : :
1552 : : #ifndef WIN32
1553 [ - + - - ]: 218 : if (i < 0 && errno == EINTR)
4549 andrew@dunslane.net 1554 :UIC 0 : continue;
1555 : : #else
1556 : : if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
1557 : : continue;
1558 : : #endif
4549 andrew@dunslane.net 1559 :GIC 218 : break;
1560 : : }
1561 : :
1562 : 218 : return i;
1563 : : }
1564 : :
1565 : :
1566 : : /*
1567 : : * Check for messages from worker processes.
1568 : : *
1569 : : * If a message is available, return it as a malloc'd string, and put the
1570 : : * index of the sending worker in *worker.
1571 : : *
1572 : : * If nothing is available, wait if "do_wait" is true, else return NULL.
1573 : : *
1574 : : * If we detect EOF on any socket, we'll return NULL. It's not great that
1575 : : * that's hard to distinguish from the no-data-available case, but for now
1576 : : * our one caller is okay with that.
1577 : : *
1578 : : * This function is executed in the leader process.
1579 : : */
1580 : : static char *
1581 : 438 : getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1582 : : {
1583 : : int i;
1584 : : fd_set workerset;
1585 : 438 : int maxFd = -1;
1586 : 438 : struct timeval nowait = {0, 0};
1587 : :
1588 : : /* construct bitmap of socket descriptors for select() */
1589 [ + + ]: 7446 : FD_ZERO(&workerset);
1590 [ + + ]: 1400 : for (i = 0; i < pstate->numWorkers; i++)
1591 : : {
2045 tgl@sss.pgh.pa.us 1592 [ + + - + ]: 962 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
4549 andrew@dunslane.net 1593 :UIC 0 : continue;
4549 andrew@dunslane.net 1594 :GIC 962 : FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1595 [ + - ]: 962 : if (pstate->parallelSlot[i].pipeRead > maxFd)
1596 : 962 : maxFd = pstate->parallelSlot[i].pipeRead;
1597 : : }
1598 : :
1599 [ + + ]: 438 : if (do_wait)
1600 : : {
1601 : 218 : i = select_loop(maxFd, &workerset);
1602 [ - + ]: 218 : Assert(i != 0);
1603 : : }
1604 : : else
1605 : : {
1606 [ + + ]: 220 : if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1607 : 210 : return NULL;
1608 : : }
1609 : :
1610 [ - + ]: 228 : if (i < 0)
1247 tgl@sss.pgh.pa.us 1611 :UIC 0 : pg_fatal("%s() failed: %m", "select");
1612 : :
4549 andrew@dunslane.net 1613 [ + - ]:GIC 349 : for (i = 0; i < pstate->numWorkers; i++)
1614 : : {
1615 : : char *msg;
1616 : :
2045 tgl@sss.pgh.pa.us 1617 [ + + - + ]: 349 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
2045 tgl@sss.pgh.pa.us 1618 :UIC 0 : continue;
4549 andrew@dunslane.net 1619 [ + + ]:GIC 349 : if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1620 : 121 : continue;
1621 : :
1622 : : /*
1623 : : * Read the message if any. If the socket is ready because of EOF,
1624 : : * we'll return NULL instead (and the socket will stay ready, so the
1625 : : * condition will persist).
1626 : : *
1627 : : * Note: because this is a blocking read, we'll wait if only part of
1628 : : * the message is available. Waiting a long time would be bad, but
1629 : : * since worker status messages are short and are always sent in one
1630 : : * operation, it shouldn't be a problem in practice.
1631 : : */
1632 : 228 : msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1633 : 228 : *worker = i;
1634 : 228 : return msg;
1635 : : }
4549 andrew@dunslane.net 1636 :UIC 0 : Assert(false);
1637 : : return NULL;
1638 : : }
1639 : :
1640 : : /*
1641 : : * Send a command message to the specified worker process.
1642 : : *
1643 : : * This function is executed in the leader process.
1644 : : */
1645 : : static void
4549 andrew@dunslane.net 1646 :GIC 228 : sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1647 : : {
1648 : 228 : int len = strlen(str) + 1;
1649 : :
1650 [ - + ]: 228 : if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1651 : : {
1247 tgl@sss.pgh.pa.us 1652 :UIC 0 : pg_fatal("could not write to the communication channel: %m");
1653 : : }
4549 andrew@dunslane.net 1654 :GIC 228 : }
1655 : :
1656 : : /*
1657 : : * Read one message from the specified pipe (fd), blocking if necessary
1658 : : * until one is available, and return it as a malloc'd string.
1659 : : * On EOF, return NULL.
1660 : : *
1661 : : * A "message" on the channel is just a null-terminated string.
1662 : : */
1663 : : static char *
1664 : 484 : readMessageFromPipe(int fd)
1665 : : {
1666 : : char *msg;
1667 : : int msgsize,
1668 : : bufsize;
1669 : : int ret;
1670 : :
1671 : : /*
1672 : : * In theory, if we let piperead() read multiple bytes, it might give us
1673 : : * back fragments of multiple messages. (That can't actually occur, since
1674 : : * neither leader nor workers send more than one message without waiting
1675 : : * for a reply, but we don't wish to assume that here.) For simplicity,
1676 : : * read a byte at a time until we get the terminating '\0'. This method
1677 : : * is a bit inefficient, but since this is only used for relatively short
1678 : : * command and status strings, it shouldn't matter.
1679 : : */
1680 : 484 : bufsize = 64; /* could be any number */
1681 : 484 : msg = (char *) pg_malloc(bufsize);
1682 : 484 : msgsize = 0;
1683 : : for (;;)
1684 : : {
3388 tgl@sss.pgh.pa.us 1685 [ - + ]: 5182 : Assert(msgsize < bufsize);
4549 andrew@dunslane.net 1686 : 5182 : ret = piperead(fd, msg + msgsize, 1);
1687 [ + + ]: 5182 : if (ret <= 0)
3388 tgl@sss.pgh.pa.us 1688 : 28 : break; /* error or connection closure */
1689 : :
4549 andrew@dunslane.net 1690 [ - + ]: 5154 : Assert(ret == 1);
1691 : :
1692 [ + + ]: 5154 : if (msg[msgsize] == '\0')
3388 tgl@sss.pgh.pa.us 1693 : 456 : return msg; /* collected whole message */
1694 : :
4549 andrew@dunslane.net 1695 : 4698 : msgsize++;
3388 tgl@sss.pgh.pa.us 1696 [ - + ]: 4698 : if (msgsize == bufsize) /* enlarge buffer if needed */
1697 : : {
3388 tgl@sss.pgh.pa.us 1698 :UIC 0 : bufsize += 16; /* could be any number */
3945 1699 : 0 : msg = (char *) pg_realloc(msg, bufsize);
1700 : : }
1701 : : }
1702 : :
1703 : : /* Other end has closed the connection */
3945 tgl@sss.pgh.pa.us 1704 :GIC 28 : pg_free(msg);
4243 sfrost@snowman.net 1705 : 28 : return NULL;
1706 : : }
1707 : :
1708 : : #ifdef WIN32
1709 : :
1710 : : /*
1711 : : * This is a replacement version of pipe(2) for Windows which allows the pipe
1712 : : * handles to be used in select().
1713 : : *
1714 : : * Reads and writes on the pipe must go through piperead()/pipewrite().
1715 : : *
1716 : : * For consistency with Unix we declare the returned handles as "int".
1717 : : * This is okay even on WIN64 because system handles are not more than
1718 : : * 32 bits wide, but we do have to do some casting.
1719 : : */
1720 : : static int
1721 : : pgpipe(int handles[2])
1722 : : {
1723 : : pgsocket s,
1724 : : tmp_sock;
1725 : : struct sockaddr_in serv_addr;
1726 : : int len = sizeof(serv_addr);
1727 : :
1728 : : /* We have to use the Unix socket invalid file descriptor value here. */
1729 : : handles[0] = handles[1] = -1;
1730 : :
1731 : : /*
1732 : : * setup listen socket
1733 : : */
1734 : : if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1735 : : {
1736 : : pg_log_error("pgpipe: could not create socket: error code %d",
1737 : : WSAGetLastError());
1738 : : return -1;
1739 : : }
1740 : :
1741 : : memset(&serv_addr, 0, sizeof(serv_addr));
1742 : : serv_addr.sin_family = AF_INET;
1743 : : serv_addr.sin_port = pg_hton16(0);
1744 : : serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
1745 : : if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1746 : : {
1747 : : pg_log_error("pgpipe: could not bind: error code %d",
1748 : : WSAGetLastError());
1749 : : closesocket(s);
1750 : : return -1;
1751 : : }
1752 : : if (listen(s, 1) == SOCKET_ERROR)
1753 : : {
1754 : : pg_log_error("pgpipe: could not listen: error code %d",
1755 : : WSAGetLastError());
1756 : : closesocket(s);
1757 : : return -1;
1758 : : }
1759 : : if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1760 : : {
1761 : : pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
1762 : : WSAGetLastError());
1763 : : closesocket(s);
1764 : : return -1;
1765 : : }
1766 : :
1767 : : /*
1768 : : * setup pipe handles
1769 : : */
1770 : : if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1771 : : {
1772 : : pg_log_error("pgpipe: could not create second socket: error code %d",
1773 : : WSAGetLastError());
1774 : : closesocket(s);
1775 : : return -1;
1776 : : }
1777 : : handles[1] = (int) tmp_sock;
1778 : :
1779 : : if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1780 : : {
1781 : : pg_log_error("pgpipe: could not connect socket: error code %d",
1782 : : WSAGetLastError());
1783 : : closesocket(handles[1]);
1784 : : handles[1] = -1;
1785 : : closesocket(s);
1786 : : return -1;
1787 : : }
1788 : : if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1789 : : {
1790 : : pg_log_error("pgpipe: could not accept connection: error code %d",
1791 : : WSAGetLastError());
1792 : : closesocket(handles[1]);
1793 : : handles[1] = -1;
1794 : : closesocket(s);
1795 : : return -1;
1796 : : }
1797 : : handles[0] = (int) tmp_sock;
1798 : :
1799 : : closesocket(s);
1800 : : return 0;
1801 : : }
1802 : :
1803 : : #endif /* WIN32 */
|