Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * execAsync.c
4 : : * Support routines for asynchronous execution
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/executor/execAsync.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "executor/execAsync.h"
18 : : #include "executor/executor.h"
19 : : #include "executor/instrument.h"
20 : : #include "executor/nodeAppend.h"
21 : : #include "executor/nodeForeignscan.h"
22 : :
23 : : /*
24 : : * Asynchronously request a tuple from a designed async-capable node.
25 : : */
26 : : void
1861 efujita@postgresql.o 27 :CBC 6175 : ExecAsyncRequest(AsyncRequest *areq)
28 : : {
1819 29 [ + + ]: 6175 : if (areq->requestee->chgParam != NULL) /* something changed? */
tgl@sss.pgh.pa.us 30 : 2 : ExecReScan(areq->requestee); /* let ReScan handle this */
31 : :
32 : : /* must provide our own instrumentation support */
efujita@postgresql.o 33 [ + + ]: 6175 : if (areq->requestee->instrument)
34 : 810 : InstrStartNode(areq->requestee->instrument);
35 : :
1861 36 [ + - ]: 6175 : switch (nodeTag(areq->requestee))
37 : : {
38 : 6175 : case T_ForeignScanState:
39 : 6175 : ExecAsyncForeignScanRequest(areq);
40 : 6175 : break;
1861 efujita@postgresql.o 41 :UBC 0 : default:
42 : : /* If the node doesn't support async, caller messed up. */
43 [ # # ]: 0 : elog(ERROR, "unrecognized node type: %d",
44 : : (int) nodeTag(areq->requestee));
45 : : }
46 : :
1861 efujita@postgresql.o 47 :CBC 6175 : ExecAsyncResponse(areq);
48 : :
49 : : /* must provide our own instrumentation support */
1819 50 [ + + ]: 6175 : if (areq->requestee->instrument)
51 : 810 : InstrStopNode(areq->requestee->instrument,
52 [ + + - + ]: 810 : TupIsNull(areq->result) ? 0.0 : 1.0);
1861 53 : 6175 : }
54 : :
55 : : /*
56 : : * Give the asynchronous node a chance to configure the file descriptor event
57 : : * for which it wishes to wait. We expect the node-type specific callback to
58 : : * make a single call of the following form:
59 : : *
60 : : * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
61 : : */
62 : : void
63 : 205 : ExecAsyncConfigureWait(AsyncRequest *areq)
64 : : {
65 : : /* must provide our own instrumentation support */
1819 66 [ + + ]: 205 : if (areq->requestee->instrument)
67 : 21 : InstrStartNode(areq->requestee->instrument);
68 : :
1861 69 [ + - ]: 205 : switch (nodeTag(areq->requestee))
70 : : {
71 : 205 : case T_ForeignScanState:
72 : 205 : ExecAsyncForeignScanConfigureWait(areq);
73 : 204 : break;
1861 efujita@postgresql.o 74 :UBC 0 : default:
75 : : /* If the node doesn't support async, caller messed up. */
76 [ # # ]: 0 : elog(ERROR, "unrecognized node type: %d",
77 : : (int) nodeTag(areq->requestee));
78 : : }
79 : :
80 : : /* must provide our own instrumentation support */
1819 efujita@postgresql.o 81 [ + + ]:CBC 204 : if (areq->requestee->instrument)
82 : 21 : InstrStopNode(areq->requestee->instrument, 0.0);
1861 83 : 204 : }
84 : :
85 : : /*
86 : : * Call the asynchronous node back when a relevant event has occurred.
87 : : */
88 : : void
89 : 148 : ExecAsyncNotify(AsyncRequest *areq)
90 : : {
91 : : /* must provide our own instrumentation support */
1819 92 [ + + ]: 148 : if (areq->requestee->instrument)
93 : 14 : InstrStartNode(areq->requestee->instrument);
94 : :
1861 95 [ + - ]: 148 : switch (nodeTag(areq->requestee))
96 : : {
97 : 148 : case T_ForeignScanState:
98 : 148 : ExecAsyncForeignScanNotify(areq);
99 : 148 : break;
1861 efujita@postgresql.o 100 :UBC 0 : default:
101 : : /* If the node doesn't support async, caller messed up. */
102 [ # # ]: 0 : elog(ERROR, "unrecognized node type: %d",
103 : : (int) nodeTag(areq->requestee));
104 : : }
105 : :
1861 efujita@postgresql.o 106 :CBC 148 : ExecAsyncResponse(areq);
107 : :
108 : : /* must provide our own instrumentation support */
1819 109 [ + + ]: 148 : if (areq->requestee->instrument)
110 : 14 : InstrStopNode(areq->requestee->instrument,
111 [ + + - + ]: 14 : TupIsNull(areq->result) ? 0.0 : 1.0);
1861 112 : 148 : }
113 : :
114 : : /*
115 : : * Call the requestor back when an asynchronous node has produced a result.
116 : : */
117 : : void
118 : 6328 : ExecAsyncResponse(AsyncRequest *areq)
119 : : {
120 [ + - ]: 6328 : switch (nodeTag(areq->requestor))
121 : : {
122 : 6328 : case T_AppendState:
123 : 6328 : ExecAsyncAppendResponse(areq);
124 : 6328 : break;
1861 efujita@postgresql.o 125 :UBC 0 : default:
126 : : /* If the node doesn't support async, caller messed up. */
127 [ # # ]: 0 : elog(ERROR, "unrecognized node type: %d",
128 : : (int) nodeTag(areq->requestor));
129 : : }
1861 efujita@postgresql.o 130 :CBC 6328 : }
131 : :
132 : : /*
133 : : * A requestee node should call this function to deliver the tuple to its
134 : : * requestor node. The requestee node can call this from its ExecAsyncRequest
135 : : * or ExecAsyncNotify callback.
136 : : */
137 : : void
138 : 6167 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
139 : : {
140 : 6167 : areq->request_complete = true;
141 : 6167 : areq->result = result;
142 : 6167 : }
143 : :
144 : : /*
145 : : * A requestee node should call this function to indicate that it is pending
146 : : * for a callback. The requestee node can call this from its ExecAsyncRequest
147 : : * or ExecAsyncNotify callback.
148 : : */
149 : : void
150 : 161 : ExecAsyncRequestPending(AsyncRequest *areq)
151 : : {
152 : 161 : areq->callback_pending = true;
153 : 161 : areq->request_complete = false;
154 : 161 : areq->result = NULL;
155 : 161 : }
|