Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * connection.c
4 : : * Connection management functions for postgres_fdw
5 : : *
6 : : * Portions Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * contrib/postgres_fdw/connection.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #if HAVE_POLL_H
16 : : #include <poll.h>
17 : : #endif
18 : :
19 : : #include "access/htup_details.h"
20 : : #include "access/xact.h"
21 : : #include "catalog/pg_user_mapping.h"
22 : : #include "commands/defrem.h"
23 : : #include "common/base64.h"
24 : : #include "funcapi.h"
25 : : #include "libpq/libpq-be.h"
26 : : #include "libpq/libpq-be-fe-helpers.h"
27 : : #include "mb/pg_wchar.h"
28 : : #include "miscadmin.h"
29 : : #include "pgstat.h"
30 : : #include "postgres_fdw.h"
31 : : #include "storage/latch.h"
32 : : #include "utils/builtins.h"
33 : : #include "utils/hsearch.h"
34 : : #include "utils/inval.h"
35 : : #include "utils/syscache.h"
36 : :
37 : : /*
38 : : * Connection cache hash table entry
39 : : *
40 : : * The lookup key in this hash table is the user mapping OID. We use just one
41 : : * connection per user mapping ID, which ensures that all the scans use the
42 : : * same snapshot during a query. Using the user mapping OID rather than
43 : : * the foreign server OID + user OID avoids creating multiple connections when
44 : : * the public user mapping applies to all user OIDs.
45 : : *
46 : : * The "conn" pointer can be NULL if we don't currently have a live connection.
47 : : * When we do have a connection, xact_depth tracks the current depth of
48 : : * transactions and subtransactions open on the remote side. We need to issue
49 : : * commands at the same nesting depth on the remote as we're executing at
50 : : * ourselves, so that rolling back a subtransaction will kill the right
51 : : * queries and not the wrong ones.
52 : : */
typedef Oid ConnCacheKey;

/*
 * One entry of the connection cache.  GetConnection() looks entries up by
 * user mapping OID and make_new_connection() (re)initializes every field
 * other than the key whenever a new connection is established.
 */
typedef struct ConnCacheEntry
{
	ConnCacheKey key;			/* hash key (must be first) */
	PGconn	   *conn;			/* connection to foreign server, or NULL */
	/* Remaining fields are invalid when conn is NULL: */
	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
								 * one level of subxact open, etc */
	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
	bool		have_error;		/* have any subxacts aborted in this xact? */
	bool		changing_xact_state;	/* xact state change in process */
	bool		parallel_commit;	/* do we commit (sub)xacts in parallel? */
	bool		parallel_abort; /* do we abort (sub)xacts in parallel? */
	bool		invalidated;	/* true if reconnect is pending; the
								 * connection is closed by GetConnection()
								 * once xact_depth is back to 0 */
	bool		keep_connections;	/* setting value of keep_connections
									 * server option */
	Oid			serverid;		/* foreign server OID used to get server name */
	uint32		server_hashvalue;	/* hash value of foreign server OID */
	uint32		mapping_hashvalue;	/* hash value of user mapping OID */
	PgFdwConnState state;		/* extra per-connection state */
} ConnCacheEntry;
75 : :
/*
 * Connection cache (initialized on first use)
 *
 * NULL until GetConnection() creates the hash table.
 */
static HTAB *ConnectionHash = NULL;

/* for assigning cursor numbers and prepared statement numbers */
static unsigned int cursor_number = 0;
static unsigned int prep_stmt_number = 0;

/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false;

/*
 * custom wait event values, retrieved from shared memory
 *
 * Zero means "not obtained yet".  pgfdw_we_get_result and pgfdw_we_connect
 * are filled in with WaitEventExtensionNew() the first time they are needed;
 * pgfdw_we_cleanup_result is presumably handled the same way where it is
 * used.
 */
static uint32 pgfdw_we_cleanup_result = 0;
static uint32 pgfdw_we_connect = 0;
static uint32 pgfdw_we_get_result = 0;

/*
 * Milliseconds to wait to cancel an in-progress query or execute a cleanup
 * query; if it takes longer than 30 seconds to do these, we assume the
 * connection is dead.
 */
#define CONNECTION_CLEANUP_TIMEOUT	30000

/*
 * Milliseconds to wait before issuing another cancel request.  This covers
 * the race condition where the remote session ignored our cancel request
 * because it arrived while idle.
 */
#define RETRY_CANCEL_TIMEOUT	1000
106 : :
/*
 * Build the abort command to send to the remote server into "sql", which
 * must be a char array (its size is taken with sizeof).  A top-level abort
 * ends the whole transaction; otherwise the savepoint named after the
 * entry's current xact_depth is rolled back and released.
 */
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
	do { \
		if (!(toplevel)) \
			snprintf((sql), sizeof(sql), \
					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
					 (entry)->xact_depth, (entry)->xact_depth); \
		else \
			snprintf((sql), sizeof(sql), \
					 "ABORT TRANSACTION"); \
	} while(0)
118 : :
/*
 * Extension version number, for supporting older extension versions' objects
 *
 * Passed as api_version to postgres_fdw_get_connections_internal().
 */
enum pgfdwVersion
{
	PGFDW_V1_1 = 0,
	PGFDW_V1_2,
};
127 : :
/*
 * SQL functions
 */
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);

/* prototypes of private functions */

/* establishing, validating and closing connections */
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void disconnect_pg_server(ConnCacheEntry *entry);
static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
static void configure_remote_session(PGconn *conn);

/* remaining helpers; those not defined nearby live elsewhere in this file */
static void do_sql_command_begin(PGconn *conn, const char *sql);
static void do_sql_command_end(PGconn *conn, const char *sql,
							   bool consume_input);
static void begin_remote_xact(ConnCacheEntry *entry);
static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
								  const char *sql);
static void pgfdw_xact_callback(XactEvent event, void *arg);
static void pgfdw_subxact_callback(SubXactEvent event,
								   SubTransactionId mySubid,
								   SubTransactionId parentSubid,
								   void *arg);
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
								   TimestampTz retrycanceltime,
								   bool consume_input);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
									 bool ignore_errors);
static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
										 TimestampTz endtime,
										 bool consume_input,
										 bool ignore_errors);
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
									 TimestampTz retrycanceltime,
									 PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
									  List **pending_entries,
									  List **cancel_requested);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
											   int curlevel);
static void pgfdw_finish_abort_cleanup(List *pending_entries,
									   List *cancel_requested,
									   bool toplevel);
static void pgfdw_security_check(const char **keywords, const char **values,
								 UserMapping *user, PGconn *conn);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool UseScramPassthrough(ForeignServer *server, UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
												  enum pgfdwVersion api_version);
