Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * libpq-be-fe-helpers.h
4 : : * Helper functions for using libpq in extensions
5 : : *
6 : : * Code built directly into the backend is not allowed to link to libpq
7 : : * directly. Extension code is allowed to use libpq however. However, libpq
8 : : * used in extensions has to be careful not to block inside libpq, otherwise
9 : : * interrupts will not be processed, leading to issues like unresolvable
10 : : * deadlocks. Backend code also needs to take care to acquire/release an
11 : : * external fd for the connection, otherwise fd.c's accounting of fd's is
12 : : * broken.
13 : : *
14 : : * This file provides helper functions to make it easier to comply with these
15 : : * rules. It is a header only library as it needs to be linked into each
16 : : * extension using libpq, and it seems too small to be worth adding a
17 : : * dedicated static library for.
18 : : *
19 : : * TODO: For historical reasons the connections established here are not put
20 : : * into non-blocking mode. That can lead to blocking even when only the async
21 : : * libpq functions are used. This should be fixed.
22 : : *
23 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
24 : : * Portions Copyright (c) 1994, Regents of the University of California
25 : : *
26 : : * src/include/libpq/libpq-be-fe-helpers.h
27 : : *
28 : : *-------------------------------------------------------------------------
29 : : */
30 : : #ifndef LIBPQ_BE_FE_HELPERS_H
31 : : #define LIBPQ_BE_FE_HELPERS_H
32 : :
33 : : #include "libpq/libpq-be-fe.h"
34 : : #include "miscadmin.h"
35 : : #include "storage/fd.h"
36 : : #include "storage/latch.h"
37 : : #include "utils/timestamp.h"
38 : : #include "utils/wait_event.h"
39 : :
40 : :
41 : : static inline void libpqsrv_connect_prepare(void);
42 : : static inline void libpqsrv_connect_complete(PGconn *conn, uint32 wait_event_info);
43 : : static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
44 : : static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
45 : :
46 : : /*
47 : : * Start a connection using PQconnectStart().
48 : : *
49 : : * The returned connection has not yet completed its startup sequence. Callers
50 : : * may perform per-connection setup, such as installing a notice receiver,
51 : : * before calling libpqsrv_connect_complete().
52 : : *
53 : : * Callers must call libpqsrv_connect_complete(), even if this function returns
54 : : * NULL, because libpqsrv_connect_prepare() may already have reserved an
55 : : * external FD that must be released.
56 : : */
57 : : static inline PGconn *
7 fujii@postgresql.org 58 :GNC 25 : libpqsrv_connect_start(const char *conninfo)
59 : : {
60 : 25 : libpqsrv_connect_prepare();
61 : :
62 : 25 : return PQconnectStart(conninfo);
63 : : }
64 : :
65 : : /*
66 : : * PQconnectdb() wrapper that reserves a file descriptor and processes
67 : : * interrupts during connection establishment.
68 : : *
69 : : * Throws an error if AcquireExternalFD() fails, but does not throw if
70 : : * connection establishment itself fails. Callers need to use PQstatus() to
71 : : * check if connection establishment succeeded.
72 : : */
73 : : static inline PGconn *
1223 andres@anarazel.de 74 :ECB (25) : libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
75 : : {
76 : : PGconn *conn;
77 : :
78 : : conn = libpqsrv_connect_start(conninfo);
79 : :
80 : : libpqsrv_connect_complete(conn, wait_event_info);
81 : :
82 : (25) : return conn;
83 : : }
84 : :
85 : : /*
86 : : * Start a connection using PQconnectStartParams().
87 : : *
88 : : * See libpqsrv_connect_start() for the resource-lifetime rules.
89 : : */
90 : : static inline PGconn *
7 fujii@postgresql.org 91 :GNC 1092 : libpqsrv_connect_params_start(const char *const *keywords,
92 : : const char *const *values,
93 : : int expand_dbname)
94 : : {
95 : 1092 : libpqsrv_connect_prepare();
96 : :
97 : 1092 : return PQconnectStartParams(keywords, values, expand_dbname);
98 : : }
99 : :
100 : : /*
101 : : * Like libpqsrv_connect(), except that this is a wrapper for
102 : : * PQconnectdbParams().
103 : : */
104 : : static inline PGconn *
1223 andres@anarazel.de 105 :ECB (952) : libpqsrv_connect_params(const char *const *keywords,
106 : : const char *const *values,
107 : : int expand_dbname,
108 : : uint32 wait_event_info)
109 : : {
110 : : PGconn *conn;
111 : :
112 : : conn = libpqsrv_connect_params_start(keywords, values, expand_dbname);
113 : :
114 : : libpqsrv_connect_complete(conn, wait_event_info);
115 : :
116 : (952) : return conn;
117 : : }
118 : :
119 : : /*
120 : : * PQfinish() wrapper that additionally releases the reserved file descriptor.
121 : : *
122 : : * It is allowed to call this with NULL only when the external FD reservation
123 : : * has already been released, for example after calling
124 : : * libpqsrv_connect_complete() with a NULL connection.
125 : : */
126 : : static inline void
1223 andres@anarazel.de 127 :CBC 1110 : libpqsrv_disconnect(PGconn *conn)
128 : : {
129 : : /*
130 : : * If no connection was established, we haven't reserved an FD for it (or
131 : : * already released it). This rule makes it easier to write PG_CATCH()
132 : : * handlers for this facility's users.
133 : : *
134 : : * See also libpqsrv_connect_complete().
135 : : */
136 [ + + ]: 1110 : if (conn == NULL)
137 : 5 : return;
138 : :
139 : 1105 : ReleaseExternalFD();
140 : 1105 : PQfinish(conn);
141 : : }
142 : :
143 : :
144 : : /* lower-level connection helper functions follow */
145 : :
146 : :
147 : : /*
148 : : * Helper function for all connection establishment functions.
149 : : */
150 : : static inline void
151 : 1117 : libpqsrv_connect_prepare(void)
152 : : {
153 : : /*
154 : : * We must obey fd.c's limit on non-virtual file descriptors. Assume that
155 : : * a PGconn represents one long-lived FD. (Doing this here also ensures
156 : : * that VFDs are closed if needed to make room.)
157 : : */
158 [ - + ]: 1117 : if (!AcquireExternalFD())
159 : : {
160 : : #ifndef WIN32 /* can't write #if within ereport() macro */
1223 andres@anarazel.de 161 [ # # ]:UBC 0 : ereport(ERROR,
162 : : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
163 : : errmsg("could not establish connection"),
164 : : errdetail("There are too many open files on the local server."),
165 : : errhint("Raise the server's \"max_files_per_process\" and/or \"ulimit -n\" limits.")));
166 : : #else
167 : : ereport(ERROR,
168 : : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
169 : : errmsg("could not establish connection"),
170 : : errdetail("There are too many open files on the local server."),
171 : : errhint("Raise the server's \"max_files_per_process\" setting.")));
172 : : #endif
173 : : }
1223 andres@anarazel.de 174 :CBC 1117 : }
175 : :
176 : : /*
177 : : * Complete a connection started by libpqsrv_connect_start() or
178 : : * libpqsrv_connect_params_start().
179 : : */
180 : : static inline void
7 fujii@postgresql.org 181 :GNC 1117 : libpqsrv_connect_complete(PGconn *conn, uint32 wait_event_info)
182 : : {
183 : : /*
184 : : * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
185 : : * that here.
186 : : */
1223 andres@anarazel.de 187 [ - + ]:CBC 1117 : if (conn == NULL)
188 : : {
1223 andres@anarazel.de 189 :UBC 0 : ReleaseExternalFD();
190 : 0 : return;
191 : : }
192 : :
193 : : /*
194 : : * Can't wait without a socket. Note that we don't want to close the libpq
195 : : * connection yet, so callers can emit a useful error.
196 : : */
1223 andres@anarazel.de 197 [ + + ]:CBC 1117 : if (PQstatus(conn) == CONNECTION_BAD)
198 : 63 : return;
199 : :
200 : : /*
201 : : * WaitLatchOrSocket() can conceivably fail, handle that case here instead
202 : : * of requiring all callers to do so.
203 : : */
204 [ + - ]: 1054 : PG_TRY();
205 : : {
206 : : PostgresPollingStatusType status;
207 : :
208 : : /*
209 : : * Poll connection until we have OK or FAILED status.
210 : : *
211 : : * Per spec for PQconnectPoll, first wait till socket is write-ready.
212 : : */
213 : 1054 : status = PGRES_POLLING_WRITING;
214 [ + + + + ]: 4988 : while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
215 : : {
216 : : int io_flag;
217 : : int rc;
218 : :
219 [ + + ]: 2880 : if (status == PGRES_POLLING_READING)
220 : 1092 : io_flag = WL_SOCKET_READABLE;
221 : : #ifdef WIN32
222 : :
223 : : /*
224 : : * Windows needs a different test while waiting for
225 : : * connection-made
226 : : */
227 : : else if (PQstatus(conn) == CONNECTION_STARTED)
228 : : io_flag = WL_SOCKET_CONNECTED;
229 : : #endif
230 : : else
231 : 1788 : io_flag = WL_SOCKET_WRITEABLE;
232 : :
233 : 2880 : rc = WaitLatchOrSocket(MyLatch,
234 : : WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
235 : : PQsocket(conn),
236 : : 0,
237 : : wait_event_info);
238 : :
239 : : /* Interrupted? */
240 [ + + ]: 2880 : if (rc & WL_LATCH_SET)
241 : : {
242 : 719 : ResetLatch(MyLatch);
243 [ - + ]: 719 : CHECK_FOR_INTERRUPTS();
244 : : }
245 : :
246 : : /* If socket is ready, advance the libpq state machine */
247 [ + + ]: 2880 : if (rc & io_flag)
248 : 2161 : status = PQconnectPoll(conn);
249 : : }
250 : : }
1223 andres@anarazel.de 251 :UBC 0 : PG_CATCH();
252 : : {
253 : : /*
254 : : * If an error is thrown here, the callers won't call
255 : : * libpqsrv_disconnect() with a conn, so release resources
256 : : * immediately.
257 : : */
258 : 0 : ReleaseExternalFD();
259 : 0 : PQfinish(conn);
260 : :
261 : 0 : PG_RE_THROW();
262 : : }
1223 andres@anarazel.de 263 [ - + ]:CBC 1054 : PG_END_TRY();
264 : : }
265 : :
266 : : /*
267 : : * PQexec() wrapper that processes interrupts.
268 : : *
269 : : * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
270 : : * interrupts while pushing the query text to the server. Consider that
271 : : * setting if query strings can be long relative to TCP buffer size.
272 : : *
273 : : * This has the preconditions of PQsendQuery(), not those of PQexec(). Most
274 : : * notably, PQexec() would silently discard any prior query results.
275 : : */
276 : : static inline PGresult *
873 noah@leadboat.com 277 : 4301 : libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
278 : : {
279 [ - + ]: 4301 : if (!PQsendQuery(conn, query))
873 noah@leadboat.com 280 :UBC 0 : return NULL;
873 noah@leadboat.com 281 :CBC 4301 : return libpqsrv_get_result_last(conn, wait_event_info);
282 : : }
283 : :
284 : : /*
285 : : * PQexecParams() wrapper that processes interrupts.
286 : : *
287 : : * See notes at libpqsrv_exec().
288 : : */
289 : : static inline PGresult *
290 : : libpqsrv_exec_params(PGconn *conn,
291 : : const char *command,
292 : : int nParams,
293 : : const Oid *paramTypes,
294 : : const char *const *paramValues,
295 : : const int *paramLengths,
296 : : const int *paramFormats,
297 : : int resultFormat,
298 : : uint32 wait_event_info)
299 : : {
300 : : if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
301 : : paramLengths, paramFormats, resultFormat))
302 : : return NULL;
303 : : return libpqsrv_get_result_last(conn, wait_event_info);
304 : : }
305 : :
306 : : /*
307 : : * Like PQexec(), loop over PQgetResult() until it returns NULL or another
308 : : * terminal state. Return the last non-NULL result or the terminal state.
309 : : */
310 : : static inline PGresult *
311 : 12765 : libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
312 : : {
309 tgl@sss.pgh.pa.us 313 :GNC 12765 : PGresult *lastResult = NULL;
314 : :
315 : : for (;;)
309 tgl@sss.pgh.pa.us 316 :GIC 11949 : {
317 : : /* Wait for, and collect, the next PGresult. */
318 : : PGresult *result;
319 : :
309 tgl@sss.pgh.pa.us 320 :GNC 24714 : result = libpqsrv_get_result(conn, wait_event_info);
321 [ + + ]: 24712 : if (result == NULL)
322 : 11943 : break; /* query is complete, or failure */
323 : :
324 : : /*
325 : : * Emulate PQexec()'s behavior of returning the last result when there
326 : : * are many.
327 : : */
309 tgl@sss.pgh.pa.us 328 :CBC 12769 : PQclear(lastResult);
309 tgl@sss.pgh.pa.us 329 :GNC 12769 : lastResult = result;
330 : :
331 [ + - + + ]: 25538 : if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
332 [ + + ]: 25329 : PQresultStatus(lastResult) == PGRES_COPY_OUT ||
333 [ + + ]: 24513 : PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
334 : 11953 : PQstatus(conn) == CONNECTION_BAD)
335 : : break;
336 : : }
873 noah@leadboat.com 337 :CBC 12763 : return lastResult;
338 : : }
339 : :
340 : : /*
341 : : * Perform the equivalent of PQgetResult(), but watch for interrupts.
342 : : */
343 : : static inline PGresult *
344 : 26078 : libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
345 : : {
346 : : /*
347 : : * Collect data until PQgetResult is ready to get the result without
348 : : * blocking.
349 : : */
350 [ + + ]: 38773 : while (PQisBusy(conn))
351 : : {
352 : : int rc;
353 : :
354 : 12745 : rc = WaitLatchOrSocket(MyLatch,
355 : : WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
356 : : WL_SOCKET_READABLE,
357 : : PQsocket(conn),
358 : : 0,
359 : : wait_event_info);
360 : :
361 : : /* Interrupted? */
362 [ + + ]: 12745 : if (rc & WL_LATCH_SET)
363 : : {
364 : 14 : ResetLatch(MyLatch);
365 [ + + ]: 14 : CHECK_FOR_INTERRUPTS();
366 : : }
367 : :
368 : : /* Consume whatever data is available from the socket */
369 [ + + ]: 12743 : if (PQconsumeInput(conn) == 0)
370 : : {
371 : : /* trouble; expect PQgetResult() to return NULL */
372 : 48 : break;
373 : : }
374 : : }
375 : :
376 : : /* Now we can collect and return the next PGresult */
377 : 26076 : return PQgetResult(conn);
378 : : }
379 : :
380 : : /*
381 : : * Submit a cancel request to the given connection, waiting only until
382 : : * the given time.
383 : : *
384 : : * We sleep interruptibly until we receive confirmation that the cancel
385 : : * request has been accepted, and if it is, return NULL; if the cancel
386 : : * request fails, return an error message string (which is not to be
387 : : * freed).
388 : : *
389 : : * For other problems (to wit: OOM when strdup'ing an error message from
390 : : * libpq), this function can ereport(ERROR).
391 : : *
392 : : * Note: this function leaks a string's worth of memory when reporting
393 : : * libpq errors. Make sure to call it in a transient memory context.
394 : : */
395 : : static inline const char *
793 alvherre@alvh.no-ip. 396 : 2 : libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
397 : : {
398 : : PGcancelConn *cancel_conn;
785 399 : 2 : const char *error = NULL;
400 : :
793 401 : 2 : cancel_conn = PQcancelCreate(conn);
402 [ - + ]: 2 : if (cancel_conn == NULL)
793 alvherre@alvh.no-ip. 403 :UBC 0 : return "out of memory";
404 : :
405 : : /* In what follows, do not leak any PGcancelConn on any errors. */
406 : :
793 alvherre@alvh.no-ip. 407 [ + - ]:CBC 2 : PG_TRY();
408 : : {
409 [ - + ]: 2 : if (!PQcancelStart(cancel_conn))
410 : : {
793 alvherre@alvh.no-ip. 411 :UBC 0 : error = pchomp(PQcancelErrorMessage(cancel_conn));
412 : 0 : goto exit;
413 : : }
414 : :
415 : : for (;;)
793 alvherre@alvh.no-ip. 416 :CBC 2 : {
417 : : PostgresPollingStatusType pollres;
418 : : TimestampTz now;
419 : : long cur_timeout;
420 : 4 : int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
421 : :
422 : 4 : pollres = PQcancelPoll(cancel_conn);
423 [ + + ]: 4 : if (pollres == PGRES_POLLING_OK)
424 : 2 : break; /* success! */
425 : :
426 : : /* If timeout has expired, give up, else get sleep time. */
427 : 2 : now = GetCurrentTimestamp();
428 : 2 : cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
429 [ - + ]: 2 : if (cur_timeout <= 0)
430 : : {
793 alvherre@alvh.no-ip. 431 :UBC 0 : error = "cancel request timed out";
432 : 0 : break;
433 : : }
434 : :
793 alvherre@alvh.no-ip. 435 [ + - - ]:CBC 2 : switch (pollres)
436 : : {
437 : 2 : case PGRES_POLLING_READING:
438 : 2 : waitEvents |= WL_SOCKET_READABLE;
439 : 2 : break;
793 alvherre@alvh.no-ip. 440 :UBC 0 : case PGRES_POLLING_WRITING:
441 : 0 : waitEvents |= WL_SOCKET_WRITEABLE;
442 : 0 : break;
443 : 0 : default:
444 : 0 : error = pchomp(PQcancelErrorMessage(cancel_conn));
445 : 0 : goto exit;
446 : : }
447 : :
448 : : /* Sleep until there's something to do */
793 alvherre@alvh.no-ip. 449 :CBC 2 : WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
450 : : cur_timeout, PG_WAIT_CLIENT);
451 : :
452 : 2 : ResetLatch(MyLatch);
453 : :
454 [ - + ]: 2 : CHECK_FOR_INTERRUPTS();
455 : : }
456 : 2 : exit: ;
457 : : }
793 alvherre@alvh.no-ip. 458 :UBC 0 : PG_FINALLY();
459 : : {
793 alvherre@alvh.no-ip. 460 :CBC 2 : PQcancelFinish(cancel_conn);
461 : : }
462 [ - + ]: 2 : PG_END_TRY();
463 : :
464 : 2 : return error;
465 : : }
466 : :
467 : : /*
468 : : * libpqsrv_notice_receiver
469 : : *
470 : : * Custom notice receiver for libpq connections.
471 : : *
472 : : * This function is intended to be set via PQsetNoticeReceiver() so that
473 : : * NOTICE, WARNING, and similar messages from the connection are reported via
474 : : * ereport(), instead of being printed to stderr.
475 : : *
476 : : * Because this will be called from libpq with a "real" (not wrapped)
477 : : * PGresult, we need to temporarily ignore libpq-be-fe.h's wrapper macros
478 : : * for PGresult and also PQresultErrorMessage, and put back the wrappers
479 : : * afterwards. That's not pretty, but there seems no better alternative.
480 : : */
481 : : #undef PGresult
482 : : #undef PQresultErrorMessage
483 : :
484 : : static inline void
312 fujii@postgresql.org 485 :GNC 12 : libpqsrv_notice_receiver(void *arg, const PGresult *res)
486 : : {
487 : : const char *message;
488 : : int len;
309 tgl@sss.pgh.pa.us 489 : 12 : const char *prefix = (const char *) arg;
490 : :
491 : : /*
492 : : * Trim the trailing newline from the message text returned from
493 : : * PQresultErrorMessage(), as it always includes one, to produce cleaner
494 : : * log output.
495 : : */
312 fujii@postgresql.org 496 : 12 : message = PQresultErrorMessage(res);
497 : 12 : len = strlen(message);
498 [ + - + - ]: 12 : if (len > 0 && message[len - 1] == '\n')
499 : 12 : len--;
500 : :
501 [ + - ]: 12 : ereport(LOG,
502 : : errmsg_internal("%s: %.*s", prefix, len, message));
503 : 12 : }
504 : :
505 : : #define PGresult libpqsrv_PGresult
506 : : #define PQresultErrorMessage libpqsrv_PQresultErrorMessage
507 : :
508 : : #endif /* LIBPQ_BE_FE_HELPERS_H */
|