Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pqmq.c
4 : : * Use the frontend/backend protocol for communication over a shm_mq
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * src/backend/libpq/pqmq.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : :
14 : : #include "postgres.h"
15 : :
16 : : #include "access/parallel.h"
17 : : #include "libpq/libpq.h"
18 : : #include "libpq/pqformat.h"
19 : : #include "libpq/pqmq.h"
20 : : #include "miscadmin.h"
21 : : #include "pgstat.h"
22 : : #include "replication/logicalworker.h"
23 : : #include "storage/latch.h"
24 : : #include "tcop/tcopprot.h"
25 : : #include "utils/builtins.h"
26 : : #include "utils/wait_event.h"
27 : :
28 : : static shm_mq_handle *pq_mq_handle = NULL;
29 : : static bool pq_mq_busy = false;
30 : : static pid_t pq_mq_parallel_leader_pid = 0;
31 : : static ProcNumber pq_mq_parallel_leader_proc_number = INVALID_PROC_NUMBER;
32 : :
33 : : static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg);
34 : : static void mq_comm_reset(void);
35 : : static int mq_flush(void);
36 : : static int mq_flush_if_writable(void);
37 : : static bool mq_is_send_pending(void);
38 : : static int mq_putmessage(char msgtype, const char *s, size_t len);
39 : : static void mq_putmessage_noblock(char msgtype, const char *s, size_t len);
40 : :
41 : : static const PQcommMethods PqCommMqMethods = {
42 : : .comm_reset = mq_comm_reset,
43 : : .flush = mq_flush,
44 : : .flush_if_writable = mq_flush_if_writable,
45 : : .is_send_pending = mq_is_send_pending,
46 : : .putmessage = mq_putmessage,
47 : : .putmessage_noblock = mq_putmessage_noblock
48 : : };
49 : :
50 : : /*
51 : : * Arrange to redirect frontend/backend protocol messages to a shared-memory
52 : : * message queue.
53 : : */
54 : : void
3803 rhaas@postgresql.org 55 :CBC 1503 : pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
56 : : {
4153 57 : 1503 : PqCommMethods = &PqCommMqMethods;
58 : 1503 : pq_mq_handle = mqh;
59 : 1503 : whereToSendOutput = DestRemote;
60 : 1503 : FrontendProtocol = PG_PROTOCOL_LATEST;
3803 61 : 1503 : on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
62 : 1503 : }
63 : :
64 : : /*
65 : : * When the DSM that contains our shm_mq goes away, we need to stop sending
66 : : * messages to it.
67 : : */
68 : : static void
69 : 1503 : pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
70 : : {
225 tgl@sss.pgh.pa.us 71 [ + - ]:GNC 1503 : if (pq_mq_handle != NULL)
72 : : {
73 : 1503 : pfree(pq_mq_handle);
74 : 1503 : pq_mq_handle = NULL;
75 : : }
3803 rhaas@postgresql.org 76 :CBC 1503 : whereToSendOutput = DestNone;
4153 77 : 1503 : }
78 : :
79 : : /*
80 : : * Arrange to SendProcSignal() to the parallel leader each time we transmit
81 : : * message data via the shm_mq.
82 : : */
83 : : void
742 heikki.linnakangas@i 84 : 1503 : pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
85 : : {
3972 rhaas@postgresql.org 86 [ - + ]: 1503 : Assert(PqCommMethods == &PqCommMqMethods);
2100 andres@anarazel.de 87 : 1503 : pq_mq_parallel_leader_pid = pid;
742 heikki.linnakangas@i 88 : 1503 : pq_mq_parallel_leader_proc_number = procNumber;
3972 rhaas@postgresql.org 89 : 1503 : }
90 : :
91 : : static void
4153 rhaas@postgresql.org 92 :UBC 0 : mq_comm_reset(void)
93 : : {
94 : : /* Nothing to do. */
95 : 0 : }
96 : :
97 : : static int
4153 rhaas@postgresql.org 98 :CBC 13 : mq_flush(void)
99 : : {
100 : : /* Nothing to do. */
101 : 13 : return 0;
102 : : }
103 : :
104 : : static int
4153 rhaas@postgresql.org 105 :UBC 0 : mq_flush_if_writable(void)
106 : : {
107 : : /* Nothing to do. */
108 : 0 : return 0;
109 : : }
110 : :
111 : : static bool
112 : 0 : mq_is_send_pending(void)
113 : : {
114 : : /* There's never anything pending. */
115 : 0 : return 0;
116 : : }
117 : :
118 : : /*
119 : : * Transmit a libpq protocol message to the shared memory message queue
120 : : * selected via pq_mq_handle. We don't include a length word, because the
121 : : * receiver will know the length of the message from shm_mq_receive().
122 : : */
123 : : static int
4153 rhaas@postgresql.org 124 :CBC 1498 : mq_putmessage(char msgtype, const char *s, size_t len)
125 : : {
126 : : shm_mq_iovec iov[2];
127 : : shm_mq_result result;
128 : :
129 : : /*
130 : : * If we're sending a message, and we have to wait because the queue is
131 : : * full, and then we get interrupted, and that interrupt results in trying
132 : : * to send another message, we respond by detaching the queue. There's no
133 : : * way to return to the original context, but even if there were, just
134 : : * queueing the message would amount to indefinitely postponing the
135 : : * response to the interrupt. So we do this instead.
136 : : */
137 [ - + ]: 1498 : if (pq_mq_busy)
138 : : {
3118 tgl@sss.pgh.pa.us 139 [ # # ]:UBC 0 : if (pq_mq_handle != NULL)
140 : : {
141 : 0 : shm_mq_detach(pq_mq_handle);
225 tgl@sss.pgh.pa.us 142 :UNC 0 : pfree(pq_mq_handle);
143 : 0 : pq_mq_handle = NULL;
144 : : }
4153 rhaas@postgresql.org 145 :UBC 0 : return EOF;
146 : : }
147 : :
148 : : /*
149 : : * If the message queue is already gone, just ignore the message. This
150 : : * doesn't necessarily indicate a problem; for example, DEBUG messages can
151 : : * be generated late in the shutdown sequence, after all DSMs have already
152 : : * been detached.
153 : : */
3118 tgl@sss.pgh.pa.us 154 [ - + ]:CBC 1498 : if (pq_mq_handle == NULL)
3803 rhaas@postgresql.org 155 :UBC 0 : return 0;
156 : :
4153 rhaas@postgresql.org 157 :CBC 1498 : pq_mq_busy = true;
158 : :
159 : 1498 : iov[0].data = &msgtype;
160 : 1498 : iov[0].len = 1;
161 : 1498 : iov[1].data = s;
162 : 1498 : iov[1].len = len;
163 : :
164 : : for (;;)
165 : : {
166 : : /*
167 : : * Immediately notify the receiver by passing force_flush as true so
168 : : * that the shared memory value is updated before we send the parallel
169 : : * message signal right after this.
170 : : */
225 tgl@sss.pgh.pa.us 171 [ - + ]:GNC 1502 : Assert(pq_mq_handle != NULL);
1613 rhaas@postgresql.org 172 :CBC 1502 : result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);
173 : :
2100 andres@anarazel.de 174 [ + - ]: 1502 : if (pq_mq_parallel_leader_pid != 0)
175 : : {
1161 akapila@postgresql.o 176 [ + + ]: 1502 : if (IsLogicalParallelApplyWorker())
177 : 7 : SendProcSignal(pq_mq_parallel_leader_pid,
178 : : PROCSIG_PARALLEL_APPLY_MESSAGE,
179 : : pq_mq_parallel_leader_proc_number);
180 : : else
181 : : {
182 [ - + ]: 1495 : Assert(IsParallelWorker());
183 : 1495 : SendProcSignal(pq_mq_parallel_leader_pid,
184 : : PROCSIG_PARALLEL_MESSAGE,
185 : : pq_mq_parallel_leader_proc_number);
186 : : }
187 : : }
188 : :
3972 rhaas@postgresql.org 189 [ + + ]: 1502 : if (result != SHM_MQ_WOULD_BLOCK)
190 : 1498 : break;
191 : :
2669 tmunro@postgresql.or 192 : 4 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
193 : : WAIT_EVENT_MESSAGE_QUEUE_PUT_MESSAGE);
3204 andres@anarazel.de 194 : 4 : ResetLatch(MyLatch);
3513 tgl@sss.pgh.pa.us 195 [ - + ]: 4 : CHECK_FOR_INTERRUPTS();
196 : : }
197 : :
4153 rhaas@postgresql.org 198 : 1498 : pq_mq_busy = false;
199 : :
200 [ + + - + ]: 1498 : Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
201 [ + + ]: 1498 : if (result != SHM_MQ_SUCCESS)
202 : 4 : return EOF;
203 : 1494 : return 0;
204 : : }
205 : :
206 : : static void
4153 rhaas@postgresql.org 207 :UBC 0 : mq_putmessage_noblock(char msgtype, const char *s, size_t len)
208 : : {
209 : : /*
210 : : * While the shm_mq machinery does support sending a message in
211 : : * non-blocking mode, there's currently no way to try sending beginning to
212 : : * send the message that doesn't also commit us to completing the
213 : : * transmission. This could be improved in the future, but for now we
214 : : * don't need it.
215 : : */
216 [ # # ]: 0 : elog(ERROR, "not currently supported");
217 : : }
218 : :
219 : : /*
220 : : * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
221 : : * structure with the results.
222 : : */
223 : : void
4153 rhaas@postgresql.org 224 :CBC 9 : pq_parse_errornotice(StringInfo msg, ErrorData *edata)
225 : : {
226 : : /* Initialize edata with reasonable defaults. */
227 [ + - + - : 216 : MemSet(edata, 0, sizeof(ErrorData));
+ - + - +
+ ]
228 : 9 : edata->elevel = ERROR;
229 : 9 : edata->assoc_context = CurrentMemoryContext;
230 : :
231 : : /* Loop over fields and extract each one. */
232 : : for (;;)
233 : 74 : {
3949 bruce@momjian.us 234 : 83 : char code = pq_getmsgbyte(msg);
235 : : const char *value;
236 : :
4153 rhaas@postgresql.org 237 [ + + ]: 83 : if (code == '\0')
238 : : {
239 : 9 : pq_getmsgend(msg);
240 : 9 : break;
241 : : }
3545 242 : 74 : value = pq_getmsgrawstring(msg);
243 : :
4153 244 [ + + + + : 74 : switch (code)
+ + - - -
+ - - - -
- + + +
- ]
245 : : {
246 : 9 : case PG_DIAG_SEVERITY:
247 : : /* ignore, trusting we'll get a nonlocalized version */
3488 tgl@sss.pgh.pa.us 248 : 9 : break;
249 : 9 : case PG_DIAG_SEVERITY_NONLOCALIZED:
4153 rhaas@postgresql.org 250 [ - + ]: 9 : if (strcmp(value, "DEBUG") == 0)
251 : : {
252 : : /*
253 : : * We can't reconstruct the exact DEBUG level, but
254 : : * presumably it was >= client_min_messages, so select
255 : : * DEBUG1 to ensure we'll pass it on to the client.
256 : : */
3488 tgl@sss.pgh.pa.us 257 :UBC 0 : edata->elevel = DEBUG1;
258 : : }
4153 rhaas@postgresql.org 259 [ - + ]:CBC 9 : else if (strcmp(value, "LOG") == 0)
260 : : {
261 : : /*
262 : : * It can't be LOG_SERVER_ONLY, or the worker wouldn't
263 : : * have sent it to us; so LOG is the correct value.
264 : : */
3488 tgl@sss.pgh.pa.us 265 :UBC 0 : edata->elevel = LOG;
266 : : }
4153 rhaas@postgresql.org 267 [ - + ]:CBC 9 : else if (strcmp(value, "INFO") == 0)
4153 rhaas@postgresql.org 268 :UBC 0 : edata->elevel = INFO;
4153 rhaas@postgresql.org 269 [ - + ]:CBC 9 : else if (strcmp(value, "NOTICE") == 0)
4153 rhaas@postgresql.org 270 :UBC 0 : edata->elevel = NOTICE;
4153 rhaas@postgresql.org 271 [ - + ]:CBC 9 : else if (strcmp(value, "WARNING") == 0)
4153 rhaas@postgresql.org 272 :UBC 0 : edata->elevel = WARNING;
4153 rhaas@postgresql.org 273 [ + - ]:CBC 9 : else if (strcmp(value, "ERROR") == 0)
274 : 9 : edata->elevel = ERROR;
4153 rhaas@postgresql.org 275 [ # # ]:UBC 0 : else if (strcmp(value, "FATAL") == 0)
276 : 0 : edata->elevel = FATAL;
277 [ # # ]: 0 : else if (strcmp(value, "PANIC") == 0)
278 : 0 : edata->elevel = PANIC;
279 : : else
3488 tgl@sss.pgh.pa.us 280 [ # # ]: 0 : elog(ERROR, "unrecognized error severity: \"%s\"", value);
4153 rhaas@postgresql.org 281 :CBC 9 : break;
282 : 9 : case PG_DIAG_SQLSTATE:
283 [ - + ]: 9 : if (strlen(value) != 5)
3488 tgl@sss.pgh.pa.us 284 [ # # ]:UBC 0 : elog(ERROR, "invalid SQLSTATE: \"%s\"", value);
4153 rhaas@postgresql.org 285 :CBC 9 : edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2],
286 : : value[3], value[4]);
287 : 9 : break;
288 : 9 : case PG_DIAG_MESSAGE_PRIMARY:
289 : 9 : edata->message = pstrdup(value);
290 : 9 : break;
291 : 2 : case PG_DIAG_MESSAGE_DETAIL:
292 : 2 : edata->detail = pstrdup(value);
293 : 2 : break;
294 : 3 : case PG_DIAG_MESSAGE_HINT:
295 : 3 : edata->hint = pstrdup(value);
296 : 3 : break;
4153 rhaas@postgresql.org 297 :UBC 0 : case PG_DIAG_STATEMENT_POSITION:
2793 andres@anarazel.de 298 : 0 : edata->cursorpos = pg_strtoint32(value);
4153 rhaas@postgresql.org 299 : 0 : break;
300 : 0 : case PG_DIAG_INTERNAL_POSITION:
2793 andres@anarazel.de 301 : 0 : edata->internalpos = pg_strtoint32(value);
4153 rhaas@postgresql.org 302 : 0 : break;
303 : 0 : case PG_DIAG_INTERNAL_QUERY:
304 : 0 : edata->internalquery = pstrdup(value);
305 : 0 : break;
4153 rhaas@postgresql.org 306 :CBC 6 : case PG_DIAG_CONTEXT:
307 : 6 : edata->context = pstrdup(value);
308 : 6 : break;
4153 rhaas@postgresql.org 309 :UBC 0 : case PG_DIAG_SCHEMA_NAME:
310 : 0 : edata->schema_name = pstrdup(value);
311 : 0 : break;
312 : 0 : case PG_DIAG_TABLE_NAME:
313 : 0 : edata->table_name = pstrdup(value);
314 : 0 : break;
315 : 0 : case PG_DIAG_COLUMN_NAME:
316 : 0 : edata->column_name = pstrdup(value);
317 : 0 : break;
318 : 0 : case PG_DIAG_DATATYPE_NAME:
319 : 0 : edata->datatype_name = pstrdup(value);
320 : 0 : break;
321 : 0 : case PG_DIAG_CONSTRAINT_NAME:
322 : 0 : edata->constraint_name = pstrdup(value);
323 : 0 : break;
4153 rhaas@postgresql.org 324 :CBC 9 : case PG_DIAG_SOURCE_FILE:
325 : 9 : edata->filename = pstrdup(value);
326 : 9 : break;
327 : 9 : case PG_DIAG_SOURCE_LINE:
2793 andres@anarazel.de 328 : 9 : edata->lineno = pg_strtoint32(value);
4153 rhaas@postgresql.org 329 : 9 : break;
330 : 9 : case PG_DIAG_SOURCE_FUNCTION:
331 : 9 : edata->funcname = pstrdup(value);
332 : 9 : break;
4153 rhaas@postgresql.org 333 :UBC 0 : default:
96 peter@eisentraut.org 334 [ # # ]:UNC 0 : elog(ERROR, "unrecognized error field code: %d", code);
335 : : break;
336 : : }
337 : : }
4153 rhaas@postgresql.org 338 :CBC 9 : }
|