static int	pgfdw_conn_check(PGconn *conn);
static bool pgfdw_conn_checkable(void);
static bool pgfdw_has_required_scram_options(const char **keywords, const char **values);
191 : :
192 : : /*
193 : : * Get a PGconn which can be used to execute queries on the remote PostgreSQL
194 : : * server with the user's authorization. A new connection is established
195 : : * if we don't already have a suitable one, and a transaction is opened at
196 : : * the right subtransaction nesting depth if we didn't do that already.
197 : : *
198 : : * will_prep_stmt must be true if caller intends to create any prepared
199 : : * statements. Since those don't go away automatically at transaction end
200 : : * (not even on error), we need this flag to cue manual cleanup.
201 : : *
202 : : * If state is not NULL, *state receives the per-connection state associated
203 : : * with the PGconn.
204 : : */
205 : : PGconn *
1671 efujita@postgresql.o 206 : 2207 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
207 : : {
208 : : bool found;
1837 fujii@postgresql.org 209 : 2207 : bool retry = false;
210 : : ConnCacheEntry *entry;
211 : : ConnCacheKey key;
212 : 2207 : MemoryContext ccxt = CurrentMemoryContext;
213 : :
214 : : /* First time through, initialize connection cache hashtable */
4631 tgl@sss.pgh.pa.us 215 [ + + ]: 2207 : if (ConnectionHash == NULL)
216 : : {
217 : : HASHCTL ctl;
218 : :
658 noah@leadboat.com 219 [ + - ]: 15 : if (pgfdw_we_get_result == 0)
220 : 15 : pgfdw_we_get_result =
221 : 15 : WaitEventExtensionNew("PostgresFdwGetResult");
222 : :
4631 tgl@sss.pgh.pa.us 223 : 15 : ctl.keysize = sizeof(ConnCacheKey);
224 : 15 : ctl.entrysize = sizeof(ConnCacheEntry);
225 : 15 : ConnectionHash = hash_create("postgres_fdw connections", 8,
226 : : &ctl,
227 : : HASH_ELEM | HASH_BLOBS);
228 : :
229 : : /*
230 : : * Register some callback functions that manage connection cleanup.
231 : : * This should be done just once in each backend.
232 : : */
233 : 15 : RegisterXactCallback(pgfdw_xact_callback, NULL);
234 : 15 : RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
3020 235 : 15 : CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
236 : : pgfdw_inval_callback, (Datum) 0);
237 : 15 : CacheRegisterSyscacheCallback(USERMAPPINGOID,
238 : : pgfdw_inval_callback, (Datum) 0);
239 : : }
240 : :
241 : : /* Set flag that we did GetConnection during the current transaction */
4631 242 : 2207 : xact_got_connection = true;
243 : :
244 : : /* Create hash key for the entry. Assume no pad bytes in key struct */
3560 rhaas@postgresql.org 245 : 2207 : key = user->umid;
246 : :
247 : : /*
248 : : * Find or create cached entry for requested connection.
249 : : */
4631 tgl@sss.pgh.pa.us 250 : 2207 : entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
251 [ + + ]: 2207 : if (!found)
252 : : {
253 : : /*
254 : : * We need only clear "conn" here; remaining fields will be filled
255 : : * later when "conn" is set.
256 : : */
257 : 23 : entry->conn = NULL;
258 : : }
259 : :
260 : : /* Reject further use of connections which failed abort cleanup. */
3064 rhaas@postgresql.org 261 : 2207 : pgfdw_reject_incomplete_xact_state_change(entry);
262 : :
263 : : /*
264 : : * If the connection needs to be remade due to invalidation, disconnect as
265 : : * soon as we're out of all transactions.
266 : : */
1837 fujii@postgresql.org 267 [ + + - + : 2207 : if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
- - ]
268 : : {
1837 fujii@postgresql.org 269 [ # # ]:UBC 0 : elog(DEBUG3, "closing connection %p for option changes to take effect",
270 : : entry->conn);
3020 tgl@sss.pgh.pa.us 271 : 0 : disconnect_pg_server(entry);
272 : : }
273 : :
274 : : /*
275 : : * If cache entry doesn't have a connection, we have to establish a new
276 : : * connection. (If connect_pg_server throws an error, the cache entry
277 : : * will remain in a valid empty state, ie conn == NULL.)
278 : : */
4631 tgl@sss.pgh.pa.us 279 [ + + ]:CBC 2207 : if (entry->conn == NULL)
1837 fujii@postgresql.org 280 : 83 : make_new_connection(entry, user);
281 : :
282 : : /*
283 : : * We check the health of the cached connection here when using it. In
284 : : * cases where we're out of all transactions, if a broken connection is
285 : : * detected, we try to reestablish a new connection later.
286 : : */
1847 287 [ + + ]: 2198 : PG_TRY();
288 : : {
289 : : /* Process a pending asynchronous request if any. */
1671 efujita@postgresql.o 290 [ - + ]: 2198 : if (entry->state.pendingAreq)
1671 efujita@postgresql.o 291 :UBC 0 : process_pending_request(entry->state.pendingAreq);
292 : : /* Start a new transaction or subtransaction if needed. */
1847 fujii@postgresql.org 293 :CBC 2198 : begin_remote_xact(entry);
294 : : }
295 : 2 : PG_CATCH();
296 : : {
1837 297 : 2 : MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
298 : 2 : ErrorData *errdata = CopyErrorData();
299 : :
300 : : /*
301 : : * Determine whether to try to reestablish the connection.
302 : : *
303 : : * After a broken connection is detected in libpq, any error other
304 : : * than connection failure (e.g., out-of-memory) can be thrown
305 : : * somewhere between return from libpq and the expected ereport() call
306 : : * in pgfdw_report_error(). In this case, since PQstatus() indicates
307 : : * CONNECTION_BAD, checking only PQstatus() causes the false detection
308 : : * of connection failure. To avoid this, we also verify that the
309 : : * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
310 : : * checking only the sqlstate can cause another false detection
311 : : * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
312 : : * for any libpq-originated error condition.
313 : : */
314 [ + - ]: 2 : if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
315 [ + - ]: 2 : PQstatus(entry->conn) != CONNECTION_BAD ||
316 [ + + ]: 2 : entry->xact_depth > 0)
317 : : {
318 : 1 : MemoryContextSwitchTo(ecxt);
1847 319 : 1 : PG_RE_THROW();
320 : : }
321 : :
322 : : /* Clean up the error state */
1837 323 : 1 : FlushErrorState();
324 : 1 : FreeErrorData(errdata);
325 : 1 : errdata = NULL;
326 : :
327 : 1 : retry = true;
328 : : }
1847 329 [ - + ]: 2197 : PG_END_TRY();
330 : :
331 : : /*
332 : : * If a broken connection is detected, disconnect it, reestablish a new
333 : : * connection and retry a new remote transaction. If connection failure is
334 : : * reported again, we give up getting a connection.
335 : : */
1837 336 [ + + ]: 2197 : if (retry)
337 : : {
338 [ - + ]: 1 : Assert(entry->xact_depth == 0);
339 : :
1847 340 [ - + ]: 1 : ereport(DEBUG3,
341 : : (errmsg_internal("could not start remote transaction on connection %p",
342 : : entry->conn)),
343 : : errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
344 : :
1837 345 [ - + ]: 1 : elog(DEBUG3, "closing connection %p to reestablish a new one",
346 : : entry->conn);
347 : 1 : disconnect_pg_server(entry);
348 : :
955 efujita@postgresql.o 349 : 1 : make_new_connection(entry, user);
350 : :
1837 fujii@postgresql.org 351 : 1 : begin_remote_xact(entry);
352 : : }
353 : :
354 : : /* Remember if caller will prepare statements */
4614 tgl@sss.pgh.pa.us 355 : 2197 : entry->have_prep_stmt |= will_prep_stmt;
356 : :
357 : : /* If caller needs access to the per-connection state, return it. */
1671 efujita@postgresql.o 358 [ + + ]: 2197 : if (state)
359 : 750 : *state = &entry->state;
360 : :
4631 tgl@sss.pgh.pa.us 361 : 2197 : return entry->conn;
362 : : }
363 : :
364 : : /*
365 : : * Reset all transient state fields in the cached connection entry and
366 : : * establish new connection to the remote server.
367 : : */
368 : : static void
1837 fujii@postgresql.org 369 : 84 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
370 : : {
371 : 84 : ForeignServer *server = GetForeignServer(user->serverid);
372 : : ListCell *lc;
373 : :
374 [ - + ]: 84 : Assert(entry->conn == NULL);
375 : :
376 : : /* Reset all transient state fields, to be sure all are clean */
377 : 84 : entry->xact_depth = 0;
378 : 84 : entry->have_prep_stmt = false;
379 : 84 : entry->have_error = false;
380 : 84 : entry->changing_xact_state = false;
381 : 84 : entry->invalidated = false;
1746 382 : 84 : entry->serverid = server->serverid;
1837 383 : 84 : entry->server_hashvalue =
384 : 84 : GetSysCacheHashValue1(FOREIGNSERVEROID,
385 : : ObjectIdGetDatum(server->serverid));
386 : 84 : entry->mapping_hashvalue =
387 : 84 : GetSysCacheHashValue1(USERMAPPINGOID,
388 : : ObjectIdGetDatum(user->umid));
1671 efujita@postgresql.o 389 : 84 : memset(&entry->state, 0, sizeof(entry->state));
390 : :
391 : : /*
392 : : * Determine whether to keep the connection that we're about to make here
393 : : * open even after the transaction using it ends, so that the subsequent
394 : : * transactions can re-use it.
395 : : *
396 : : * By default, all the connections to any foreign servers are kept open.
397 : : *
398 : : * Also determine whether to commit/abort (sub)transactions opened on the
399 : : * remote server in parallel at (sub)transaction end, which is disabled by
400 : : * default.
401 : : *
402 : : * Note: it's enough to determine these only when making a new connection
403 : : * because if these settings for it are changed, it will be closed and
404 : : * re-made later.
405 : : */
1669 fujii@postgresql.org 406 : 84 : entry->keep_connections = true;
1341 efujita@postgresql.o 407 : 84 : entry->parallel_commit = false;
935 408 : 84 : entry->parallel_abort = false;
1669 fujii@postgresql.org 409 [ + - + + : 385 : foreach(lc, server->options)
+ + ]
410 : : {
411 : 301 : DefElem *def = (DefElem *) lfirst(lc);
412 : :
413 [ + + ]: 301 : if (strcmp(def->defname, "keep_connections") == 0)
414 : 13 : entry->keep_connections = defGetBoolean(def);
1341 efujita@postgresql.o 415 [ + + ]: 288 : else if (strcmp(def->defname, "parallel_commit") == 0)
416 : 2 : entry->parallel_commit = defGetBoolean(def);
935 417 [ + + ]: 286 : else if (strcmp(def->defname, "parallel_abort") == 0)
418 : 2 : entry->parallel_abort = defGetBoolean(def);
419 : : }
420 : :
421 : : /* Now try to make the connection */
1837 fujii@postgresql.org 422 : 84 : entry->conn = connect_pg_server(server, user);
423 : :
424 [ - + ]: 75 : elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
425 : : entry->conn, server->servername, user->umid, user->userid);
426 : 75 : }
427 : :
428 : : /*
429 : : * Check that non-superuser has used password or delegated credentials
430 : : * to establish connection; otherwise, he's piggybacking on the
431 : : * postgres server's user identity. See also dblink_security_check()
432 : : * in contrib/dblink and check_conn_params.
433 : : */
434 : : static void
928 sfrost@snowman.net 435 : 78 : pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
436 : : {
437 : : /* Superusers bypass the check */
438 [ + + ]: 78 : if (superuser_arg(user->userid))
439 : 67 : return;
440 : :
441 : : #ifdef ENABLE_GSS
442 : : /* Connected via GSSAPI with delegated credentials- all good. */
891 bruce@momjian.us 443 [ + + + - ]: 11 : if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
928 sfrost@snowman.net 444 : 2 : return;
445 : : #endif
446 : :
447 : : /* Ok if superuser set PW required false. */
448 [ + + ]: 9 : if (!UserMappingPasswordRequired(user))
449 : 2 : return;
450 : :
451 : : /* Connected via PW, with PW required true, and provided non-empty PW. */
452 [ + + ]: 7 : if (PQconnectionUsedPassword(conn))
453 : : {
454 : : /* ok if params contain a non-empty password */
455 [ + + ]: 47 : for (int i = 0; keywords[i] != NULL; i++)
456 : : {
457 [ - + - - ]: 42 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
928 sfrost@snowman.net 458 :UBC 0 : return;
459 : : }
460 : : }
461 : :
462 : : /*
463 : : * Ok if SCRAM pass-through is being used and all required SCRAM options
464 : : * are set correctly. If pgfdw_has_required_scram_options returns true we
465 : : * assume that UseScramPassthrough is also true since SCRAM options are
466 : : * only set when UseScramPassthrough is enabled.
467 : : */
80 peter@eisentraut.org 468 [ + - + + :CBC 7 : if (MyProcPort != NULL && MyProcPort->has_scram_keys && pgfdw_has_required_scram_options(keywords, values))
+ - ]
217 469 : 4 : return;
470 : :
928 sfrost@snowman.net 471 [ + - ]: 3 : ereport(ERROR,
472 : : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
473 : : errmsg("password or GSSAPI delegated credentials required"),
474 : : errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
475 : : errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
476 : : }
477 : :
478 : : /*
479 : : * Connect to remote server using specified server and user mapping properties.
480 : : */
481 : : static PGconn *
4631 tgl@sss.pgh.pa.us 482 : 84 : connect_pg_server(ForeignServer *server, UserMapping *user)
483 : : {
484 : 84 : PGconn *volatile conn = NULL;
485 : :
486 : : /*
487 : : * Use PG_TRY block to ensure closing connection on error.
488 : : */
489 [ + + ]: 84 : PG_TRY();
490 : : {
491 : : const char **keywords;
492 : : const char **values;
1403 fujii@postgresql.org 493 : 84 : char *appname = NULL;
494 : : int n;
495 : :
496 : : /*
497 : : * Construct connection params from generic options of ForeignServer
498 : : * and UserMapping. (Some of them might not be libpq options, in
499 : : * which case we'll just waste a few array slots.) Add 4 extra slots
500 : : * for application_name, fallback_application_name, client_encoding,
501 : : * end marker, and 3 extra slots for scram keys and required scram
502 : : * pass-through options.
503 : : */
217 peter@eisentraut.org 504 : 84 : n = list_length(server->options) + list_length(user->options) + 4 + 3;
4631 tgl@sss.pgh.pa.us 505 : 84 : keywords = (const char **) palloc(n * sizeof(char *));
506 : 84 : values = (const char **) palloc(n * sizeof(char *));
507 : :
508 : 84 : n = 0;
509 : 168 : n += ExtractConnectionOptions(server->options,
510 : 84 : keywords + n, values + n);
511 : 168 : n += ExtractConnectionOptions(user->options,
512 : 84 : keywords + n, values + n);
513 : :
514 : : /*
515 : : * Use pgfdw_application_name as application_name if set.
516 : : *
517 : : * PQconnectdbParams() processes the parameter arrays from start to
518 : : * end. If any key word is repeated, the last value is used. Therefore
519 : : * note that pgfdw_application_name must be added to the arrays after
520 : : * options of ForeignServer are, so that it can override
521 : : * application_name set in ForeignServer.
522 : : */
1511 fujii@postgresql.org 523 [ + + + - ]: 84 : if (pgfdw_application_name && *pgfdw_application_name != '\0')
524 : : {
525 : 1 : keywords[n] = "application_name";
526 : 1 : values[n] = pgfdw_application_name;
527 : 1 : n++;
528 : : }
529 : :
530 : : /*
531 : : * Search the parameter arrays to find application_name setting, and
532 : : * replace escape sequences in it with status information if found.
533 : : * The arrays are searched backwards because the last value is used if
534 : : * application_name is repeatedly set.
535 : : */
1403 536 [ + + ]: 231 : for (int i = n - 1; i >= 0; i--)
537 : : {
538 [ + + ]: 170 : if (strcmp(keywords[i], "application_name") == 0 &&
539 [ + - ]: 23 : *(values[i]) != '\0')
540 : : {
541 : : /*
542 : : * Use this application_name setting if it's not empty string
543 : : * even after any escape sequences in it are replaced.
544 : : */
545 : 23 : appname = process_pgfdw_appname(values[i]);
546 [ + - ]: 23 : if (appname[0] != '\0')
547 : : {
548 : 23 : values[i] = appname;
549 : 23 : break;
550 : : }
551 : :
552 : : /*
553 : : * This empty application_name is not used, so we set
554 : : * values[i] to NULL and keep searching the array to find the
555 : : * next one.
556 : : */
1403 fujii@postgresql.org 557 :UBC 0 : values[i] = NULL;
558 : 0 : pfree(appname);
559 : 0 : appname = NULL;
560 : : }
561 : : }
562 : :
563 : : /* Use "postgres_fdw" as fallback_application_name */
4631 tgl@sss.pgh.pa.us 564 :CBC 84 : keywords[n] = "fallback_application_name";
565 : 84 : values[n] = "postgres_fdw";
566 : 84 : n++;
567 : :
568 : : /* Set client_encoding so that libpq can convert encoding properly. */
569 : 84 : keywords[n] = "client_encoding";
570 : 84 : values[n] = GetDatabaseEncodingName();
571 : 84 : n++;
572 : :
573 : : /* Add required SCRAM pass-through connection options if it's enabled. */
80 peter@eisentraut.org 574 [ + - + + : 84 : if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
+ - ]
575 : : {
576 : : int len;
577 : : int encoded_len;
578 : :
285 579 : 4 : keywords[n] = "scram_client_key";
580 : 4 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
581 : : /* don't forget the zero-terminator */
582 : 4 : values[n] = palloc0(len + 1);
172 heikki.linnakangas@i 583 : 8 : encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
584 : : sizeof(MyProcPort->scram_ClientKey),
284 peter@eisentraut.org 585 : 4 : (char *) values[n], len);
586 [ - + ]: 4 : if (encoded_len < 0)
284 peter@eisentraut.org 587 [ # # ]:UBC 0 : elog(ERROR, "could not encode SCRAM client key");
285 peter@eisentraut.org 588 :CBC 4 : n++;
589 : :
590 : 4 : keywords[n] = "scram_server_key";
591 : 4 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
592 : : /* don't forget the zero-terminator */
593 : 4 : values[n] = palloc0(len + 1);
172 heikki.linnakangas@i 594 : 8 : encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
595 : : sizeof(MyProcPort->scram_ServerKey),
284 peter@eisentraut.org 596 : 4 : (char *) values[n], len);
597 [ - + ]: 4 : if (encoded_len < 0)
284 peter@eisentraut.org 598 [ # # ]:UBC 0 : elog(ERROR, "could not encode SCRAM server key");
285 peter@eisentraut.org 599 :CBC 4 : n++;
600 : :
601 : : /*
602 : : * Require scram-sha-256 to ensure that no other auth method is
603 : : * used when connecting with foreign server.
604 : : */
217 605 : 4 : keywords[n] = "require_auth";
606 : 4 : values[n] = "scram-sha-256";
607 : 4 : n++;
608 : : }
609 : :
4631 tgl@sss.pgh.pa.us 610 : 84 : keywords[n] = values[n] = NULL;
611 : :
612 : : /* Verify the set of connection parameters. */
217 peter@eisentraut.org 613 : 84 : check_conn_params(keywords, values, user);
614 : :
615 : : /* first time, allocate or get the custom wait event */
753 michael@paquier.xyz 616 [ + + ]: 80 : if (pgfdw_we_connect == 0)
617 : 13 : pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
618 : :
619 : : /* OK to make connection */
1008 andres@anarazel.de 620 : 80 : conn = libpqsrv_connect_params(keywords, values,
621 : : false, /* expand_dbname */
622 : : pgfdw_we_connect);
623 : :
4631 tgl@sss.pgh.pa.us 624 [ + - + + ]: 80 : if (!conn || PQstatus(conn) != CONNECTION_OK)
625 [ + - ]: 2 : ereport(ERROR,
626 : : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
627 : : errmsg("could not connect to server \"%s\"",
628 : : server->servername),
629 : : errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
630 : :
97 fujii@postgresql.org 631 :GNC 78 : PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
632 : : "received message via remote connection");
633 : :
634 : : /* Perform post-connection security checks. */
217 peter@eisentraut.org 635 :CBC 78 : pgfdw_security_check(keywords, values, user, conn);
636 : :
637 : : /* Prepare new session for use */
4630 tgl@sss.pgh.pa.us 638 : 75 : configure_remote_session(conn);
639 : :
1403 fujii@postgresql.org 640 [ + + ]: 75 : if (appname != NULL)
641 : 23 : pfree(appname);
4631 tgl@sss.pgh.pa.us 642 : 75 : pfree(keywords);
643 : 75 : pfree(values);
644 : : }
645 : 9 : PG_CATCH();
646 : : {
1008 andres@anarazel.de 647 : 9 : libpqsrv_disconnect(conn);
4631 tgl@sss.pgh.pa.us 648 : 9 : PG_RE_THROW();
649 : : }
650 [ - + ]: 75 : PG_END_TRY();
651 : :
652 : 75 : return conn;
653 : : }
654 : :
655 : : /*
656 : : * Disconnect any open connection for a connection cache entry.
657 : : */
658 : : static void
3020 659 : 63 : disconnect_pg_server(ConnCacheEntry *entry)
660 : : {
661 [ + - ]: 63 : if (entry->conn != NULL)
662 : : {
1008 andres@anarazel.de 663 : 63 : libpqsrv_disconnect(entry->conn);
3020 tgl@sss.pgh.pa.us 664 : 63 : entry->conn = NULL;
665 : : }
666 : 63 : }
667 : :
668 : : /*
669 : : * Return true if the password_required is defined and false for this user
670 : : * mapping, otherwise false. The mapping has been pre-validated.
671 : : */
672 : : static bool
2138 andrew@dunslane.net 673 : 18 : UserMappingPasswordRequired(UserMapping *user)
674 : : {
675 : : ListCell *cell;
676 : :
677 [ + + + + : 32 : foreach(cell, user->options)
+ + ]
678 : : {
679 : 17 : DefElem *def = (DefElem *) lfirst(cell);
680 : :
681 [ + + ]: 17 : if (strcmp(def->defname, "password_required") == 0)
682 : 3 : return defGetBoolean(def);
683 : : }
684 : :
685 : 15 : return true;
686 : : }
687 : :
688 : : static bool
285 peter@eisentraut.org 689 : 4 : UseScramPassthrough(ForeignServer *server, UserMapping *user)
690 : : {
691 : : ListCell *cell;
692 : :
693 [ + - + - : 16 : foreach(cell, server->options)
+ - ]
694 : : {
695 : 16 : DefElem *def = (DefElem *) lfirst(cell);
696 : :
697 [ + + ]: 16 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
698 : 4 : return defGetBoolean(def);
699 : : }
700 : :
285 peter@eisentraut.org 701 [ # # # # :UBC 0 : foreach(cell, user->options)
# # ]
702 : : {
703 : 0 : DefElem *def = (DefElem *) lfirst(cell);
704 : :
705 [ # # ]: 0 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
706 : 0 : return defGetBoolean(def);
707 : : }
708 : :
709 : 0 : return false;
710 : : }
711 : :
712 : : /*
713 : : * For non-superusers, insist that the connstr specify a password or that the
714 : : * user provided their own GSSAPI delegated credentials. This
715 : : * prevents a password from being picked up from .pgpass, a service file, the
716 : : * environment, etc. We don't want the postgres user's passwords,
717 : : * certificates, etc to be accessible to non-superusers. (See also
718 : : * dblink_connstr_check in contrib/dblink.)
719 : : */
720 : : static void
2883 rhaas@postgresql.org 721 :CBC 84 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
722 : : {
723 : : int i;
724 : :
725 : : /* no check required if superuser */
726 [ + + ]: 84 : if (superuser_arg(user->userid))
4631 tgl@sss.pgh.pa.us 727 : 69 : return;
728 : :
729 : : #ifdef ENABLE_GSS
730 : : /* ok if the user provided their own delegated credentials */
891 bruce@momjian.us 731 [ + + ]: 15 : if (be_gssapi_get_delegation(MyProcPort))
928 sfrost@snowman.net 732 : 3 : return;
733 : : #endif
734 : :
735 : : /* ok if params contain a non-empty password */
4631 tgl@sss.pgh.pa.us 736 [ + + ]: 79 : for (i = 0; keywords[i] != NULL; i++)
737 : : {
738 [ + + + - ]: 70 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
739 : 3 : return;
740 : : }
741 : :
742 : : /* ok if the superuser explicitly said so at user mapping creation time */
2138 andrew@dunslane.net 743 [ + + ]: 9 : if (!UserMappingPasswordRequired(user))
744 : 1 : return;
745 : :
746 : : /*
747 : : * Ok if SCRAM pass-through is being used and all required scram options
748 : : * are set correctly. If pgfdw_has_required_scram_options returns true we
749 : : * assume that UseScramPassthrough is also true since SCRAM options are
750 : : * only set when UseScramPassthrough is enabled.
751 : : */
80 peter@eisentraut.org 752 [ + - + + : 8 : if (MyProcPort != NULL && MyProcPort->has_scram_keys && pgfdw_has_required_scram_options(keywords, values))
+ - ]
217 753 : 4 : return;
754 : :
4631 tgl@sss.pgh.pa.us 755 [ + - ]: 4 : ereport(ERROR,
756 : : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
757 : : errmsg("password or GSSAPI delegated credentials required"),
758 : : errdetail("Non-superusers must delegate GSSAPI credentials, provide a password, or enable SCRAM pass-through in user mapping.")));
759 : : }
760 : :
761 : : /*
762 : : * Issue SET commands to make sure remote session is configured properly.
763 : : *
764 : : * We do this just once at connection, assuming nothing will change the
765 : : * values later. Since we'll never send volatile function calls to the
766 : : * remote, there shouldn't be any way to break this assumption from our end.
767 : : * It's possible to think of ways to break it at the remote end, eg making
768 : : * a foreign table point to a view that includes a set_config call ---
769 : : * but once you admit the possibility of a malicious view definition,
770 : : * there are any number of ways to break things.
771 : : */
772 : : static void
4630 773 : 75 : configure_remote_session(PGconn *conn)
774 : : {
4613 775 : 75 : int remoteversion = PQserverVersion(conn);
776 : :
777 : : /* Force the search path to contain only pg_catalog (see deparse.c) */
778 : 75 : do_sql_command(conn, "SET search_path = pg_catalog");
779 : :
780 : : /*
781 : : * Set remote timezone; this is basically just cosmetic, since all
782 : : * transmitted and returned timestamptzs should specify a zone explicitly
783 : : * anyway. However it makes the regression test outputs more predictable.
784 : : *
785 : : * We don't risk setting remote zone equal to ours, since the remote
786 : : * server might use a different timezone database. Instead, use GMT
787 : : * (quoted, because very old servers are picky about case). That's
788 : : * guaranteed to work regardless of the remote's timezone database,
789 : : * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
790 : : */
554 791 : 75 : do_sql_command(conn, "SET timezone = 'GMT'");
792 : :
793 : : /*
794 : : * Set values needed to ensure unambiguous data output from remote. (This
795 : : * logic should match what pg_dump does. See also set_transmission_modes
796 : : * in postgres_fdw.c.)
797 : : */
4613 798 : 75 : do_sql_command(conn, "SET datestyle = ISO");
799 [ + - ]: 75 : if (remoteversion >= 80400)
800 : 75 : do_sql_command(conn, "SET intervalstyle = postgres");
801 [ + - ]: 75 : if (remoteversion >= 90000)
802 : 75 : do_sql_command(conn, "SET extra_float_digits = 3");
803 : : else
4613 tgl@sss.pgh.pa.us 804 :UBC 0 : do_sql_command(conn, "SET extra_float_digits = 2");
4613 tgl@sss.pgh.pa.us 805 :CBC 75 : }
806 : :
807 : : /*
808 : : * Convenience subroutine to issue a non-data-returning SQL command to remote
809 : : */
810 : : void
811 : 1846 : do_sql_command(PGconn *conn, const char *sql)
812 : : {
1341 efujita@postgresql.o 813 : 1846 : do_sql_command_begin(conn, sql);
814 : 1846 : do_sql_command_end(conn, sql, false);
815 : 1843 : }
816 : :
817 : : static void
818 : 1864 : do_sql_command_begin(PGconn *conn, const char *sql)
819 : : {
3064 rhaas@postgresql.org 820 [ - + ]: 1864 : if (!PQsendQuery(conn, sql))
90 tgl@sss.pgh.pa.us 821 :UNC 0 : pgfdw_report_error(NULL, conn, sql);
1341 efujita@postgresql.o 822 :CBC 1864 : }
823 : :
824 : : static void
825 : 1864 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
826 : : {
827 : : PGresult *res;
828 : :
829 : : /*
830 : : * If requested, consume whatever data is available from the socket. (Note
831 : : * that if all data is available, this allows pgfdw_get_result to call
832 : : * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
833 : : * would be large compared to the overhead of PQconsumeInput.)
834 : : */
835 [ + + - + ]: 1864 : if (consume_input && !PQconsumeInput(conn))
90 tgl@sss.pgh.pa.us 836 :UNC 0 : pgfdw_report_error(NULL, conn, sql);
658 noah@leadboat.com 837 :CBC 1864 : res = pgfdw_get_result(conn);
4630 tgl@sss.pgh.pa.us 838 [ + + ]: 1864 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
90 tgl@sss.pgh.pa.us 839 :GNC 3 : pgfdw_report_error(res, conn, sql);
4630 tgl@sss.pgh.pa.us 840 :CBC 1861 : PQclear(res);
841 : 1861 : }
842 : :
843 : : /*
844 : : * Start remote transaction or subtransaction, if needed.
845 : : *
846 : : * Note that we always use at least REPEATABLE READ in the remote session.
847 : : * This is so that, if a query initiates multiple scans of the same or
848 : : * different foreign tables, we will get snapshot-consistent results from
849 : : * those scans. A disadvantage is that we can't provide sane emulation of
850 : : * READ COMMITTED behavior --- it would be nice if we had some other way to
851 : : * control which remote queries share a snapshot.
852 : : */
853 : : static void
4631 854 : 2199 : begin_remote_xact(ConnCacheEntry *entry)
855 : : {
856 : 2199 : int curlevel = GetCurrentTransactionNestLevel();
857 : :
858 : : /* Start main transaction if we haven't yet */
859 [ + + ]: 2199 : if (entry->xact_depth <= 0)
860 : : {
861 : : const char *sql;
862 : :
863 [ - + ]: 754 : elog(DEBUG3, "starting remote transaction on connection %p",
864 : : entry->conn);
865 : :
866 [ - + ]: 754 : if (IsolationIsSerializable())
141 efujita@postgresql.o 867 :UBC 0 : sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
868 : : else
141 efujita@postgresql.o 869 :CBC 754 : sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
3064 rhaas@postgresql.org 870 : 754 : entry->changing_xact_state = true;
141 efujita@postgresql.o 871 : 754 : do_sql_command(entry->conn, sql);
4631 tgl@sss.pgh.pa.us 872 : 753 : entry->xact_depth = 1;
3064 rhaas@postgresql.org 873 : 753 : entry->changing_xact_state = false;
874 : : }
875 : :
876 : : /*
877 : : * If we're in a subtransaction, stack up savepoints to match our level.
878 : : * This ensures we can rollback just the desired effects when a
879 : : * subtransaction aborts.
880 : : */
4631 tgl@sss.pgh.pa.us 881 [ + + ]: 2212 : while (entry->xact_depth < curlevel)
882 : : {
883 : : char sql[64];
884 : :
141 efujita@postgresql.o 885 : 15 : snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
3064 rhaas@postgresql.org 886 : 15 : entry->changing_xact_state = true;
141 efujita@postgresql.o 887 : 15 : do_sql_command(entry->conn, sql);
4631 tgl@sss.pgh.pa.us 888 : 14 : entry->xact_depth++;
3064 rhaas@postgresql.org 889 : 14 : entry->changing_xact_state = false;
890 : : }
4631 tgl@sss.pgh.pa.us 891 : 2197 : }
892 : :
893 : : /*
894 : : * Release connection reference count created by calling GetConnection.
895 : : */
896 : : void
897 : 2132 : ReleaseConnection(PGconn *conn)
898 : : {
899 : : /*
900 : : * Currently, we don't actually track connection references because all
901 : : * cleanup is managed on a transaction or subtransaction basis instead. So
902 : : * there's nothing to do here.
903 : : */
904 : 2132 : }
905 : :
906 : : /*
907 : : * Assign a "unique" number for a cursor.
908 : : *
909 : : * These really only need to be unique per connection within a transaction.
910 : : * For the moment we ignore the per-connection point and assign them across
911 : : * all connections in the transaction, but we ask for the connection to be
912 : : * supplied in case we want to refine that.
913 : : *
914 : : * Note that even if wraparound happens in a very long transaction, actual
915 : : * collisions are highly improbable; just be sure to use %u not %d to print.
916 : : */
917 : : unsigned int
918 : 543 : GetCursorNumber(PGconn *conn)
919 : : {
920 : 543 : return ++cursor_number;
921 : : }
922 : :
923 : : /*
924 : : * Assign a "unique" number for a prepared statement.
925 : : *
926 : : * This works much like GetCursorNumber, except that we never reset the counter
927 : : * within a session. That's because we can't be 100% sure we've gotten rid
928 : : * of all prepared statements on all connections, and it's not really worth
929 : : * increasing the risk of prepared-statement name collisions by resetting.
930 : : */
931 : : unsigned int
4614 932 : 187 : GetPrepStmtNumber(PGconn *conn)
933 : : {
934 : 187 : return ++prep_stmt_number;
935 : : }
936 : :
937 : : /*
938 : : * Submit a query and wait for the result.
939 : : *
940 : : * Since we don't use non-blocking mode, this can't process interrupts while
941 : : * pushing the query text to the server. That risk is relatively small, so we
942 : : * ignore that for now.
943 : : *
944 : : * Caller is responsible for the error handling on the result.
945 : : */
946 : : PGresult *
1671 efujita@postgresql.o 947 : 4078 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
948 : : {
949 : : /* First, process a pending asynchronous request, if any. */
950 [ + + + + ]: 4078 : if (state && state->pendingAreq)
951 : 4 : process_pending_request(state->pendingAreq);
952 : :
3476 rhaas@postgresql.org 953 [ - + ]: 4078 : if (!PQsendQuery(conn, query))
658 noah@leadboat.com 954 :UBC 0 : return NULL;
658 noah@leadboat.com 955 :CBC 4078 : return pgfdw_get_result(conn);
956 : : }
957 : :
958 : : /*
959 : : * Wrap libpqsrv_get_result_last(), adding wait event.
960 : : *
961 : : * Caller is responsible for the error handling on the result.
962 : : */
963 : : PGresult *
964 : 8242 : pgfdw_get_result(PGconn *conn)
965 : : {
966 : 8242 : return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
967 : : }
968 : :
969 : : /*
970 : : * Report an error we got from the remote server.
971 : : *
972 : : * Callers should use pgfdw_report_error() to throw an error, or use
973 : : * pgfdw_report() for lesser message levels. (We make this distinction
974 : : * so that pgfdw_report_error() can be marked noreturn.)
975 : : *
976 : : * res: PGresult containing the error (might be NULL)
977 : : * conn: connection we did the query on
978 : : * sql: NULL, or text of remote command we tried to execute
979 : : *
980 : : * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
981 : : * in which case memory context cleanup will clear it eventually).
982 : : *
983 : : * Note: callers that choose not to throw ERROR for a remote error are
984 : : * responsible for making sure that the associated ConnCacheEntry gets
985 : : * marked with have_error = true.
986 : : */
987 : : void
90 tgl@sss.pgh.pa.us 988 :GNC 16 : pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql)
989 : : {
990 : 16 : pgfdw_report_internal(ERROR, res, conn, sql);
90 tgl@sss.pgh.pa.us 991 :UNC 0 : pg_unreachable();
992 : : }
993 : :
994 : : void
995 : 0 : pgfdw_report(int elevel, PGresult *res, PGconn *conn, const char *sql)
996 : : {
997 [ # # ]: 0 : Assert(elevel < ERROR); /* use pgfdw_report_error for that */
998 : 0 : pgfdw_report_internal(elevel, res, conn, sql);
999 : 0 : }
1000 : :
1001 : : static void
90 tgl@sss.pgh.pa.us 1002 :GNC 16 : pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
1003 : : const char *sql)
1004 : : {
94 1005 : 16 : char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
1006 : 16 : char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
1007 : 16 : char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
1008 : 16 : char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
1009 : 16 : char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
1010 : : int sqlstate;
1011 : :
1012 [ + + ]: 16 : if (diag_sqlstate)
1013 : 14 : sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1014 : : diag_sqlstate[1],
1015 : : diag_sqlstate[2],
1016 : : diag_sqlstate[3],
1017 : : diag_sqlstate[4]);
1018 : : else
1019 : 2 : sqlstate = ERRCODE_CONNECTION_FAILURE;
1020 : :
1021 : : /*
1022 : : * If we don't get a message from the PGresult, try the PGconn. This is
1023 : : * needed because for connection-level failures, PQgetResult may just
1024 : : * return NULL, not a PGresult at all.
1025 : : */
1026 [ + + ]: 16 : if (message_primary == NULL)
1027 : 2 : message_primary = pchomp(PQerrorMessage(conn));
1028 : :
1029 [ + - + - : 16 : ereport(elevel,
+ - + + +
+ - + +
- ]
1030 : : (errcode(sqlstate),
1031 : : (message_primary != NULL && message_primary[0] != '\0') ?
1032 : : errmsg_internal("%s", message_primary) :
1033 : : errmsg("could not obtain message string for remote error"),
1034 : : message_detail ? errdetail_internal("%s", message_detail) : 0,
1035 : : message_hint ? errhint("%s", message_hint) : 0,
1036 : : message_context ? errcontext("%s", message_context) : 0,
1037 : : sql ? errcontext("remote SQL command: %s", sql) : 0));
94 tgl@sss.pgh.pa.us 1038 :UNC 0 : PQclear(res);
4631 tgl@sss.pgh.pa.us 1039 :UBC 0 : }
1040 : :
1041 : : /*
1042 : : * pgfdw_xact_callback --- cleanup at main-transaction end.
1043 : : *
1044 : : * This runs just late enough that it must not enter user-defined code
1045 : : * locally. (Entering such code on the remote side is fine. Its remote
1046 : : * COMMIT TRANSACTION may run deferred triggers.)
1047 : : */
1048 : : static void
4631 tgl@sss.pgh.pa.us 1049 :CBC 4037 : pgfdw_xact_callback(XactEvent event, void *arg)
1050 : : {
1051 : : HASH_SEQ_STATUS scan;
1052 : : ConnCacheEntry *entry;
1341 efujita@postgresql.o 1053 : 4037 : List *pending_entries = NIL;
935 1054 : 4037 : List *cancel_requested = NIL;
1055 : :
1056 : : /* Quick exit if no connections were touched in this transaction. */
4631 tgl@sss.pgh.pa.us 1057 [ + + ]: 4037 : if (!xact_got_connection)
1058 : 3312 : return;
1059 : :
1060 : : /*
1061 : : * Scan all connection cache entries to find open remote transactions, and
1062 : : * close them.
1063 : : */
1064 : 725 : hash_seq_init(&scan, ConnectionHash);
1065 [ + + ]: 3721 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1066 : : {
1067 : : PGresult *res;
1068 : :
1069 : : /* Ignore cache entry if no open connection right now */
4284 1070 [ + + ]: 2997 : if (entry->conn == NULL)
4631 1071 : 1706 : continue;
1072 : :
1073 : : /* If it has an open remote transaction, try to close it */
4284 1074 [ + + ]: 1291 : if (entry->xact_depth > 0)
1075 : : {
1076 [ - + ]: 754 : elog(DEBUG3, "closing remote transaction on connection %p",
1077 : : entry->conn);
1078 : :
1079 [ + + - + : 754 : switch (event)
- ]
1080 : : {
3833 rhaas@postgresql.org 1081 : 701 : case XACT_EVENT_PARALLEL_PRE_COMMIT:
1082 : : case XACT_EVENT_PRE_COMMIT:
1083 : :
1084 : : /*
1085 : : * If abort cleanup previously failed for this connection,
1086 : : * we can't issue any more commands against it.
1087 : : */
3064 1088 : 701 : pgfdw_reject_incomplete_xact_state_change(entry);
1089 : :
1090 : : /* Commit all remote transactions during pre-commit */
1091 : 701 : entry->changing_xact_state = true;
1341 efujita@postgresql.o 1092 [ + + ]: 701 : if (entry->parallel_commit)
1093 : : {
1094 : 16 : do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
1095 : 16 : pending_entries = lappend(pending_entries, entry);
1096 : 16 : continue;
1097 : : }
4284 tgl@sss.pgh.pa.us 1098 : 685 : do_sql_command(entry->conn, "COMMIT TRANSACTION");
3064 rhaas@postgresql.org 1099 : 685 : entry->changing_xact_state = false;
1100 : :
1101 : : /*
1102 : : * If there were any errors in subtransactions, and we
1103 : : * made prepared statements, do a DEALLOCATE ALL to make
1104 : : * sure we get rid of all prepared statements. This is
1105 : : * annoying and not terribly bulletproof, but it's
1106 : : * probably not worth trying harder.
1107 : : *
1108 : : * DEALLOCATE ALL only exists in 8.3 and later, so this
1109 : : * constrains how old a server postgres_fdw can
1110 : : * communicate with. We intentionally ignore errors in
1111 : : * the DEALLOCATE, so that we can hobble along to some
1112 : : * extent with older servers (leaking prepared statements
1113 : : * as we go; but we don't really support update operations
1114 : : * pre-8.3 anyway).
1115 : : */
4614 tgl@sss.pgh.pa.us 1116 [ + + - + ]: 685 : if (entry->have_prep_stmt && entry->have_error)
1117 : : {
658 noah@leadboat.com 1118 :UBC 0 : res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
1119 : : NULL);
4614 tgl@sss.pgh.pa.us 1120 : 0 : PQclear(res);
1121 : : }
4614 tgl@sss.pgh.pa.us 1122 :CBC 685 : entry->have_prep_stmt = false;
1123 : 685 : entry->have_error = false;
4284 1124 : 685 : break;
1125 : 1 : case XACT_EVENT_PRE_PREPARE:
1126 : :
1127 : : /*
1128 : : * We disallow any remote transactions, since it's not
1129 : : * very reasonable to hold them open until the prepared
1130 : : * transaction is committed. For the moment, throw error
1131 : : * unconditionally; later we might allow read-only cases.
1132 : : * Note that the error will cause us to come right back
1133 : : * here with event == XACT_EVENT_ABORT, so we'll clean up
1134 : : * the connection state at that point.
1135 : : */
1136 [ + - ]: 1 : ereport(ERROR,
1137 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1138 : : errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1139 : : break;
3833 rhaas@postgresql.org 1140 :UBC 0 : case XACT_EVENT_PARALLEL_COMMIT:
1141 : : case XACT_EVENT_COMMIT:
1142 : : case XACT_EVENT_PREPARE:
1143 : : /* Pre-commit should have closed the open transaction */
4284 tgl@sss.pgh.pa.us 1144 [ # # ]: 0 : elog(ERROR, "missed cleaning up connection during pre-commit");
1145 : : break;
3833 rhaas@postgresql.org 1146 :CBC 52 : case XACT_EVENT_PARALLEL_ABORT:
1147 : : case XACT_EVENT_ABORT:
1148 : : /* Rollback all remote transactions during abort */
935 efujita@postgresql.o 1149 [ + + ]: 52 : if (entry->parallel_abort)
1150 : : {
1151 [ + - ]: 4 : if (pgfdw_abort_cleanup_begin(entry, true,
1152 : : &pending_entries,
1153 : : &cancel_requested))
1154 : 4 : continue;
1155 : : }
1156 : : else
1157 : 48 : pgfdw_abort_cleanup(entry, true);
4284 tgl@sss.pgh.pa.us 1158 : 48 : break;
1159 : : }
1160 : : }
1161 : :
1162 : : /* Reset state to show we're out of a transaction */
1341 efujita@postgresql.o 1163 : 1270 : pgfdw_reset_xact_state(entry, true);
1164 : : }
1165 : :
1166 : : /* If there are any pending connections, finish cleaning them up */
935 1167 [ + + - + ]: 724 : if (pending_entries || cancel_requested)
1168 : : {
1169 [ + - + + ]: 15 : if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1170 : : event == XACT_EVENT_PRE_COMMIT)
1171 : : {
1172 [ - + ]: 13 : Assert(cancel_requested == NIL);
1173 : 13 : pgfdw_finish_pre_commit_cleanup(pending_entries);
1174 : : }
1175 : : else
1176 : : {
1177 [ + - - + ]: 2 : Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1178 : : event == XACT_EVENT_ABORT);
1179 : 2 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1180 : : true);
1181 : : }
1182 : : }
1183 : :
1184 : : /*
1185 : : * Regardless of the event type, we can now mark ourselves as out of the
1186 : : * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1187 : : * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1188 : : */
4631 tgl@sss.pgh.pa.us 1189 : 724 : xact_got_connection = false;
1190 : :
1191 : : /* Also reset cursor numbering for next transaction */
1192 : 724 : cursor_number = 0;
1193 : : }
1194 : :
1195 : : /*
1196 : : * pgfdw_subxact_callback --- cleanup at subtransaction end.
1197 : : */
1198 : : static void
1199 : 38 : pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1200 : : SubTransactionId parentSubid, void *arg)
1201 : : {
1202 : : HASH_SEQ_STATUS scan;
1203 : : ConnCacheEntry *entry;
1204 : : int curlevel;
1341 efujita@postgresql.o 1205 : 38 : List *pending_entries = NIL;
935 1206 : 38 : List *cancel_requested = NIL;
1207 : :
1208 : : /* Nothing to do at subxact start, nor after commit. */
4631 tgl@sss.pgh.pa.us 1209 [ + + + + ]: 38 : if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1210 : : event == SUBXACT_EVENT_ABORT_SUB))
1211 : 23 : return;
1212 : :
1213 : : /* Quick exit if no connections were touched in this transaction. */
1214 [ - + ]: 15 : if (!xact_got_connection)
4631 tgl@sss.pgh.pa.us 1215 :UBC 0 : return;
1216 : :
1217 : : /*
1218 : : * Scan all connection cache entries to find open remote subtransactions
1219 : : * of the current level, and close them.
1220 : : */
4631 tgl@sss.pgh.pa.us 1221 :CBC 15 : curlevel = GetCurrentTransactionNestLevel();
1222 : 15 : hash_seq_init(&scan, ConnectionHash);
1223 [ + + ]: 102 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1224 : : {
1225 : : char sql[100];
1226 : :
1227 : : /*
1228 : : * We only care about connections with open remote subtransactions of
1229 : : * the current level.
1230 : : */
1231 [ + + + + ]: 87 : if (entry->conn == NULL || entry->xact_depth < curlevel)
1232 : 79 : continue;
1233 : :
1234 [ - + ]: 14 : if (entry->xact_depth > curlevel)
4631 tgl@sss.pgh.pa.us 1235 [ # # ]:UBC 0 : elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1236 : : entry->xact_depth);
1237 : :
4631 tgl@sss.pgh.pa.us 1238 [ + + ]:CBC 14 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1239 : : {
1240 : : /*
1241 : : * If abort cleanup previously failed for this connection, we
1242 : : * can't issue any more commands against it.
1243 : : */
3064 rhaas@postgresql.org 1244 : 7 : pgfdw_reject_incomplete_xact_state_change(entry);
1245 : :
1246 : : /* Commit all remote subtransactions during pre-commit */
4631 tgl@sss.pgh.pa.us 1247 : 7 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
3064 rhaas@postgresql.org 1248 : 7 : entry->changing_xact_state = true;
1341 efujita@postgresql.o 1249 [ + + ]: 7 : if (entry->parallel_commit)
1250 : : {
1251 : 2 : do_sql_command_begin(entry->conn, sql);
1252 : 2 : pending_entries = lappend(pending_entries, entry);
1253 : 2 : continue;
1254 : : }
4613 tgl@sss.pgh.pa.us 1255 : 5 : do_sql_command(entry->conn, sql);
3064 rhaas@postgresql.org 1256 : 5 : entry->changing_xact_state = false;
1257 : : }
1258 : : else
1259 : : {
1260 : : /* Rollback all remote subtransactions during abort */
935 efujita@postgresql.o 1261 [ + + ]: 7 : if (entry->parallel_abort)
1262 : : {
1263 [ + - ]: 4 : if (pgfdw_abort_cleanup_begin(entry, false,
1264 : : &pending_entries,
1265 : : &cancel_requested))
1266 : 4 : continue;
1267 : : }
1268 : : else
1269 : 3 : pgfdw_abort_cleanup(entry, false);
1270 : : }
1271 : :
1272 : : /* OK, we're outta that level of subtransaction */
1341 1273 : 8 : pgfdw_reset_xact_state(entry, false);
1274 : : }
1275 : :
1276 : : /* If there are any pending connections, finish cleaning them up */
935 1277 [ + + - + ]: 15 : if (pending_entries || cancel_requested)
1278 : : {
1279 [ + + ]: 3 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1280 : : {
1281 [ - + ]: 1 : Assert(cancel_requested == NIL);
1282 : 1 : pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1283 : : }
1284 : : else
1285 : : {
1286 [ - + ]: 2 : Assert(event == SUBXACT_EVENT_ABORT_SUB);
1287 : 2 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1288 : : false);
1289 : : }
1290 : : }
1291 : : }
1292 : :
1293 : : /*
1294 : : * Connection invalidation callback function
1295 : : *
1296 : : * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1297 : : * close connections depending on that entry immediately if current transaction
1298 : : * has not used those connections yet. Otherwise, mark those connections as
1299 : : * invalid and then make pgfdw_xact_callback() close them at the end of current
1300 : : * transaction, since they cannot be closed in the midst of the transaction
1301 : : * using them. Closed connections will be remade at the next opportunity if
1302 : : * necessary.
1303 : : *
1304 : : * Although most cache invalidation callbacks blow away all the related stuff
1305 : : * regardless of the given hashvalue, connections are expensive enough that
1306 : : * it's worth trying to avoid that.
1307 : : *
1308 : : * NB: We could avoid unnecessary disconnection more strictly by examining
1309 : : * individual option values, but it seems too much effort for the gain.
1310 : : */
1311 : : static void
3020 tgl@sss.pgh.pa.us 1312 : 176 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1313 : : {
1314 : : HASH_SEQ_STATUS scan;
1315 : : ConnCacheEntry *entry;
1316 : :
1317 [ + + - + ]: 176 : Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1318 : :
1319 : : /* ConnectionHash must exist already, if we're registered */
1320 : 176 : hash_seq_init(&scan, ConnectionHash);
1321 [ + + ]: 1186 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1322 : : {
1323 : : /* Ignore invalid entries */
1324 [ + + ]: 1010 : if (entry->conn == NULL)
1325 : 818 : continue;
1326 : :
1327 : : /* hashvalue == 0 means a cache reset, must clear all state */
1328 [ + - + + ]: 192 : if (hashvalue == 0 ||
1329 : 139 : (cacheid == FOREIGNSERVEROID &&
1330 [ + + + + ]: 192 : entry->server_hashvalue == hashvalue) ||
1331 : 53 : (cacheid == USERMAPPINGOID &&
1332 [ + + ]: 53 : entry->mapping_hashvalue == hashvalue))
1333 : : {
1334 : : /*
1335 : : * Close the connection immediately if it's not used yet in this
1336 : : * transaction. Otherwise mark it as invalid so that
1337 : : * pgfdw_xact_callback() can close it at the end of this
1338 : : * transaction.
1339 : : */
1764 fujii@postgresql.org 1340 [ + + ]: 55 : if (entry->xact_depth == 0)
1341 : : {
1342 [ - + ]: 52 : elog(DEBUG3, "discarding connection %p", entry->conn);
1343 : 52 : disconnect_pg_server(entry);
1344 : : }
1345 : : else
1346 : 3 : entry->invalidated = true;
1347 : : }
1348 : : }
3020 tgl@sss.pgh.pa.us 1349 : 176 : }
1350 : :
1351 : : /*
1352 : : * Raise an error if the given connection cache entry is marked as being
1353 : : * in the middle of an xact state change. This should be called at which no
1354 : : * such change is expected to be in progress; if one is found to be in
1355 : : * progress, it means that we aborted in the middle of a previous state change
1356 : : * and now don't know what the remote transaction state actually is.
1357 : : * Such connections can't safely be further used. Re-establishing the
1358 : : * connection would change the snapshot and roll back any writes already
1359 : : * performed, so that's not an option, either. Thus, we must abort.
1360 : : */
1361 : : static void
3064 rhaas@postgresql.org 1362 : 2915 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
1363 : : {
1364 : : ForeignServer *server;
1365 : :
1366 : : /* nothing to do for inactive entries and entries of sane state */
3020 tgl@sss.pgh.pa.us 1367 [ + + + - ]: 2915 : if (entry->conn == NULL || !entry->changing_xact_state)
3064 rhaas@postgresql.org 1368 : 2915 : return;
1369 : :
1370 : : /* make sure this entry is inactive */
3020 tgl@sss.pgh.pa.us 1371 :UBC 0 : disconnect_pg_server(entry);
1372 : :
1373 : : /* find server name to be shown in the message below */
1746 fujii@postgresql.org 1374 : 0 : server = GetForeignServer(entry->serverid);
1375 : :
3064 rhaas@postgresql.org 1376 [ # # ]: 0 : ereport(ERROR,
1377 : : (errcode(ERRCODE_CONNECTION_EXCEPTION),
1378 : : errmsg("connection to server \"%s\" was lost",
1379 : : server->servername)));
1380 : : }
1381 : :
1382 : : /*
1383 : : * Reset state to show we're out of a (sub)transaction.
1384 : : */
1385 : : static void
1341 efujita@postgresql.o 1386 :CBC 1304 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1387 : : {
1388 [ + + ]: 1304 : if (toplevel)
1389 : : {
1390 : : /* Reset state to show we're out of a transaction */
1391 : 1290 : entry->xact_depth = 0;
1392 : :
1393 : : /*
1394 : : * If the connection isn't in a good idle state, it is marked as
1395 : : * invalid or keep_connections option of its server is disabled, then
1396 : : * discard it to recover. Next GetConnection will open a new
1397 : : * connection.
1398 : : */
1399 [ + + + - ]: 2579 : if (PQstatus(entry->conn) != CONNECTION_OK ||
1400 : 1289 : PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1401 [ + - ]: 1289 : entry->changing_xact_state ||
1402 [ + + ]: 1289 : entry->invalidated ||
1403 [ + + ]: 1287 : !entry->keep_connections)
1404 : : {
1405 [ - + ]: 4 : elog(DEBUG3, "discarding connection %p", entry->conn);
1406 : 4 : disconnect_pg_server(entry);
1407 : : }
1408 : : }
1409 : : else
1410 : : {
1411 : : /* Reset state to show we're out of a subtransaction */
1412 : 14 : entry->xact_depth--;
1413 : : }
1414 : 1304 : }
1415 : :
1416 : : /*
1417 : : * Cancel the currently-in-progress query (whose query text we do not have)
1418 : : * and ignore the result. Returns true if we successfully cancel the query
1419 : : * and discard any pending result, and false if not.
1420 : : *
1421 : : * It's not a huge problem if we throw an ERROR here, but if we get into error
1422 : : * recursion trouble, we'll end up slamming the connection shut, which will
1423 : : * necessitate failing the entire toplevel transaction even if subtransactions
1424 : : * were used. Try to use WARNING where we can.
1425 : : *
1426 : : * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1427 : : * query text from the pendingAreq saved in the per-connection state, then
1428 : : * report the query using it.
1429 : : */
1430 : : static bool
3064 rhaas@postgresql.org 1431 : 1 : pgfdw_cancel_query(PGconn *conn)
1432 : : {
308 tgl@sss.pgh.pa.us 1433 : 1 : TimestampTz now = GetCurrentTimestamp();
1434 : : TimestampTz endtime;
1435 : : TimestampTz retrycanceltime;
1436 : :
1437 : : /*
1438 : : * If it takes too long to cancel the query and discard the result, assume
1439 : : * the connection is dead.
1440 : : */
1441 : 1 : endtime = TimestampTzPlusMilliseconds(now, CONNECTION_CLEANUP_TIMEOUT);
1442 : :
1443 : : /*
1444 : : * Also, lose patience and re-issue the cancel request after a little bit.
1445 : : * (This serves to close some race conditions.)
1446 : : */
1447 : 1 : retrycanceltime = TimestampTzPlusMilliseconds(now, RETRY_CANCEL_TIMEOUT);
1448 : :
578 alvherre@alvh.no-ip. 1449 [ - + ]: 1 : if (!pgfdw_cancel_query_begin(conn, endtime))
935 efujita@postgresql.o 1450 :UBC 0 : return false;
308 tgl@sss.pgh.pa.us 1451 :CBC 1 : return pgfdw_cancel_query_end(conn, endtime, retrycanceltime, false);
1452 : : }
1453 : :
1454 : : /*
1455 : : * Submit a cancel request to the given connection, waiting only until
1456 : : * the given time.
1457 : : *
1458 : : * We sleep interruptibly until we receive confirmation that the cancel
1459 : : * request has been accepted, and if it is, return true; if the timeout
1460 : : * lapses without that, or the request fails for whatever reason, return
1461 : : * false.
1462 : : */
1463 : : static bool
578 alvherre@alvh.no-ip. 1464 : 1 : pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
1465 : : {
570 1466 : 1 : const char *errormsg = libpqsrv_cancel(conn, endtime);
1467 : :
578 1468 [ - + ]: 1 : if (errormsg != NULL)
578 alvherre@alvh.no-ip. 1469 [ # # ]:UBC 0 : ereport(WARNING,
1470 : : errcode(ERRCODE_CONNECTION_FAILURE),
1471 : : errmsg("could not send cancel request: %s", errormsg));
1472 : :
578 alvherre@alvh.no-ip. 1473 :CBC 1 : return errormsg == NULL;
1474 : : }
1475 : :
1476 : : static bool
308 tgl@sss.pgh.pa.us 1477 : 1 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
1478 : : TimestampTz retrycanceltime, bool consume_input)
1479 : : {
1480 : : PGresult *result;
1481 : : bool timed_out;
1482 : :
1483 : : /*
1484 : : * If requested, consume whatever data is available from the socket. (Note
1485 : : * that if all data is available, this allows pgfdw_get_cleanup_result to
1486 : : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1487 : : * which would be large compared to the overhead of PQconsumeInput.)
1488 : : */
935 efujita@postgresql.o 1489 [ - + - - ]: 1 : if (consume_input && !PQconsumeInput(conn))
1490 : : {
935 efujita@postgresql.o 1491 [ # # ]:UBC 0 : ereport(WARNING,
1492 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1493 : : errmsg("could not get result of cancel request: %s",
1494 : : pchomp(PQerrorMessage(conn)))));
1495 : 0 : return false;
1496 : : }
1497 : :
1498 : : /* Get and discard the result of the query. */
308 tgl@sss.pgh.pa.us 1499 [ - + ]:CBC 1 : if (pgfdw_get_cleanup_result(conn, endtime, retrycanceltime,
1500 : : &result, &timed_out))
1501 : : {
1419 fujii@postgresql.org 1502 [ # # ]:UBC 0 : if (timed_out)
1503 [ # # ]: 0 : ereport(WARNING,
1504 : : (errmsg("could not get result of cancel request due to timeout")));
1505 : : else
1506 [ # # ]: 0 : ereport(WARNING,
1507 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1508 : : errmsg("could not get result of cancel request: %s",
1509 : : pchomp(PQerrorMessage(conn)))));
1510 : :
3064 rhaas@postgresql.org 1511 : 0 : return false;
1512 : : }
3064 rhaas@postgresql.org 1513 :CBC 1 : PQclear(result);
1514 : :
1515 : 1 : return true;
1516 : : }
1517 : :
1518 : : /*
1519 : : * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1520 : : * result. If the query is executed without error, the return value is true.
1521 : : * If the query is executed successfully but returns an error, the return
1522 : : * value is true if and only if ignore_errors is set. If the query can't be
1523 : : * sent or times out, the return value is false.
1524 : : *
1525 : : * It's not a huge problem if we throw an ERROR here, but if we get into error
1526 : : * recursion trouble, we'll end up slamming the connection shut, which will
1527 : : * necessitate failing the entire toplevel transaction even if subtransactions
1528 : : * were used. Try to use WARNING where we can.
1529 : : */
1530 : : static bool
1531 : 75 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1532 : : {
1533 : : TimestampTz endtime;
1534 : :
1535 : : /*
1536 : : * If it takes too long to execute a cleanup query, assume the connection
1537 : : * is dead. It's fairly likely that this is why we aborted in the first
1538 : : * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1539 : : * be too long.
1540 : : */
935 efujita@postgresql.o 1541 : 75 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1542 : : CONNECTION_CLEANUP_TIMEOUT);
1543 : :
1544 [ - + ]: 75 : if (!pgfdw_exec_cleanup_query_begin(conn, query))
935 efujita@postgresql.o 1545 :UBC 0 : return false;
935 efujita@postgresql.o 1546 :CBC 75 : return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1547 : : false, ignore_errors);
1548 : : }
1549 : :
1550 : : static bool
1551 : 87 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
1552 : : {
571 1553 [ - + ]: 87 : Assert(query != NULL);
1554 : :
1555 : : /*
1556 : : * Submit a query. Since we don't use non-blocking mode, this also can
1557 : : * block. But its risk is relatively small, so we ignore that for now.
1558 : : */
3064 rhaas@postgresql.org 1559 [ - + ]: 87 : if (!PQsendQuery(conn, query))
1560 : : {
90 tgl@sss.pgh.pa.us 1561 :UNC 0 : pgfdw_report(WARNING, NULL, conn, query);
3064 rhaas@postgresql.org 1562 :UBC 0 : return false;
1563 : : }
1564 : :
935 efujita@postgresql.o 1565 :CBC 87 : return true;
1566 : : }
1567 : :
1568 : : static bool
1569 : 87 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1570 : : TimestampTz endtime, bool consume_input,
1571 : : bool ignore_errors)
1572 : : {
1573 : : PGresult *result;
1574 : : bool timed_out;
1575 : :
571 1576 [ - + ]: 87 : Assert(query != NULL);
1577 : :
1578 : : /*
1579 : : * If requested, consume whatever data is available from the socket. (Note
1580 : : * that if all data is available, this allows pgfdw_get_cleanup_result to
1581 : : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1582 : : * which would be large compared to the overhead of PQconsumeInput.)
1583 : : */
935 1584 [ + + - + ]: 87 : if (consume_input && !PQconsumeInput(conn))
1585 : : {
90 tgl@sss.pgh.pa.us 1586 :UNC 0 : pgfdw_report(WARNING, NULL, conn, query);
935 efujita@postgresql.o 1587 :UBC 0 : return false;
1588 : : }
1589 : :
1590 : : /* Get the result of the query. */
308 tgl@sss.pgh.pa.us 1591 [ - + ]:CBC 87 : if (pgfdw_get_cleanup_result(conn, endtime, endtime, &result, &timed_out))
1592 : : {
1419 fujii@postgresql.org 1593 [ # # ]:UBC 0 : if (timed_out)
1594 [ # # ]: 0 : ereport(WARNING,
1595 : : (errmsg("could not get query result due to timeout"),
1596 : : errcontext("remote SQL command: %s", query)));
1597 : : else
90 tgl@sss.pgh.pa.us 1598 :UNC 0 : pgfdw_report(WARNING, NULL, conn, query);
1599 : :
3064 rhaas@postgresql.org 1600 :UBC 0 : return false;
1601 : : }
1602 : :
1603 : : /* Issue a warning if not successful. */
3064 rhaas@postgresql.org 1604 [ - + ]:CBC 87 : if (PQresultStatus(result) != PGRES_COMMAND_OK)
1605 : : {
90 tgl@sss.pgh.pa.us 1606 :UNC 0 : pgfdw_report(WARNING, result, conn, query);
3064 rhaas@postgresql.org 1607 :UBC 0 : return ignore_errors;
1608 : : }
3056 tgl@sss.pgh.pa.us 1609 :CBC 87 : PQclear(result);
1610 : :
3064 rhaas@postgresql.org 1611 : 87 : return true;
1612 : : }
1613 : :
1614 : : /*
1615 : : * Get, during abort cleanup, the result of a query that is in progress.
1616 : : * This might be a query that is being interrupted by a cancel request or by
1617 : : * transaction abort, or it might be a query that was initiated as part of
1618 : : * transaction abort to get the remote side back to the appropriate state.
1619 : : *
1620 : : * endtime is the time at which we should give up and assume the remote side
1621 : : * is dead. retrycanceltime is the time at which we should issue a fresh
1622 : : * cancel request (pass the same value as endtime if this is not wanted).
1623 : : *
1624 : : * Returns true if the timeout expired or connection trouble occurred,
1625 : : * false otherwise. Sets *result except in case of a true result.
1626 : : * Sets *timed_out to true only when the timeout expired.
1627 : : */
1628 : : static bool
308 tgl@sss.pgh.pa.us 1629 : 88 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
1630 : : TimestampTz retrycanceltime,
1631 : : PGresult **result,
1632 : : bool *timed_out)
1633 : : {
94 tgl@sss.pgh.pa.us 1634 :GNC 88 : bool failed = false;
1635 : 88 : PGresult *last_res = NULL;
1636 : 88 : int canceldelta = RETRY_CANCEL_TIMEOUT * 2;
1637 : :
308 tgl@sss.pgh.pa.us 1638 :CBC 88 : *result = NULL;
1419 fujii@postgresql.org 1639 : 88 : *timed_out = false;
1640 : : for (;;)
94 tgl@sss.pgh.pa.us 1641 :GIC 95 : {
1642 : : PGresult *res;
1643 : :
94 tgl@sss.pgh.pa.us 1644 [ + + ]:GNC 260 : while (PQisBusy(conn))
3064 rhaas@postgresql.org 1645 :ECB (95) : {
1646 : : int wc;
94 tgl@sss.pgh.pa.us 1647 :GNC 77 : TimestampTz now = GetCurrentTimestamp();
1648 : : long cur_timeout;
1649 : :
1650 : : /* If timeout has expired, give up. */
1651 [ - + ]: 77 : if (now >= endtime)
1652 : : {
94 tgl@sss.pgh.pa.us 1653 :UNC 0 : *timed_out = true;
1654 : 0 : failed = true;
1655 : 0 : goto exit;
1656 : : }
1657 : :
1658 : : /* If we need to re-issue the cancel request, do that. */
94 tgl@sss.pgh.pa.us 1659 [ - + ]:GNC 77 : if (now >= retrycanceltime)
1660 : : {
1661 : : /* We ignore failure to issue the repeated request. */
94 tgl@sss.pgh.pa.us 1662 :UNC 0 : (void) libpqsrv_cancel(conn, endtime);
1663 : :
1664 : : /* Recompute "now" in case that took measurable time. */
1665 : 0 : now = GetCurrentTimestamp();
1666 : :
1667 : : /* Adjust re-cancel timeout in increasing steps. */
1668 : 0 : retrycanceltime = TimestampTzPlusMilliseconds(now,
1669 : : canceldelta);
1670 : 0 : canceldelta += canceldelta;
1671 : : }
1672 : :
1673 : : /* If timeout has expired, give up, else get sleep time. */
94 tgl@sss.pgh.pa.us 1674 :GNC 77 : cur_timeout = TimestampDifferenceMilliseconds(now,
1675 : : Min(endtime,
1676 : : retrycanceltime));
1677 [ - + ]: 77 : if (cur_timeout <= 0)
1678 : : {
94 tgl@sss.pgh.pa.us 1679 :UNC 0 : *timed_out = true;
1680 : 0 : failed = true;
1681 : 0 : goto exit;
1682 : : }
1683 : :
1684 : : /* first time, allocate or get the custom wait event */
94 tgl@sss.pgh.pa.us 1685 [ + + ]:GNC 77 : if (pgfdw_we_cleanup_result == 0)
1686 : 2 : pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1687 : :
1688 : : /* Sleep until there's something to do */
1689 : 77 : wc = WaitLatchOrSocket(MyLatch,
1690 : : WL_LATCH_SET | WL_SOCKET_READABLE |
1691 : : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1692 : : PQsocket(conn),
1693 : : cur_timeout, pgfdw_we_cleanup_result);
1694 : 77 : ResetLatch(MyLatch);
1695 : :
1696 [ - + ]: 77 : CHECK_FOR_INTERRUPTS();
1697 : :
1698 : : /* Data available in socket? */
1699 [ + - ]: 77 : if (wc & WL_SOCKET_READABLE)
1700 : : {
1701 [ - + ]: 77 : if (!PQconsumeInput(conn))
1702 : : {
1703 : : /* connection trouble */
94 tgl@sss.pgh.pa.us 1704 :UNC 0 : failed = true;
1705 : 0 : goto exit;
1706 : : }
1707 : : }
1708 : : }
1709 : :
94 tgl@sss.pgh.pa.us 1710 :GNC 183 : res = PQgetResult(conn);
1711 [ + + ]: 183 : if (res == NULL)
1712 : 88 : break; /* query is complete */
1713 : :
94 tgl@sss.pgh.pa.us 1714 :GBC 95 : PQclear(last_res);
94 tgl@sss.pgh.pa.us 1715 :GNC 95 : last_res = res;
1716 : : }
1717 : 88 : exit:
1419 fujii@postgresql.org 1718 [ - + ]:CBC 88 : if (failed)
3056 tgl@sss.pgh.pa.us 1719 :UBC 0 : PQclear(last_res);
1720 : : else
3056 tgl@sss.pgh.pa.us 1721 :CBC 88 : *result = last_res;
1419 fujii@postgresql.org 1722 : 88 : return failed;
1723 : : }
1724 : :
1725 : : /*
1726 : : * Abort remote transaction or subtransaction.
1727 : : *
1728 : : * "toplevel" should be set to true if toplevel (main) transaction is
1729 : : * rollbacked, false otherwise.
1730 : : *
1731 : : * Set entry->changing_xact_state to false on success, true on failure.
1732 : : */
1733 : : static void
1312 efujita@postgresql.o 1734 : 51 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
1735 : : {
1736 : : char sql[100];
1737 : :
1738 : : /*
1739 : : * Don't try to clean up the connection if we're already in error
1740 : : * recursion trouble.
1741 : : */
1496 fujii@postgresql.org 1742 [ - + ]: 51 : if (in_error_recursion_trouble())
1496 fujii@postgresql.org 1743 :UBC 0 : entry->changing_xact_state = true;
1744 : :
1745 : : /*
1746 : : * If connection is already unsalvageable, don't touch it further.
1747 : : */
1496 fujii@postgresql.org 1748 [ + + ]:CBC 51 : if (entry->changing_xact_state)
1749 : 1 : return;
1750 : :
1751 : : /*
1752 : : * Mark this connection as in the process of changing transaction state.
1753 : : */
1754 : 50 : entry->changing_xact_state = true;
1755 : :
1756 : : /* Assume we might have lost track of prepared statements */
1757 : 50 : entry->have_error = true;
1758 : :
1759 : : /*
1760 : : * If a command has been submitted to the remote server by using an
1761 : : * asynchronous execution function, the command might not have yet
1762 : : * completed. Check to see if a command is still being processed by the
1763 : : * remote server, and if so, request cancellation of the command.
1764 : : */
1765 [ + + ]: 50 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1766 [ - + ]: 1 : !pgfdw_cancel_query(entry->conn))
1496 fujii@postgresql.org 1767 :UBC 0 : return; /* Unable to cancel running query */
1768 : :
935 efujita@postgresql.o 1769 [ + + ]:CBC 50 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1496 fujii@postgresql.org 1770 [ - + ]: 50 : if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1312 efujita@postgresql.o 1771 :UBC 0 : return; /* Unable to abort remote (sub)transaction */
1772 : :
1496 fujii@postgresql.org 1773 [ + + ]:CBC 50 : if (toplevel)
1774 : : {
1775 [ + + + - ]: 47 : if (entry->have_prep_stmt && entry->have_error &&
1776 [ - + ]: 25 : !pgfdw_exec_cleanup_query(entry->conn,
1777 : : "DEALLOCATE ALL",
1778 : : true))
1496 fujii@postgresql.org 1779 :UBC 0 : return; /* Trouble clearing prepared statements */
1780 : :
1496 fujii@postgresql.org 1781 :CBC 47 : entry->have_prep_stmt = false;
1782 : 47 : entry->have_error = false;
1783 : : }
1784 : :
1785 : : /*
1786 : : * If pendingAreq of the per-connection state is not NULL, it means that
1787 : : * an asynchronous fetch begun by fetch_more_data_begin() was not done
1788 : : * successfully and thus the per-connection state was not reset in
1789 : : * fetch_more_data(); in that case reset the per-connection state here.
1790 : : */
1375 efujita@postgresql.o 1791 [ - + ]: 50 : if (entry->state.pendingAreq)
1375 efujita@postgresql.o 1792 :UBC 0 : memset(&entry->state, 0, sizeof(entry->state));
1793 : :
1794 : : /* Disarm changing_xact_state if it all worked */
1496 fujii@postgresql.org 1795 :CBC 50 : entry->changing_xact_state = false;
1796 : : }
1797 : :
1798 : : /*
1799 : : * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1800 : : * don't wait for the result.
1801 : : *
1802 : : * Returns true if the abort command or cancel request is successfully issued,
1803 : : * false otherwise. If the abort command is successfully issued, the given
1804 : : * connection cache entry is appended to *pending_entries. Otherwise, if the
1805 : : * cancel request is successfully issued, it is appended to *cancel_requested.
1806 : : */
1807 : : static bool
935 efujita@postgresql.o 1808 : 8 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
1809 : : List **pending_entries, List **cancel_requested)
1810 : : {
1811 : : /*
1812 : : * Don't try to clean up the connection if we're already in error
1813 : : * recursion trouble.
1814 : : */
1815 [ - + ]: 8 : if (in_error_recursion_trouble())
935 efujita@postgresql.o 1816 :UBC 0 : entry->changing_xact_state = true;
1817 : :
1818 : : /*
1819 : : * If connection is already unsalvageable, don't touch it further.
1820 : : */
935 efujita@postgresql.o 1821 [ - + ]:CBC 8 : if (entry->changing_xact_state)
935 efujita@postgresql.o 1822 :UBC 0 : return false;
1823 : :
1824 : : /*
1825 : : * Mark this connection as in the process of changing transaction state.
1826 : : */
935 efujita@postgresql.o 1827 :CBC 8 : entry->changing_xact_state = true;
1828 : :
1829 : : /* Assume we might have lost track of prepared statements */
1830 : 8 : entry->have_error = true;
1831 : :
1832 : : /*
1833 : : * If a command has been submitted to the remote server by using an
1834 : : * asynchronous execution function, the command might not have yet
1835 : : * completed. Check to see if a command is still being processed by the
1836 : : * remote server, and if so, request cancellation of the command.
1837 : : */
1838 [ - + ]: 8 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1839 : : {
1840 : : TimestampTz endtime;
1841 : :
578 alvherre@alvh.no-ip. 1842 :UBC 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1843 : : CONNECTION_CLEANUP_TIMEOUT);
1844 [ # # ]: 0 : if (!pgfdw_cancel_query_begin(entry->conn, endtime))
935 efujita@postgresql.o 1845 : 0 : return false; /* Unable to cancel running query */
1846 : 0 : *cancel_requested = lappend(*cancel_requested, entry);
1847 : : }
1848 : : else
1849 : : {
1850 : : char sql[100];
1851 : :
935 efujita@postgresql.o 1852 [ + + ]:CBC 8 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1853 [ - + ]: 8 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
935 efujita@postgresql.o 1854 :UBC 0 : return false; /* Unable to abort remote transaction */
935 efujita@postgresql.o 1855 :CBC 8 : *pending_entries = lappend(*pending_entries, entry);
1856 : : }
1857 : :
1858 : 8 : return true;
1859 : : }
1860 : :
1861 : : /*
1862 : : * Finish pre-commit cleanup of connections on each of which we've sent a
1863 : : * COMMIT command to the remote server.
1864 : : */
1865 : : static void
1341 1866 : 13 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
1867 : : {
1868 : : ConnCacheEntry *entry;
1869 : 13 : List *pending_deallocs = NIL;
1870 : : ListCell *lc;
1871 : :
1872 [ - + ]: 13 : Assert(pending_entries);
1873 : :
1874 : : /*
1875 : : * Get the result of the COMMIT command for each of the pending entries
1876 : : */
1877 [ + - + + : 29 : foreach(lc, pending_entries)
+ + ]
1878 : : {
1879 : 16 : entry = (ConnCacheEntry *) lfirst(lc);
1880 : :
1881 [ - + ]: 16 : Assert(entry->changing_xact_state);
1882 : :
1883 : : /*
1884 : : * We might already have received the result on the socket, so pass
1885 : : * consume_input=true to try to consume it first
1886 : : */
1887 : 16 : do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1888 : 16 : entry->changing_xact_state = false;
1889 : :
1890 : : /* Do a DEALLOCATE ALL in parallel if needed */
1891 [ + + + + ]: 16 : if (entry->have_prep_stmt && entry->have_error)
1892 : : {
1893 : : /* Ignore errors (see notes in pgfdw_xact_callback) */
1894 [ + - ]: 2 : if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1895 : : {
1896 : 2 : pending_deallocs = lappend(pending_deallocs, entry);
1897 : 2 : continue;
1898 : : }
1899 : : }
1900 : 14 : entry->have_prep_stmt = false;
1901 : 14 : entry->have_error = false;
1902 : :
1903 : 14 : pgfdw_reset_xact_state(entry, true);
1904 : : }
1905 : :
1906 : : /* No further work if no pending entries */
1907 [ + + ]: 13 : if (!pending_deallocs)
1908 : 12 : return;
1909 : :
1910 : : /*
1911 : : * Get the result of the DEALLOCATE command for each of the pending
1912 : : * entries
1913 : : */
1914 [ + - + + : 3 : foreach(lc, pending_deallocs)
+ + ]
1915 : : {
1916 : : PGresult *res;
1917 : :
1918 : 2 : entry = (ConnCacheEntry *) lfirst(lc);
1919 : :
1920 : : /* Ignore errors (see notes in pgfdw_xact_callback) */
1921 [ + + ]: 4 : while ((res = PQgetResult(entry->conn)) != NULL)
1922 : : {
1923 : 2 : PQclear(res);
1924 : : /* Stop if the connection is lost (else we'll loop infinitely) */
1925 [ - + ]: 2 : if (PQstatus(entry->conn) == CONNECTION_BAD)
1341 efujita@postgresql.o 1926 :UBC 0 : break;
1927 : : }
1341 efujita@postgresql.o 1928 :CBC 2 : entry->have_prep_stmt = false;
1929 : 2 : entry->have_error = false;
1930 : :
1931 : 2 : pgfdw_reset_xact_state(entry, true);
1932 : : }
1933 : : }
1934 : :
1935 : : /*
1936 : : * Finish pre-subcommit cleanup of connections on each of which we've sent a
1937 : : * RELEASE command to the remote server.
1938 : : */
1939 : : static void
1940 : 1 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1941 : : {
1942 : : ConnCacheEntry *entry;
1943 : : char sql[100];
1944 : : ListCell *lc;
1945 : :
1946 [ - + ]: 1 : Assert(pending_entries);
1947 : :
1948 : : /*
1949 : : * Get the result of the RELEASE command for each of the pending entries
1950 : : */
1951 : 1 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1952 [ + - + + : 3 : foreach(lc, pending_entries)
+ + ]
1953 : : {
1954 : 2 : entry = (ConnCacheEntry *) lfirst(lc);
1955 : :
1956 [ - + ]: 2 : Assert(entry->changing_xact_state);
1957 : :
1958 : : /*
1959 : : * We might already have received the result on the socket, so pass
1960 : : * consume_input=true to try to consume it first
1961 : : */
1962 : 2 : do_sql_command_end(entry->conn, sql, true);
1963 : 2 : entry->changing_xact_state = false;
1964 : :
1965 : 2 : pgfdw_reset_xact_state(entry, false);
1966 : : }
1967 : 1 : }
1968 : :
1969 : : /*
1970 : : * Finish abort cleanup of connections on each of which we've sent an abort
1971 : : * command or cancel request to the remote server.
1972 : : */
1973 : : static void
935 1974 : 4 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1975 : : bool toplevel)
1976 : : {
1977 : 4 : List *pending_deallocs = NIL;
1978 : : ListCell *lc;
1979 : :
1980 : : /*
1981 : : * For each of the pending cancel requests (if any), get and discard the
1982 : : * result of the query, and submit an abort command to the remote server.
1983 : : */
1984 [ - + ]: 4 : if (cancel_requested)
1985 : : {
935 efujita@postgresql.o 1986 [ # # # # :UBC 0 : foreach(lc, cancel_requested)
# # ]
1987 : : {
1988 : 0 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
308 tgl@sss.pgh.pa.us 1989 : 0 : TimestampTz now = GetCurrentTimestamp();
1990 : : TimestampTz endtime;
1991 : : TimestampTz retrycanceltime;
1992 : : char sql[100];
1993 : :
935 efujita@postgresql.o 1994 [ # # ]: 0 : Assert(entry->changing_xact_state);
1995 : :
1996 : : /*
1997 : : * Set end time. You might think we should do this before issuing
1998 : : * cancel request like in normal mode, but that is problematic,
1999 : : * because if, for example, it took longer than 30 seconds to
2000 : : * process the first few entries in the cancel_requested list, it
2001 : : * would cause a timeout error when processing each of the
2002 : : * remaining entries in the list, leading to slamming that entry's
2003 : : * connection shut.
2004 : : */
308 tgl@sss.pgh.pa.us 2005 : 0 : endtime = TimestampTzPlusMilliseconds(now,
2006 : : CONNECTION_CLEANUP_TIMEOUT);
2007 : 0 : retrycanceltime = TimestampTzPlusMilliseconds(now,
2008 : : RETRY_CANCEL_TIMEOUT);
2009 : :
2010 [ # # ]: 0 : if (!pgfdw_cancel_query_end(entry->conn, endtime,
2011 : : retrycanceltime, true))
2012 : : {
2013 : : /* Unable to cancel running query */
935 efujita@postgresql.o 2014 : 0 : pgfdw_reset_xact_state(entry, toplevel);
2015 : 0 : continue;
2016 : : }
2017 : :
2018 : : /* Send an abort command in parallel if needed */
2019 [ # # ]: 0 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
2020 [ # # ]: 0 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
2021 : : {
2022 : : /* Unable to abort remote (sub)transaction */
2023 : 0 : pgfdw_reset_xact_state(entry, toplevel);
2024 : : }
2025 : : else
2026 : 0 : pending_entries = lappend(pending_entries, entry);
2027 : : }
2028 : : }
2029 : :
2030 : : /* No further work if no pending entries */
935 efujita@postgresql.o 2031 [ - + ]:CBC 4 : if (!pending_entries)
935 efujita@postgresql.o 2032 :UBC 0 : return;
2033 : :
2034 : : /*
2035 : : * Get the result of the abort command for each of the pending entries
2036 : : */
935 efujita@postgresql.o 2037 [ + - + + :CBC 12 : foreach(lc, pending_entries)
+ + ]
2038 : : {
2039 : 8 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2040 : : TimestampTz endtime;
2041 : : char sql[100];
2042 : :
2043 [ - + ]: 8 : Assert(entry->changing_xact_state);
2044 : :
2045 : : /*
2046 : : * Set end time. We do this now, not before issuing the command like
2047 : : * in normal mode, for the same reason as for the cancel_requested
2048 : : * entries.
2049 : : */
2050 : 8 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
2051 : : CONNECTION_CLEANUP_TIMEOUT);
2052 : :
2053 [ + + ]: 8 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
2054 [ - + ]: 8 : if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
2055 : : true, false))
2056 : : {
2057 : : /* Unable to abort remote (sub)transaction */
935 efujita@postgresql.o 2058 :UBC 0 : pgfdw_reset_xact_state(entry, toplevel);
935 efujita@postgresql.o 2059 :CBC 4 : continue;
2060 : : }
2061 : :
2062 [ + + ]: 8 : if (toplevel)
2063 : : {
2064 : : /* Do a DEALLOCATE ALL in parallel if needed */
2065 [ + - + - ]: 4 : if (entry->have_prep_stmt && entry->have_error)
2066 : : {
2067 [ - + ]: 4 : if (!pgfdw_exec_cleanup_query_begin(entry->conn,
2068 : : "DEALLOCATE ALL"))
2069 : : {
2070 : : /* Trouble clearing prepared statements */
935 efujita@postgresql.o 2071 :UBC 0 : pgfdw_reset_xact_state(entry, toplevel);
2072 : : }
2073 : : else
935 efujita@postgresql.o 2074 :CBC 4 : pending_deallocs = lappend(pending_deallocs, entry);
2075 : 4 : continue;
2076 : : }
935 efujita@postgresql.o 2077 :UBC 0 : entry->have_prep_stmt = false;
2078 : 0 : entry->have_error = false;
2079 : : }
2080 : :
2081 : : /* Reset the per-connection state if needed */
935 efujita@postgresql.o 2082 [ - + ]:CBC 4 : if (entry->state.pendingAreq)
935 efujita@postgresql.o 2083 :UBC 0 : memset(&entry->state, 0, sizeof(entry->state));
2084 : :
2085 : : /* We're done with this entry; unset the changing_xact_state flag */
935 efujita@postgresql.o 2086 :CBC 4 : entry->changing_xact_state = false;
2087 : 4 : pgfdw_reset_xact_state(entry, toplevel);
2088 : : }
2089 : :
2090 : : /* No further work if no pending entries */
2091 [ + + ]: 4 : if (!pending_deallocs)
2092 : 2 : return;
2093 [ - + ]: 2 : Assert(toplevel);
2094 : :
2095 : : /*
2096 : : * Get the result of the DEALLOCATE command for each of the pending
2097 : : * entries
2098 : : */
2099 [ + - + + : 6 : foreach(lc, pending_deallocs)
+ + ]
2100 : : {
2101 : 4 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2102 : : TimestampTz endtime;
2103 : :
2104 [ - + ]: 4 : Assert(entry->changing_xact_state);
2105 [ - + ]: 4 : Assert(entry->have_prep_stmt);
2106 [ - + ]: 4 : Assert(entry->have_error);
2107 : :
2108 : : /*
2109 : : * Set end time. We do this now, not before issuing the command like
2110 : : * in normal mode, for the same reason as for the cancel_requested
2111 : : * entries.
2112 : : */
2113 : 4 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
2114 : : CONNECTION_CLEANUP_TIMEOUT);
2115 : :
2116 [ - + ]: 4 : if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
2117 : : endtime, true, true))
2118 : : {
2119 : : /* Trouble clearing prepared statements */
935 efujita@postgresql.o 2120 :UBC 0 : pgfdw_reset_xact_state(entry, toplevel);
2121 : 0 : continue;
2122 : : }
935 efujita@postgresql.o 2123 :CBC 4 : entry->have_prep_stmt = false;
2124 : 4 : entry->have_error = false;
2125 : :
2126 : : /* Reset the per-connection state if needed */
2127 [ - + ]: 4 : if (entry->state.pendingAreq)
935 efujita@postgresql.o 2128 :UBC 0 : memset(&entry->state, 0, sizeof(entry->state));
2129 : :
2130 : : /* We're done with this entry; unset the changing_xact_state flag */
935 efujita@postgresql.o 2131 :CBC 4 : entry->changing_xact_state = false;
2132 : 4 : pgfdw_reset_xact_state(entry, toplevel);
2133 : : }
2134 : : }
2135 : :
/*
 * Number of output arguments (columns) for various API versions of
 * postgres_fdw_get_connections.  The tuple descriptor's column count is
 * checked against these, and POSTGRES_FDW_GET_CONNECTIONS_COLS sizes the
 * values/nulls arrays, so it must be the largest of the per-version counts.
 */
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1	2
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2	6
#define POSTGRES_FDW_GET_CONNECTIONS_COLS	6	/* maximum of above */
2140 : :
2141 : : /*
2142 : : * Internal function used by postgres_fdw_get_connections variants.
2143 : : *
2144 : : * For API version 1.1, this function takes no input parameter and
2145 : : * returns a set of records with the following values:
2146 : : *
2147 : : * - server_name - server name of active connection. In case the foreign server
2148 : : * is dropped but still the connection is active, then the server name will
2149 : : * be NULL in output.
2150 : : * - valid - true/false representing whether the connection is valid or not.
2151 : : * Note that connections can become invalid in pgfdw_inval_callback.
2152 : : *
2153 : : * For API version 1.2 and later, this function takes an input parameter
2154 : : * to check a connection status and returns the following
2155 : : * additional values along with the two values from version 1.1:
2156 : : *
2157 : : * - user_name - the local user name of the active connection. In case the
2158 : : * user mapping is dropped but the connection is still active, then the
2159 : : * user name will be NULL in the output.
2160 : : * - used_in_xact - true if the connection is used in the current transaction.
2161 : : * - closed - true if the connection is closed.
2162 : : * - remote_backend_pid - process ID of the remote backend, on the foreign
2163 : : * server, handling the connection.
2164 : : *
2165 : : * No records are returned when there are no cached connections at all.
2166 : : */
2167 : : static void
458 fujii@postgresql.org 2168 : 13 : postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
2169 : : enum pgfdwVersion api_version)
2170 : : {
1743 2171 : 13 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2172 : : HASH_SEQ_STATUS scan;
2173 : : ConnCacheEntry *entry;
2174 : :
1105 michael@paquier.xyz 2175 : 13 : InitMaterializedSRF(fcinfo, 0);
2176 : :
2177 : : /* If cache doesn't exist, we return no records */
1743 fujii@postgresql.org 2178 [ - + ]: 13 : if (!ConnectionHash)
458 fujii@postgresql.org 2179 :UBC 0 : return;
2180 : :
2181 : : /* Check we have the expected number of output arguments */
458 fujii@postgresql.org 2182 [ - + - ]:CBC 13 : switch (rsinfo->setDesc->natts)
2183 : : {
458 fujii@postgresql.org 2184 :UBC 0 : case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1:
2185 [ # # ]: 0 : if (api_version != PGFDW_V1_1)
2186 [ # # ]: 0 : elog(ERROR, "incorrect number of output arguments");
2187 : 0 : break;
458 fujii@postgresql.org 2188 :CBC 13 : case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2:
2189 [ - + ]: 13 : if (api_version != PGFDW_V1_2)
458 fujii@postgresql.org 2190 [ # # ]:UBC 0 : elog(ERROR, "incorrect number of output arguments");
458 fujii@postgresql.org 2191 :CBC 13 : break;
458 fujii@postgresql.org 2192 :UBC 0 : default:
2193 [ # # ]: 0 : elog(ERROR, "incorrect number of output arguments");
2194 : : }
2195 : :
1743 fujii@postgresql.org 2196 :CBC 13 : hash_seq_init(&scan, ConnectionHash);
2197 [ + + ]: 113 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2198 : : {
2199 : : ForeignServer *server;
1199 peter@eisentraut.org 2200 : 100 : Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2201 : 100 : bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
404 fujii@postgresql.org 2202 : 100 : int i = 0;
2203 : :
2204 : : /* We only look for open remote connections */
1743 2205 [ + + ]: 100 : if (!entry->conn)
2206 : 87 : continue;
2207 : :
2208 : 13 : server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2209 : :
2210 : : /*
2211 : : * The foreign server may have been dropped in current explicit
2212 : : * transaction. It is not possible to drop the server from another
2213 : : * session when the connection associated with it is in use in the
2214 : : * current transaction, if tried so, the drop query in another session
2215 : : * blocks until the current transaction finishes.
2216 : : *
2217 : : * Even though the server is dropped in the current transaction, the
2218 : : * cache can still have associated active connection entry, say we
2219 : : * call such connections dangling. Since we can not fetch the server
2220 : : * name from system catalogs for dangling connections, instead we show
2221 : : * NULL value for server name in output.
2222 : : *
2223 : : * We could have done better by storing the server name in the cache
2224 : : * entry instead of server oid so that it could be used in the output.
2225 : : * But the server name in each cache entry requires 64 bytes of
2226 : : * memory, which is huge, when there are many cached connections and
2227 : : * the use case i.e. dropping the foreign server within the explicit
2228 : : * current transaction seems rare. So, we chose to show NULL value for
2229 : : * server name in output.
2230 : : *
2231 : : * Such dangling connections get closed either in next use or at the
2232 : : * end of current explicit transaction in pgfdw_xact_callback.
2233 : : */
2234 [ + + ]: 13 : if (!server)
2235 : : {
2236 : : /*
2237 : : * If the server has been dropped in the current explicit
2238 : : * transaction, then this entry would have been invalidated in
2239 : : * pgfdw_inval_callback at the end of drop server command. Note
2240 : : * that this connection would not have been closed in
2241 : : * pgfdw_inval_callback because it is still being used in the
2242 : : * current explicit transaction. So, assert that here.
2243 : : */
2244 [ + - + - : 1 : Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
- + ]
2245 : :
2246 : : /* Show null, if no server name was found */
404 2247 : 1 : nulls[i++] = true;
2248 : : }
2249 : : else
2250 : 12 : values[i++] = CStringGetTextDatum(server->servername);
2251 : :
2252 [ + - ]: 13 : if (api_version >= PGFDW_V1_2)
2253 : : {
2254 : : HeapTuple tp;
2255 : :
2256 : : /* Use the system cache to obtain the user mapping */
2257 : 13 : tp = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key));
2258 : :
2259 : : /*
2260 : : * Just like in the foreign server case, user mappings can also be
2261 : : * dropped in the current explicit transaction. Therefore, the
2262 : : * similar check as in the server case is required.
2263 : : */
2264 [ + + ]: 13 : if (!HeapTupleIsValid(tp))
2265 : : {
2266 : : /*
2267 : : * If we reach here, this entry must have been invalidated in
2268 : : * pgfdw_inval_callback, same as in the server case.
2269 : : */
2270 [ + - + - : 1 : Assert(entry->conn && entry->xact_depth > 0 &&
- + ]
2271 : : entry->invalidated);
2272 : :
2273 : 1 : nulls[i++] = true;
2274 : : }
2275 : : else
2276 : : {
2277 : : Oid userid;
2278 : :
2279 : 12 : userid = ((Form_pg_user_mapping) GETSTRUCT(tp))->umuser;
2280 [ + + ]: 12 : values[i++] = CStringGetTextDatum(MappingUserName(userid));
2281 : 12 : ReleaseSysCache(tp);
2282 : : }
2283 : : }
2284 : :
2285 : 13 : values[i++] = BoolGetDatum(!entry->invalidated);
2286 : :
458 2287 [ + - ]: 13 : if (api_version >= PGFDW_V1_2)
2288 : : {
2289 : 13 : bool check_conn = PG_GETARG_BOOL(0);
2290 : :
2291 : : /* Is this connection used in the current transaction? */
404 2292 : 13 : values[i++] = BoolGetDatum(entry->xact_depth > 0);
2293 : :
2294 : : /*
2295 : : * If a connection status check is requested and supported, return
2296 : : * whether the connection is closed. Otherwise, return NULL.
2297 : : */
458 2298 [ + + + - ]: 13 : if (check_conn && pgfdw_conn_checkable())
404 2299 : 2 : values[i++] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
2300 : : else
2301 : 11 : nulls[i++] = true;
2302 : :
2303 : : /* Return process ID of remote backend */
238 2304 : 13 : values[i++] = Int32GetDatum(PQbackendPID(entry->conn));
2305 : : }
2306 : :
1329 michael@paquier.xyz 2307 : 13 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2308 : : }
2309 : : }
2310 : :
2311 : : /*
2312 : : * List active foreign server connections.
2313 : : *
2314 : : * The SQL API of this function has changed multiple times, and will likely
2315 : : * do so again in future. To support the case where a newer version of this
2316 : : * loadable module is being used with an old SQL declaration of the function,
2317 : : * we continue to support the older API versions.
2318 : : */
2319 : : Datum
458 fujii@postgresql.org 2320 : 13 : postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
2321 : : {
2322 : 13 : postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2);
2323 : :
2324 : 13 : PG_RETURN_VOID();
2325 : : }
2326 : :
2327 : : Datum
458 fujii@postgresql.org 2328 :UBC 0 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
2329 : : {
2330 : 0 : postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_1);
2331 : :
1743 2332 : 0 : PG_RETURN_VOID();
2333 : : }
2334 : :
2335 : : /*
2336 : : * Disconnect the specified cached connections.
2337 : : *
2338 : : * This function discards the open connections that are established by
2339 : : * postgres_fdw from the local session to the foreign server with
2340 : : * the given name. Note that there can be multiple connections to
2341 : : * the given server using different user mappings. If the connections
2342 : : * are used in the current local transaction, they are not disconnected
2343 : : * and warning messages are reported. This function returns true
2344 : : * if it disconnects at least one connection, otherwise false. If no
2345 : : * foreign server with the given name is found, an error is reported.
2346 : : */
2347 : : Datum
1735 fujii@postgresql.org 2348 :CBC 4 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
2349 : : {
2350 : : ForeignServer *server;
2351 : : char *servername;
2352 : :
2353 : 4 : servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2354 : 4 : server = GetForeignServerByName(servername, false);
2355 : :
2356 : 3 : PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
2357 : : }
2358 : :
2359 : : /*
2360 : : * Disconnect all the cached connections.
2361 : : *
2362 : : * This function discards all the open connections that are established by
2363 : : * postgres_fdw from the local session to the foreign servers.
2364 : : * If the connections are used in the current local transaction, they are
2365 : : * not disconnected and warning messages are reported. This function
2366 : : * returns true if it disconnects at least one connection, otherwise false.
2367 : : */
2368 : : Datum
2369 : 5 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
2370 : : {
2371 : 5 : PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
2372 : : }
2373 : :
2374 : : /*
2375 : : * Workhorse to disconnect cached connections.
2376 : : *
2377 : : * This function scans all the connection cache entries and disconnects
2378 : : * the open connections whose foreign server OID matches with
2379 : : * the specified one. If InvalidOid is specified, it disconnects all
2380 : : * the cached connections.
2381 : : *
2382 : : * This function emits a warning for each connection that's used in
2383 : : * the current transaction and doesn't close it. It returns true if
2384 : : * it disconnects at least one connection, otherwise false.
2385 : : *
2386 : : * Note that this function disconnects even the connections that are
2387 : : * established by other users in the same local session using different
2388 : : * user mappings. This leads even non-superuser to be able to close
2389 : : * the connections established by superusers in the same local session.
2390 : : *
2391 : : * XXX As of now we don't see any security risk doing this. But we should
2392 : : * set some restrictions on that, for example, prevent non-superuser
2393 : : * from closing the connections established by superusers even
2394 : : * in the same session?
2395 : : */
2396 : : static bool
2397 : 8 : disconnect_cached_connections(Oid serverid)
2398 : : {
2399 : : HASH_SEQ_STATUS scan;
2400 : : ConnCacheEntry *entry;
2401 : 8 : bool all = !OidIsValid(serverid);
2402 : 8 : bool result = false;
2403 : :
2404 : : /*
2405 : : * Connection cache hashtable has not been initialized yet in this
2406 : : * session, so return false.
2407 : : */
2408 [ - + ]: 8 : if (!ConnectionHash)
1735 fujii@postgresql.org 2409 :UBC 0 : return false;
2410 : :
1735 fujii@postgresql.org 2411 :CBC 8 : hash_seq_init(&scan, ConnectionHash);
2412 [ + + ]: 67 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2413 : : {
2414 : : /* Ignore cache entry if no open connection right now. */
2415 [ + + ]: 59 : if (!entry->conn)
2416 : 47 : continue;
2417 : :
2418 [ + + + + ]: 12 : if (all || entry->serverid == serverid)
2419 : : {
2420 : : /*
2421 : : * Emit a warning because the connection to close is used in the
2422 : : * current transaction and cannot be disconnected right now.
2423 : : */
2424 [ + + ]: 9 : if (entry->xact_depth > 0)
2425 : : {
2426 : : ForeignServer *server;
2427 : :
2428 : 3 : server = GetForeignServerExtended(entry->serverid,
2429 : : FSV_MISSING_OK);
2430 : :
2431 [ - + ]: 3 : if (!server)
2432 : : {
2433 : : /*
2434 : : * If the foreign server was dropped while its connection
2435 : : * was used in the current transaction, the connection
2436 : : * must have been marked as invalid by
2437 : : * pgfdw_inval_callback at the end of DROP SERVER command.
2438 : : */
1735 fujii@postgresql.org 2439 [ # # ]:UBC 0 : Assert(entry->invalidated);
2440 : :
2441 [ # # ]: 0 : ereport(WARNING,
2442 : : (errmsg("cannot close dropped server connection because it is still in use")));
2443 : : }
2444 : : else
1735 fujii@postgresql.org 2445 [ + - ]:CBC 3 : ereport(WARNING,
2446 : : (errmsg("cannot close connection for server \"%s\" because it is still in use",
2447 : : server->servername)));
2448 : : }
2449 : : else
2450 : : {
2451 [ - + ]: 6 : elog(DEBUG3, "discarding connection %p", entry->conn);
2452 : 6 : disconnect_pg_server(entry);
2453 : 6 : result = true;
2454 : : }
2455 : : }
2456 : : }
2457 : :
2458 : 8 : return result;
2459 : : }
2460 : :
2461 : : /*
2462 : : * Check if the remote server closed the connection.
2463 : : *
2464 : : * Returns 1 if the connection is closed, -1 if an error occurred,
2465 : : * and 0 if it's not closed or if the connection check is unavailable
2466 : : * on this platform.
2467 : : */
2468 : : static int
458 2469 : 2 : pgfdw_conn_check(PGconn *conn)
2470 : : {
2471 : 2 : int sock = PQsocket(conn);
2472 : :
2473 [ + - - + ]: 2 : if (PQstatus(conn) != CONNECTION_OK || sock == -1)
458 fujii@postgresql.org 2474 :UBC 0 : return -1;
2475 : :
2476 : : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
2477 : : {
2478 : : struct pollfd input_fd;
2479 : : int result;
2480 : :
458 fujii@postgresql.org 2481 :CBC 2 : input_fd.fd = sock;
2482 : 2 : input_fd.events = POLLRDHUP;
2483 : 2 : input_fd.revents = 0;
2484 : :
2485 : : do
2486 : 2 : result = poll(&input_fd, 1, 0);
2487 [ - + - - ]: 2 : while (result < 0 && errno == EINTR);
2488 : :
2489 [ - + ]: 2 : if (result < 0)
458 fujii@postgresql.org 2490 :UBC 0 : return -1;
2491 : :
457 fujii@postgresql.org 2492 :CBC 2 : return (input_fd.revents &
2493 : 2 : (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
2494 : : }
2495 : : #else
2496 : : return 0;
2497 : : #endif
2498 : : }
2499 : :
/*
 * Report whether this build is able to check connection status.
 *
 * The answer is fixed at compile time: true when both HAVE_POLL and
 * POLLRDHUP are defined, which is the same condition under which
 * pgfdw_conn_check() compiles its poll()-based probe; false otherwise.
 */
static bool
pgfdw_conn_checkable(void)
{
	bool		supported = false;

#if (defined(HAVE_POLL) && defined(POLLRDHUP))
	supported = true;
#endif

	return supported;
}
2514 : :
2515 : : /*
2516 : : * Ensure that require_auth and SCRAM keys are correctly set on values. SCRAM
2517 : : * keys used to pass-through are coming from the initial connection from the
2518 : : * client with the server.
2519 : : *
2520 : : * All required SCRAM options are set by postgres_fdw, so we just need to
2521 : : * ensure that these options are not overwritten by the user.
2522 : : */
2523 : : static bool
217 peter@eisentraut.org 2524 : 8 : pgfdw_has_required_scram_options(const char **keywords, const char **values)
2525 : : {
2526 : 8 : bool has_scram_server_key = false;
2527 : 8 : bool has_scram_client_key = false;
2528 : 8 : bool has_require_auth = false;
2529 : 8 : bool has_scram_keys = false;
2530 : :
2531 : : /*
2532 : : * Continue iterating even if we found the keys that we need to validate
2533 : : * to make sure that there is no other declaration of these keys that can
2534 : : * overwrite the first.
2535 : : */
2536 [ + + ]: 80 : for (int i = 0; keywords[i] != NULL; i++)
2537 : : {
2538 [ + + ]: 72 : if (strcmp(keywords[i], "scram_client_key") == 0)
2539 : : {
2540 [ + - + - ]: 8 : if (values[i] != NULL && values[i][0] != '\0')
2541 : 8 : has_scram_client_key = true;
2542 : : else
217 peter@eisentraut.org 2543 :UBC 0 : has_scram_client_key = false;
2544 : : }
2545 : :
217 peter@eisentraut.org 2546 [ + + ]:CBC 72 : if (strcmp(keywords[i], "scram_server_key") == 0)
2547 : : {
2548 [ + - + - ]: 8 : if (values[i] != NULL && values[i][0] != '\0')
2549 : 8 : has_scram_server_key = true;
2550 : : else
217 peter@eisentraut.org 2551 :UBC 0 : has_scram_server_key = false;
2552 : : }
2553 : :
217 peter@eisentraut.org 2554 [ + + ]:CBC 72 : if (strcmp(keywords[i], "require_auth") == 0)
2555 : : {
2556 [ + - + - ]: 8 : if (values[i] != NULL && strcmp(values[i], "scram-sha-256") == 0)
2557 : 8 : has_require_auth = true;
2558 : : else
217 peter@eisentraut.org 2559 :UBC 0 : has_require_auth = false;
2560 : : }
2561 : : }
2562 : :
80 peter@eisentraut.org 2563 [ + - + - :CBC 8 : has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort != NULL && MyProcPort->has_scram_keys;
+ - + - ]
2564 : :
217 2565 [ + - + - ]: 8 : return (has_scram_keys && has_require_auth);
2566 : : }
|