Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * libpq-be-fe-helpers.h
4 : : * Helper functions for using libpq in extensions
5 : : *
6 : : * Code built directly into the backend is not allowed to link to libpq
7 : : * directly. Extension code is allowed to use libpq however. However, libpq
8 : : * used in extensions has to be careful not to block inside libpq, otherwise
9 : : * interrupts will not be processed, leading to issues like unresolvable
10 : : * deadlocks. Backend code also needs to take care to acquire/release an
11 : : * external fd for the connection, otherwise fd.c's accounting of fd's is
12 : : * broken.
13 : : *
14 : : * This file provides helper functions to make it easier to comply with these
15 : : * rules. It is a header only library as it needs to be linked into each
16 : : * extension using libpq, and it seems too small to be worth adding a
17 : : * dedicated static library for.
18 : : *
19 : : * TODO: For historical reasons the connections established here are not put
20 : : * into non-blocking mode. That can lead to blocking even when only the async
21 : : * libpq functions are used. This should be fixed.
22 : : *
23 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
24 : : * Portions Copyright (c) 1994, Regents of the University of California
25 : : *
26 : : * src/include/libpq/libpq-be-fe-helpers.h
27 : : *
28 : : *-------------------------------------------------------------------------
29 : : */
30 : : #ifndef LIBPQ_BE_FE_HELPERS_H
31 : : #define LIBPQ_BE_FE_HELPERS_H
32 : :
33 : : #include "libpq/libpq-be-fe.h"
34 : : #include "miscadmin.h"
35 : : #include "storage/fd.h"
36 : : #include "storage/latch.h"
37 : : #include "utils/timestamp.h"
38 : : #include "utils/wait_event.h"
39 : :
40 : :
41 : : static inline void libpqsrv_connect_prepare(void);
42 : : static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
43 : : static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
44 : : static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
45 : :
46 : :
47 : : /*
48 : : * PQconnectdb() wrapper that reserves a file descriptor and processes
49 : : * interrupts during connection establishment.
50 : : *
51 : : * Throws an error if AcquireExternalFD() fails, but does not throw if
52 : : * connection establishment itself fails. Callers need to use PQstatus() to
53 : : * check if connection establishment succeeded.
54 : : */
55 : : static inline PGconn *
957 andres@anarazel.de 56 :CBC 26 : libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
57 : : {
58 : 26 : PGconn *conn = NULL;
59 : :
60 : 26 : libpqsrv_connect_prepare();
61 : :
62 : 26 : conn = PQconnectStart(conninfo);
63 : :
64 : 26 : libpqsrv_connect_internal(conn, wait_event_info);
65 : :
66 : 26 : return conn;
67 : : }
68 : :
69 : : /*
70 : : * Like libpqsrv_connect(), except that this is a wrapper for
71 : : * PQconnectdbParams().
72 : : */
73 : : static inline PGconn *
74 : 972 : libpqsrv_connect_params(const char *const *keywords,
75 : : const char *const *values,
76 : : int expand_dbname,
77 : : uint32 wait_event_info)
78 : : {
79 : 972 : PGconn *conn = NULL;
80 : :
81 : 972 : libpqsrv_connect_prepare();
82 : :
83 : 972 : conn = PQconnectStartParams(keywords, values, expand_dbname);
84 : :
85 : 972 : libpqsrv_connect_internal(conn, wait_event_info);
86 : :
87 : 971 : return conn;
88 : : }
89 : :
90 : : /*
91 : : * PQfinish() wrapper that additionally releases the reserved file descriptor.
92 : : *
93 : : * It is allowed to call this with a NULL pgconn iff NULL was returned by
94 : : * libpqsrv_connect*.
95 : : */
96 : : static inline void
97 : 989 : libpqsrv_disconnect(PGconn *conn)
98 : : {
99 : : /*
100 : : * If no connection was established, we haven't reserved an FD for it (or
101 : : * already released it). This rule makes it easier to write PG_CATCH()
102 : : * handlers for this facility's users.
103 : : *
104 : : * See also libpqsrv_connect_internal().
105 : : */
106 [ + + ]: 989 : if (conn == NULL)
107 : 4 : return;
108 : :
109 : 985 : ReleaseExternalFD();
110 : 985 : PQfinish(conn);
111 : : }
112 : :
113 : :
114 : : /* internal helper functions follow */
115 : :
116 : :
117 : : /*
118 : : * Helper function for all connection establishment functions.
119 : : */
120 : : static inline void
121 : 998 : libpqsrv_connect_prepare(void)
122 : : {
123 : : /*
124 : : * We must obey fd.c's limit on non-virtual file descriptors. Assume that
125 : : * a PGconn represents one long-lived FD. (Doing this here also ensures
126 : : * that VFDs are closed if needed to make room.)
127 : : */
128 [ - + ]: 998 : if (!AcquireExternalFD())
129 : : {
130 : : #ifndef WIN32 /* can't write #if within ereport() macro */
957 andres@anarazel.de 131 [ # # ]:UBC 0 : ereport(ERROR,
132 : : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
133 : : errmsg("could not establish connection"),
134 : : errdetail("There are too many open files on the local server."),
135 : : errhint("Raise the server's \"max_files_per_process\" and/or \"ulimit -n\" limits.")));
136 : : #else
137 : : ereport(ERROR,
138 : : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
139 : : errmsg("could not establish connection"),
140 : : errdetail("There are too many open files on the local server."),
141 : : errhint("Raise the server's \"max_files_per_process\" setting.")));
142 : : #endif
143 : : }
957 andres@anarazel.de 144 :CBC 998 : }
145 : :
146 : : /*
147 : : * Helper function for all connection establishment functions.
148 : : */
149 : : static inline void
150 : 998 : libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
151 : : {
152 : : /*
153 : : * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
154 : : * that here.
155 : : */
156 [ - + ]: 998 : if (conn == NULL)
157 : : {
957 andres@anarazel.de 158 :UBC 0 : ReleaseExternalFD();
159 : 0 : return;
160 : : }
161 : :
162 : : /*
163 : : * Can't wait without a socket. Note that we don't want to close the libpq
164 : : * connection yet, so callers can emit a useful error.
165 : : */
957 andres@anarazel.de 166 [ + + ]:CBC 998 : if (PQstatus(conn) == CONNECTION_BAD)
167 : 64 : return;
168 : :
169 : : /*
170 : : * WaitLatchOrSocket() can conceivably fail, handle that case here instead
171 : : * of requiring all callers to do so.
172 : : */
173 [ + - ]: 934 : PG_TRY();
174 : : {
175 : : PostgresPollingStatusType status;
176 : :
177 : : /*
178 : : * Poll connection until we have OK or FAILED status.
179 : : *
180 : : * Per spec for PQconnectPoll, first wait till socket is write-ready.
181 : : */
182 : 934 : status = PGRES_POLLING_WRITING;
183 [ + + + + ]: 4410 : while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
184 : : {
185 : : int io_flag;
186 : : int rc;
187 : :
188 [ + + ]: 2543 : if (status == PGRES_POLLING_READING)
189 : 971 : io_flag = WL_SOCKET_READABLE;
190 : : #ifdef WIN32
191 : :
192 : : /*
193 : : * Windows needs a different test while waiting for
194 : : * connection-made
195 : : */
196 : : else if (PQstatus(conn) == CONNECTION_STARTED)
197 : : io_flag = WL_SOCKET_CONNECTED;
198 : : #endif
199 : : else
200 : 1572 : io_flag = WL_SOCKET_WRITEABLE;
201 : :
202 : 2543 : rc = WaitLatchOrSocket(MyLatch,
203 : : WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
204 : : PQsocket(conn),
205 : : 0,
206 : : wait_event_info);
207 : :
208 : : /* Interrupted? */
209 [ + + ]: 2543 : if (rc & WL_LATCH_SET)
210 : : {
211 : 622 : ResetLatch(MyLatch);
212 [ + + ]: 622 : CHECK_FOR_INTERRUPTS();
213 : : }
214 : :
215 : : /* If socket is ready, advance the libpq state machine */
216 [ + + ]: 2542 : if (rc & io_flag)
217 : 1921 : status = PQconnectPoll(conn);
218 : : }
219 : : }
957 andres@anarazel.de 220 :UBC 0 : PG_CATCH();
221 : : {
222 : : /*
223 : : * If an error is thrown here, the callers won't call
224 : : * libpqsrv_disconnect() with a conn, so release resources
225 : : * immediately.
226 : : */
227 : 0 : ReleaseExternalFD();
228 : 0 : PQfinish(conn);
229 : :
230 : 0 : PG_RE_THROW();
231 : : }
957 andres@anarazel.de 232 [ - + ]:CBC 933 : PG_END_TRY();
233 : : }
234 : :
235 : : /*
236 : : * PQexec() wrapper that processes interrupts.
237 : : *
238 : : * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
239 : : * interrupts while pushing the query text to the server. Consider that
240 : : * setting if query strings can be long relative to TCP buffer size.
241 : : *
242 : : * This has the preconditions of PQsendQuery(), not those of PQexec(). Most
243 : : * notably, PQexec() would silently discard any prior query results.
244 : : */
245 : : static inline PGresult *
607 noah@leadboat.com 246 : 3851 : libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
247 : : {
248 [ - + ]: 3851 : if (!PQsendQuery(conn, query))
607 noah@leadboat.com 249 :UBC 0 : return NULL;
607 noah@leadboat.com 250 :CBC 3851 : return libpqsrv_get_result_last(conn, wait_event_info);
251 : : }
252 : :
253 : : /*
254 : : * PQexecParams() wrapper that processes interrupts.
255 : : *
256 : : * See notes at libpqsrv_exec().
257 : : */
258 : : static inline PGresult *
259 : : libpqsrv_exec_params(PGconn *conn,
260 : : const char *command,
261 : : int nParams,
262 : : const Oid *paramTypes,
263 : : const char *const *paramValues,
264 : : const int *paramLengths,
265 : : const int *paramFormats,
266 : : int resultFormat,
267 : : uint32 wait_event_info)
268 : : {
269 : : if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
270 : : paramLengths, paramFormats, resultFormat))
271 : : return NULL;
272 : : return libpqsrv_get_result_last(conn, wait_event_info);
273 : : }
274 : :
275 : : /*
276 : : * Like PQexec(), loop over PQgetResult() until it returns NULL or another
277 : : * terminal state. Return the last non-NULL result or the terminal state.
278 : : */
279 : : static inline PGresult *
280 : 12079 : libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
281 : : {
43 tgl@sss.pgh.pa.us 282 :GNC 12079 : PGresult *lastResult = NULL;
283 : :
284 : : for (;;)
43 tgl@sss.pgh.pa.us 285 :GIC 11359 : {
286 : : /* Wait for, and collect, the next PGresult. */
287 : : PGresult *result;
288 : :
43 tgl@sss.pgh.pa.us 289 :GNC 23438 : result = libpqsrv_get_result(conn, wait_event_info);
290 [ + + ]: 23437 : if (result == NULL)
291 : 11357 : break; /* query is complete, or failure */
292 : :
293 : : /*
294 : : * Emulate PQexec()'s behavior of returning the last result when there
295 : : * are many.
296 : : */
43 tgl@sss.pgh.pa.us 297 :CBC 12080 : PQclear(lastResult);
43 tgl@sss.pgh.pa.us 298 :GNC 12080 : lastResult = result;
299 : :
300 [ + - + + ]: 24160 : if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
301 [ + + ]: 23968 : PQresultStatus(lastResult) == PGRES_COPY_OUT ||
302 [ + + ]: 23249 : PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
303 : 11361 : PQstatus(conn) == CONNECTION_BAD)
304 : : break;
305 : : }
607 noah@leadboat.com 306 :CBC 12078 : return lastResult;
307 : : }
308 : :
309 : : /*
310 : : * Perform the equivalent of PQgetResult(), but watch for interrupts.
311 : : */
312 : : static inline PGresult *
313 : 24690 : libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
314 : : {
315 : : /*
316 : : * Collect data until PQgetResult is ready to get the result without
317 : : * blocking.
318 : : */
319 [ + + ]: 36705 : while (PQisBusy(conn))
320 : : {
321 : : int rc;
322 : :
323 : 12050 : rc = WaitLatchOrSocket(MyLatch,
324 : : WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
325 : : WL_SOCKET_READABLE,
326 : : PQsocket(conn),
327 : : 0,
328 : : wait_event_info);
329 : :
330 : : /* Interrupted? */
331 [ + + ]: 12050 : if (rc & WL_LATCH_SET)
332 : : {
333 : 11 : ResetLatch(MyLatch);
334 [ + + ]: 11 : CHECK_FOR_INTERRUPTS();
335 : : }
336 : :
337 : : /* Consume whatever data is available from the socket */
338 [ + + ]: 12049 : if (PQconsumeInput(conn) == 0)
339 : : {
340 : : /* trouble; expect PQgetResult() to return NULL */
341 : 34 : break;
342 : : }
343 : : }
344 : :
345 : : /* Now we can collect and return the next PGresult */
346 : 24689 : return PQgetResult(conn);
347 : : }
348 : :
349 : : /*
350 : : * Submit a cancel request to the given connection, waiting only until
351 : : * the given time.
352 : : *
353 : : * We sleep interruptibly until we receive confirmation that the cancel
354 : : * request has been accepted, and if it is, return NULL; if the cancel
355 : : * request fails, return an error message string (which is not to be
356 : : * freed).
357 : : *
358 : : * For other problems (to wit: OOM when strdup'ing an error message from
359 : : * libpq), this function can ereport(ERROR).
360 : : *
361 : : * Note: this function leaks a string's worth of memory when reporting
362 : : * libpq errors. Make sure to call it in a transient memory context.
363 : : */
364 : : static inline const char *
527 alvherre@alvh.no-ip. 365 : 3 : libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
366 : : {
367 : : PGcancelConn *cancel_conn;
519 368 : 3 : const char *error = NULL;
369 : :
527 370 : 3 : cancel_conn = PQcancelCreate(conn);
371 [ - + ]: 3 : if (cancel_conn == NULL)
527 alvherre@alvh.no-ip. 372 :UBC 0 : return "out of memory";
373 : :
374 : : /* In what follows, do not leak any PGcancelConn on any errors. */
375 : :
527 alvherre@alvh.no-ip. 376 [ + - ]:CBC 3 : PG_TRY();
377 : : {
378 [ - + ]: 3 : if (!PQcancelStart(cancel_conn))
379 : : {
527 alvherre@alvh.no-ip. 380 :UBC 0 : error = pchomp(PQcancelErrorMessage(cancel_conn));
381 : 0 : goto exit;
382 : : }
383 : :
384 : : for (;;)
527 alvherre@alvh.no-ip. 385 :CBC 3 : {
386 : : PostgresPollingStatusType pollres;
387 : : TimestampTz now;
388 : : long cur_timeout;
389 : 6 : int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
390 : :
391 : 6 : pollres = PQcancelPoll(cancel_conn);
392 [ + + ]: 6 : if (pollres == PGRES_POLLING_OK)
393 : 3 : break; /* success! */
394 : :
395 : : /* If timeout has expired, give up, else get sleep time. */
396 : 3 : now = GetCurrentTimestamp();
397 : 3 : cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
398 [ - + ]: 3 : if (cur_timeout <= 0)
399 : : {
527 alvherre@alvh.no-ip. 400 :UBC 0 : error = "cancel request timed out";
401 : 0 : break;
402 : : }
403 : :
527 alvherre@alvh.no-ip. 404 [ + - - ]:CBC 3 : switch (pollres)
405 : : {
406 : 3 : case PGRES_POLLING_READING:
407 : 3 : waitEvents |= WL_SOCKET_READABLE;
408 : 3 : break;
527 alvherre@alvh.no-ip. 409 :UBC 0 : case PGRES_POLLING_WRITING:
410 : 0 : waitEvents |= WL_SOCKET_WRITEABLE;
411 : 0 : break;
412 : 0 : default:
413 : 0 : error = pchomp(PQcancelErrorMessage(cancel_conn));
414 : 0 : goto exit;
415 : : }
416 : :
417 : : /* Sleep until there's something to do */
527 alvherre@alvh.no-ip. 418 :CBC 3 : WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
419 : : cur_timeout, PG_WAIT_CLIENT);
420 : :
421 : 3 : ResetLatch(MyLatch);
422 : :
423 [ - + ]: 3 : CHECK_FOR_INTERRUPTS();
424 : : }
425 : 3 : exit: ;
426 : : }
527 alvherre@alvh.no-ip. 427 :UBC 0 : PG_FINALLY();
428 : : {
527 alvherre@alvh.no-ip. 429 :CBC 3 : PQcancelFinish(cancel_conn);
430 : : }
431 [ - + ]: 3 : PG_END_TRY();
432 : :
433 : 3 : return error;
434 : : }
435 : :
436 : : /*
437 : : * libpqsrv_notice_receiver
438 : : *
439 : : * Custom notice receiver for libpq connections.
440 : : *
441 : : * This function is intended to be set via PQsetNoticeReceiver() so that
442 : : * NOTICE, WARNING, and similar messages from the connection are reported via
443 : : * ereport(), instead of being printed to stderr.
444 : : *
445 : : * Because this will be called from libpq with a "real" (not wrapped)
446 : : * PGresult, we need to temporarily ignore libpq-be-fe.h's wrapper macros
447 : : * for PGresult and also PQresultErrorMessage, and put back the wrappers
448 : : * afterwards. That's not pretty, but there seems no better alternative.
449 : : */
450 : : #undef PGresult
451 : : #undef PQresultErrorMessage
452 : :
453 : : static inline void
46 fujii@postgresql.org 454 :GNC 8 : libpqsrv_notice_receiver(void *arg, const PGresult *res)
455 : : {
456 : : const char *message;
457 : : int len;
43 tgl@sss.pgh.pa.us 458 : 8 : const char *prefix = (const char *) arg;
459 : :
460 : : /*
461 : : * Trim the trailing newline from the message text returned from
462 : : * PQresultErrorMessage(), as it always includes one, to produce cleaner
463 : : * log output.
464 : : */
46 fujii@postgresql.org 465 : 8 : message = PQresultErrorMessage(res);
466 : 8 : len = strlen(message);
467 [ + - + - ]: 8 : if (len > 0 && message[len - 1] == '\n')
468 : 8 : len--;
469 : :
470 [ + - ]: 8 : ereport(LOG,
471 : : errmsg_internal("%s: %.*s", prefix, len, message));
472 : 8 : }
473 : :
474 : : #define PGresult libpqsrv_PGresult
475 : : #define PQresultErrorMessage libpqsrv_PQresultErrorMessage
476 : :
477 : : #endif /* LIBPQ_BE_FE_HELPERS_H */
|