Age Owner Branch data TLA Line data Source code
1 : : /*--------------------------------------------------------------------------
2 : : *
3 : : * test.c
4 : : * Test harness code for shared memory message queues.
5 : : *
6 : : * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/test/modules/test_shm_mq/test.c
10 : : *
11 : : * -------------------------------------------------------------------------
12 : : */
13 : :
14 : : #include "postgres.h"
15 : :
16 : : #include "fmgr.h"
17 : : #include "miscadmin.h"
18 : : #include "pgstat.h"
19 : : #include "storage/proc.h"
20 : : #include "utils/wait_event.h"
21 : : #include "varatt.h"
22 : :
23 : : #include "test_shm_mq.h"
24 : :
4266 bruce@momjian.us 25 :CBC 8 : PG_MODULE_MAGIC;
26 : :
27 : 2 : PG_FUNCTION_INFO_V1(test_shm_mq);
4443 rhaas@postgresql.org 28 : 2 : PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
29 : :
30 : : static void verify_message(Size origlen, char *origdata, Size newlen,
31 : : char *newdata);
32 : :
33 : : /* value cached, fetched from shared memory */
34 : : static uint32 we_message_queue = 0;
35 : :
36 : : /*
37 : : * Simple test of the shared memory message queue infrastructure.
38 : : *
39 : : * We set up a ring of message queues passing through 1 or more background
40 : : * processes and eventually looping back to ourselves. We then send a message
41 : : * through the ring a number of times indicated by the loop count. At the end,
42 : : * we check whether the final message matches the one we started with.
43 : : */
44 : : Datum
45 : 4 : test_shm_mq(PG_FUNCTION_ARGS)
46 : : {
47 : 4 : int64 queue_size = PG_GETARG_INT64(0);
48 : 4 : text *message = PG_GETARG_TEXT_PP(1);
49 [ - + ]: 4 : char *message_contents = VARDATA_ANY(message);
50 [ - + - - : 4 : int message_size = VARSIZE_ANY_EXHDR(message);
- - - - -
+ ]
51 : 4 : int32 loop_count = PG_GETARG_INT32(2);
52 : 4 : int32 nworkers = PG_GETARG_INT32(3);
53 : : dsm_segment *seg;
54 : : shm_mq_handle *outqh;
55 : : shm_mq_handle *inqh;
56 : : shm_mq_result res;
57 : : Size len;
58 : : void *data;
59 : :
60 : : /* A negative loopcount is nonsensical. */
61 [ - + ]: 4 : if (loop_count < 0)
4443 rhaas@postgresql.org 62 [ # # ]:UBC 0 : ereport(ERROR,
63 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
64 : : errmsg("repeat count size must be an integer value greater than or equal to zero")));
65 : :
66 : : /*
67 : : * Since this test sends data using the blocking interfaces, it cannot
68 : : * send data to itself. Therefore, a minimum of 1 worker is required. Of
69 : : * course, a negative worker count is nonsensical.
70 : : */
1691 fujii@postgresql.org 71 [ - + ]:CBC 4 : if (nworkers <= 0)
4443 rhaas@postgresql.org 72 [ # # ]:UBC 0 : ereport(ERROR,
73 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 : : errmsg("number of workers must be an integer value greater than zero")));
75 : :
76 : : /* Set up dynamic shared memory segment and background workers. */
4443 rhaas@postgresql.org 77 :CBC 4 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
78 : :
79 : : /* Send the initial message. */
1613 80 : 4 : res = shm_mq_send(outqh, message_size, message_contents, false, true);
4443 81 [ + - ]: 4 : if (res != SHM_MQ_SUCCESS)
4443 rhaas@postgresql.org 82 [ # # ]:UBC 0 : ereport(ERROR,
83 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
84 : : errmsg("could not send message")));
85 : :
86 : : /*
87 : : * Receive a message and send it back out again. Do this a number of
88 : : * times equal to the loop count.
89 : : */
90 : : for (;;)
91 : : {
92 : : /* Receive a message. */
4443 rhaas@postgresql.org 93 :CBC 24001 : res = shm_mq_receive(inqh, &len, &data, false);
94 [ - + ]: 24001 : if (res != SHM_MQ_SUCCESS)
4443 rhaas@postgresql.org 95 [ # # ]:UBC 0 : ereport(ERROR,
96 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
97 : : errmsg("could not receive message")));
98 : :
99 : : /* If this is supposed to be the last iteration, stop here. */
4443 rhaas@postgresql.org 100 [ + + ]:CBC 24001 : if (--loop_count <= 0)
101 : 4 : break;
102 : :
103 : : /* Send it back out. */
1613 104 : 23997 : res = shm_mq_send(outqh, len, data, false, true);
4443 105 [ - + ]: 23997 : if (res != SHM_MQ_SUCCESS)
4443 rhaas@postgresql.org 106 [ # # ]:UBC 0 : ereport(ERROR,
107 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
108 : : errmsg("could not send message")));
109 : : }
110 : :
111 : : /*
112 : : * Finally, check that we got back the same message from the last
113 : : * iteration that we originally sent.
114 : : */
4443 rhaas@postgresql.org 115 :CBC 4 : verify_message(message_size, message_contents, len, data);
116 : :
117 : : /* Clean up. */
118 : 4 : dsm_detach(seg);
119 : :
120 : 4 : PG_RETURN_VOID();
121 : : }
122 : :
123 : : /*
124 : : * Pipelined test of the shared memory message queue infrastructure.
125 : : *
126 : : * As in the basic test, we set up a ring of message queues passing through
127 : : * 1 or more background processes and eventually looping back to ourselves.
128 : : * Then, we send N copies of the user-specified message through the ring and
129 : : * receive them all back. Since this might fill up all message queues in the
130 : : * ring and then stall, we must be prepared to begin receiving the messages
131 : : * back before we've finished sending them.
132 : : */
133 : : Datum
134 : 1 : test_shm_mq_pipelined(PG_FUNCTION_ARGS)
135 : : {
136 : 1 : int64 queue_size = PG_GETARG_INT64(0);
137 : 1 : text *message = PG_GETARG_TEXT_PP(1);
138 [ - + ]: 1 : char *message_contents = VARDATA_ANY(message);
139 [ - + - - : 1 : int message_size = VARSIZE_ANY_EXHDR(message);
- - - - -
+ ]
140 : 1 : int32 loop_count = PG_GETARG_INT32(2);
141 : 1 : int32 nworkers = PG_GETARG_INT32(3);
142 : 1 : bool verify = PG_GETARG_BOOL(4);
143 : 1 : int32 send_count = 0;
144 : 1 : int32 receive_count = 0;
145 : : dsm_segment *seg;
146 : : shm_mq_handle *outqh;
147 : : shm_mq_handle *inqh;
148 : : shm_mq_result res;
149 : : Size len;
150 : : void *data;
151 : :
152 : : /* A negative loopcount is nonsensical. */
153 [ - + ]: 1 : if (loop_count < 0)
4443 rhaas@postgresql.org 154 [ # # ]:UBC 0 : ereport(ERROR,
155 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
156 : : errmsg("repeat count size must be an integer value greater than or equal to zero")));
157 : :
158 : : /*
159 : : * Using the nonblocking interfaces, we can even send data to ourselves,
160 : : * so the minimum number of workers for this test is zero.
161 : : */
4443 rhaas@postgresql.org 162 [ - + ]:CBC 1 : if (nworkers < 0)
4443 rhaas@postgresql.org 163 [ # # ]:UBC 0 : ereport(ERROR,
164 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
165 : : errmsg("number of workers must be an integer value greater than or equal to zero")));
166 : :
167 : : /* Set up dynamic shared memory segment and background workers. */
4443 rhaas@postgresql.org 168 :CBC 1 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
169 : :
170 : : /* Main loop. */
171 : : for (;;)
172 : 4141 : {
173 : 4142 : bool wait = true;
174 : :
175 : : /*
176 : : * If we haven't yet sent the message the requisite number of times,
177 : : * try again to send it now. Note that when shm_mq_send() returns
178 : : * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
179 : : * same message size and contents; that's not an issue here because
180 : : * we're sending the same message every time.
181 : : */
182 [ + + ]: 4142 : if (send_count < loop_count)
183 : : {
1613 184 : 4104 : res = shm_mq_send(outqh, message_size, message_contents, true,
185 : : true);
4443 186 [ + + ]: 4104 : if (res == SHM_MQ_SUCCESS)
187 : : {
188 : 200 : ++send_count;
189 : 200 : wait = false;
190 : : }
191 [ - + ]: 3904 : else if (res == SHM_MQ_DETACHED)
4443 rhaas@postgresql.org 192 [ # # ]:UBC 0 : ereport(ERROR,
193 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
194 : : errmsg("could not send message")));
195 : : }
196 : :
197 : : /*
198 : : * If we haven't yet received the message the requisite number of
199 : : * times, try to receive it again now.
200 : : */
4443 rhaas@postgresql.org 201 [ + + ]:CBC 4142 : if (receive_count < loop_count)
202 : : {
203 : 4141 : res = shm_mq_receive(inqh, &len, &data, true);
204 [ + + ]: 4141 : if (res == SHM_MQ_SUCCESS)
205 : : {
206 : 200 : ++receive_count;
207 : : /* Verifying every time is slow, so it's optional. */
208 [ + - ]: 200 : if (verify)
209 : 200 : verify_message(message_size, message_contents, len, data);
210 : 200 : wait = false;
211 : : }
212 [ - + ]: 3941 : else if (res == SHM_MQ_DETACHED)
4443 rhaas@postgresql.org 213 [ # # ]:UBC 0 : ereport(ERROR,
214 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
215 : : errmsg("could not receive message")));
216 : : }
217 : : else
218 : : {
219 : : /*
220 : : * Otherwise, we've received the message enough times. This
221 : : * shouldn't happen unless we've also sent it enough times.
222 : : */
4443 rhaas@postgresql.org 223 [ - + ]:CBC 1 : if (send_count != receive_count)
4443 rhaas@postgresql.org 224 [ # # ]:UBC 0 : ereport(ERROR,
225 : : (errcode(ERRCODE_INTERNAL_ERROR),
226 : : errmsg("message sent %d times, but received %d times",
227 : : send_count, receive_count)));
4443 rhaas@postgresql.org 228 :CBC 1 : break;
229 : : }
230 : :
231 [ + + ]: 4141 : if (wait)
232 : : {
233 : : /* first time, allocate or get the custom wait event */
893 michael@paquier.xyz 234 [ + + ]: 3792 : if (we_message_queue == 0)
235 : 1 : we_message_queue = WaitEventExtensionNew("TestShmMqMessageQueue");
236 : :
237 : : /*
238 : : * If we made no progress, wait for one of the other processes to
239 : : * which we are connected to set our latch, indicating that they
240 : : * have read or written data and therefore there may now be work
241 : : * for us to do.
242 : : */
2669 tmunro@postgresql.or 243 : 3792 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
244 : : we_message_queue);
4078 andres@anarazel.de 245 : 3792 : ResetLatch(MyLatch);
3513 tgl@sss.pgh.pa.us 246 [ - + ]: 3792 : CHECK_FOR_INTERRUPTS();
247 : : }
248 : : }
249 : :
250 : : /* Clean up. */
4443 rhaas@postgresql.org 251 : 1 : dsm_detach(seg);
252 : :
253 : 1 : PG_RETURN_VOID();
254 : : }
255 : :
256 : : /*
257 : : * Verify that two messages are the same.
258 : : */
259 : : static void
4380 260 : 204 : verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
261 : : {
262 : : Size i;
263 : :
4443 264 [ - + ]: 204 : if (origlen != newlen)
4443 rhaas@postgresql.org 265 [ # # ]:UBC 0 : ereport(ERROR,
266 : : (errmsg("message corrupted"),
267 : : errdetail("The original message was %zu bytes but the final message is %zu bytes.",
268 : : origlen, newlen)));
269 : :
4443 rhaas@postgresql.org 270 [ + + ]:CBC 54001299 : for (i = 0; i < origlen; ++i)
271 [ - + ]: 54001095 : if (origdata[i] != newdata[i])
4443 rhaas@postgresql.org 272 [ # # ]:UBC 0 : ereport(ERROR,
273 : : (errmsg("message corrupted"),
274 : : errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
4443 rhaas@postgresql.org 275 :CBC 204 : }
|