Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * proto.c
4 : : * logical replication protocol functions
5 : : *
6 : : * Copyright (c) 2015-2025, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/logical/proto.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "access/sysattr.h"
16 : : #include "catalog/pg_namespace.h"
17 : : #include "catalog/pg_type.h"
18 : : #include "libpq/pqformat.h"
19 : : #include "replication/logicalproto.h"
20 : : #include "utils/lsyscache.h"
21 : : #include "utils/syscache.h"
22 : :
23 : : /*
24 : : * Protocol message flags.
25 : : */
26 : : #define LOGICALREP_IS_REPLICA_IDENTITY 1
27 : :
28 : : #define MESSAGE_TRANSACTIONAL (1<<0)
29 : : #define TRUNCATE_CASCADE (1<<0)
30 : : #define TRUNCATE_RESTART_SEQS (1<<1)
31 : :
32 : : static void logicalrep_write_attrs(StringInfo out, Relation rel,
33 : : Bitmapset *columns,
34 : : PublishGencolsType include_gencols_type);
35 : : static void logicalrep_write_tuple(StringInfo out, Relation rel,
36 : : TupleTableSlot *slot,
37 : : bool binary, Bitmapset *columns,
38 : : PublishGencolsType include_gencols_type);
39 : : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
40 : : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
41 : :
42 : : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
43 : : static const char *logicalrep_read_namespace(StringInfo in);
44 : :
45 : : /*
46 : : * Write BEGIN to the output stream.
47 : : */
48 : : void
3254 peter_e@gmx.net 49 :CBC 450 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
50 : : {
1871 akapila@postgresql.o 51 : 450 : pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
52 : :
53 : : /* fixed fields */
3254 peter_e@gmx.net 54 : 450 : pq_sendint64(out, txn->final_lsn);
78 peter@eisentraut.org 55 :GNC 450 : pq_sendint64(out, txn->commit_time);
2989 andres@anarazel.de 56 :CBC 450 : pq_sendint32(out, txn->xid);
3254 peter_e@gmx.net 57 : 450 : }
58 : :
59 : : /*
60 : : * Read transaction BEGIN from the stream.
61 : : */
62 : : void
63 : 482 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
64 : : {
65 : : /* read fields */
66 : 482 : begin_data->final_lsn = pq_getmsgint64(in);
41 alvherre@kurilemu.de 67 [ - + ]:GNC 482 : if (!XLogRecPtrIsValid(begin_data->final_lsn))
3254 peter_e@gmx.net 68 [ # # ]:UBC 0 : elog(ERROR, "final_lsn not set in begin message");
3254 peter_e@gmx.net 69 :CBC 482 : begin_data->committime = pq_getmsgint64(in);
70 : 482 : begin_data->xid = pq_getmsgint(in, 4);
71 : 482 : }
72 : :
73 : :
74 : : /*
75 : : * Write COMMIT to the output stream.
76 : : */
77 : : void
78 : 449 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
79 : : XLogRecPtr commit_lsn)
80 : : {
3136 bruce@momjian.us 81 : 449 : uint8 flags = 0;
82 : :
1871 akapila@postgresql.o 83 : 449 : pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
84 : :
85 : : /* send the flags field (unused for now) */
3254 peter_e@gmx.net 86 : 449 : pq_sendbyte(out, flags);
87 : :
88 : : /* send fields */
89 : 449 : pq_sendint64(out, commit_lsn);
90 : 449 : pq_sendint64(out, txn->end_lsn);
78 peter@eisentraut.org 91 :GNC 449 : pq_sendint64(out, txn->commit_time);
3254 peter_e@gmx.net 92 :CBC 449 : }
93 : :
94 : : /*
95 : : * Read transaction COMMIT from the stream.
96 : : */
97 : : void
98 : 433 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
99 : : {
100 : : /* read flags (unused for now) */
3136 bruce@momjian.us 101 : 433 : uint8 flags = pq_getmsgbyte(in);
102 : :
3254 peter_e@gmx.net 103 [ - + ]: 433 : if (flags != 0)
3148 tgl@sss.pgh.pa.us 104 [ # # ]:UBC 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
105 : :
106 : : /* read fields */
3254 peter_e@gmx.net 107 :CBC 433 : commit_data->commit_lsn = pq_getmsgint64(in);
108 : 433 : commit_data->end_lsn = pq_getmsgint64(in);
109 : 433 : commit_data->committime = pq_getmsgint64(in);
110 : 433 : }
111 : :
112 : : /*
113 : : * Write BEGIN PREPARE to the output stream.
114 : : */
115 : : void
1617 akapila@postgresql.o 116 : 20 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
117 : : {
118 : 20 : pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
119 : :
120 : : /* fixed fields */
121 : 20 : pq_sendint64(out, txn->final_lsn);
122 : 20 : pq_sendint64(out, txn->end_lsn);
78 peter@eisentraut.org 123 :GNC 20 : pq_sendint64(out, txn->prepare_time);
1617 akapila@postgresql.o 124 :CBC 20 : pq_sendint32(out, txn->xid);
125 : :
126 : : /* send gid */
127 : 20 : pq_sendstring(out, txn->gid);
128 : 20 : }
129 : :
130 : : /*
131 : : * Read transaction BEGIN PREPARE from the stream.
132 : : */
133 : : void
134 : 17 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
135 : : {
136 : : /* read fields */
137 : 17 : begin_data->prepare_lsn = pq_getmsgint64(in);
41 alvherre@kurilemu.de 138 [ - + ]:GNC 17 : if (!XLogRecPtrIsValid(begin_data->prepare_lsn))
1617 akapila@postgresql.o 139 [ # # ]:UBC 0 : elog(ERROR, "prepare_lsn not set in begin prepare message");
1617 akapila@postgresql.o 140 :CBC 17 : begin_data->end_lsn = pq_getmsgint64(in);
41 alvherre@kurilemu.de 141 [ - + ]:GNC 17 : if (!XLogRecPtrIsValid(begin_data->end_lsn))
1617 akapila@postgresql.o 142 [ # # ]:UBC 0 : elog(ERROR, "end_lsn not set in begin prepare message");
1617 akapila@postgresql.o 143 :CBC 17 : begin_data->prepare_time = pq_getmsgint64(in);
144 : 17 : begin_data->xid = pq_getmsgint(in, 4);
145 : :
146 : : /* read gid (copy it into a pre-allocated buffer) */
1611 147 : 17 : strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
1617 148 : 17 : }
149 : :
150 : : /*
151 : : * The core functionality for logicalrep_write_prepare and
152 : : * logicalrep_write_stream_prepare.
153 : : */
154 : : static void
1602 155 : 31 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
156 : : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
157 : : {
1617 158 : 31 : uint8 flags = 0;
159 : :
1602 160 : 31 : pq_sendbyte(out, type);
161 : :
162 : : /*
163 : : * This should only ever happen for two-phase commit transactions, in
164 : : * which case we expect to have a valid GID.
165 : : */
1617 166 [ - + ]: 31 : Assert(txn->gid != NULL);
308 msawada@postgresql.o 167 [ - + ]: 31 : Assert(rbtxn_is_prepared(txn));
1602 akapila@postgresql.o 168 [ - + ]: 31 : Assert(TransactionIdIsValid(txn->xid));
169 : :
170 : : /* send the flags field */
1617 171 : 31 : pq_sendbyte(out, flags);
172 : :
173 : : /* send fields */
174 : 31 : pq_sendint64(out, prepare_lsn);
175 : 31 : pq_sendint64(out, txn->end_lsn);
78 peter@eisentraut.org 176 :GNC 31 : pq_sendint64(out, txn->prepare_time);
1617 akapila@postgresql.o 177 :CBC 31 : pq_sendint32(out, txn->xid);
178 : :
179 : : /* send gid */
180 : 31 : pq_sendstring(out, txn->gid);
181 : 31 : }
182 : :
183 : : /*
184 : : * Write PREPARE to the output stream.
185 : : */
186 : : void
1602 187 : 20 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
188 : : XLogRecPtr prepare_lsn)
189 : : {
190 : 20 : logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
191 : : txn, prepare_lsn);
192 : 20 : }
193 : :
194 : : /*
195 : : * The core functionality for logicalrep_read_prepare and
196 : : * logicalrep_read_stream_prepare.
197 : : */
198 : : static void
199 : 27 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
200 : : LogicalRepPreparedTxnData *prepare_data)
201 : : {
202 : : /* read flags */
1617 203 : 27 : uint8 flags = pq_getmsgbyte(in);
204 : :
205 [ - + ]: 27 : if (flags != 0)
1602 akapila@postgresql.o 206 [ # # ]:UBC 0 : elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
207 : :
208 : : /* read fields */
1617 akapila@postgresql.o 209 :CBC 27 : prepare_data->prepare_lsn = pq_getmsgint64(in);
41 alvherre@kurilemu.de 210 [ - + ]:GNC 27 : if (!XLogRecPtrIsValid(prepare_data->prepare_lsn))
1602 akapila@postgresql.o 211 [ # # ]:UBC 0 : elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
1617 akapila@postgresql.o 212 :CBC 27 : prepare_data->end_lsn = pq_getmsgint64(in);
41 alvherre@kurilemu.de 213 [ - + ]:GNC 27 : if (!XLogRecPtrIsValid(prepare_data->end_lsn))
1602 akapila@postgresql.o 214 [ # # ]:UBC 0 : elog(ERROR, "end_lsn is not set in %s message", msgtype);
1617 akapila@postgresql.o 215 :CBC 27 : prepare_data->prepare_time = pq_getmsgint64(in);
216 : 27 : prepare_data->xid = pq_getmsgint(in, 4);
1596 217 [ - + ]: 27 : if (prepare_data->xid == InvalidTransactionId)
1596 akapila@postgresql.o 218 [ # # ]:UBC 0 : elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
219 : :
220 : : /* read gid (copy it into a pre-allocated buffer) */
1611 akapila@postgresql.o 221 :CBC 27 : strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
1617 222 : 27 : }
223 : :
224 : : /*
225 : : * Read transaction PREPARE from the stream.
226 : : */
227 : : void
1602 228 : 16 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
229 : : {
230 : 16 : logicalrep_read_prepare_common(in, "prepare", prepare_data);
231 : 16 : }
232 : :
233 : : /*
234 : : * Write COMMIT PREPARED to the output stream.
235 : : */
236 : : void
1617 237 : 24 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
238 : : XLogRecPtr commit_lsn)
239 : : {
240 : 24 : uint8 flags = 0;
241 : :
242 : 24 : pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
243 : :
244 : : /*
245 : : * This should only ever happen for two-phase commit transactions, in
246 : : * which case we expect to have a valid GID.
247 : : */
248 [ - + ]: 24 : Assert(txn->gid != NULL);
249 : :
250 : : /* send the flags field */
251 : 24 : pq_sendbyte(out, flags);
252 : :
253 : : /* send fields */
254 : 24 : pq_sendint64(out, commit_lsn);
255 : 24 : pq_sendint64(out, txn->end_lsn);
78 peter@eisentraut.org 256 :GNC 24 : pq_sendint64(out, txn->commit_time);
1617 akapila@postgresql.o 257 :CBC 24 : pq_sendint32(out, txn->xid);
258 : :
259 : : /* send gid */
260 : 24 : pq_sendstring(out, txn->gid);
261 : 24 : }
262 : :
263 : : /*
264 : : * Read transaction COMMIT PREPARED from the stream.
265 : : */
266 : : void
267 : 21 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
268 : : {
269 : : /* read flags */
270 : 21 : uint8 flags = pq_getmsgbyte(in);
271 : :
272 [ - + ]: 21 : if (flags != 0)
1617 akapila@postgresql.o 273 [ # # ]:UBC 0 : elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
274 : :
275 : : /* read fields */
1617 akapila@postgresql.o 276 :CBC 21 : prepare_data->commit_lsn = pq_getmsgint64(in);
41 alvherre@kurilemu.de 277 [ - + ]:GNC 21 : if (!XLogRecPtrIsValid(prepare_data->commit_lsn))
1617 akapila@postgresql.o 278 [ # # ]:UBC 0 : elog(ERROR, "commit_lsn is not set in commit prepared message");
1617 akapila@postgresql.o 279 :CBC 21 : prepare_data->end_lsn = pq_getmsgint64(in);
41 alvherre@kurilemu.de 280 [ - + ]:GNC 21 : if (!XLogRecPtrIsValid(prepare_data->end_lsn))
1617 akapila@postgresql.o 281 [ # # ]:UBC 0 : elog(ERROR, "end_lsn is not set in commit prepared message");
1617 akapila@postgresql.o 282 :CBC 21 : prepare_data->commit_time = pq_getmsgint64(in);
283 : 21 : prepare_data->xid = pq_getmsgint(in, 4);
284 : :
285 : : /* read gid (copy it into a pre-allocated buffer) */
1611 286 : 21 : strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
1617 287 : 21 : }
288 : :
289 : : /*
290 : : * Write ROLLBACK PREPARED to the output stream.
291 : : */
292 : : void
293 : 7 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
294 : : XLogRecPtr prepare_end_lsn,
295 : : TimestampTz prepare_time)
296 : : {
297 : 7 : uint8 flags = 0;
298 : :
299 : 7 : pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
300 : :
301 : : /*
302 : : * This should only ever happen for two-phase commit transactions, in
303 : : * which case we expect to have a valid GID.
304 : : */
305 [ - + ]: 7 : Assert(txn->gid != NULL);
306 : :
307 : : /* send the flags field */
308 : 7 : pq_sendbyte(out, flags);
309 : :
310 : : /* send fields */
311 : 7 : pq_sendint64(out, prepare_end_lsn);
312 : 7 : pq_sendint64(out, txn->end_lsn);
313 : 7 : pq_sendint64(out, prepare_time);
78 peter@eisentraut.org 314 :GNC 7 : pq_sendint64(out, txn->commit_time);
1617 akapila@postgresql.o 315 :CBC 7 : pq_sendint32(out, txn->xid);
316 : :
317 : : /* send gid */
318 : 7 : pq_sendstring(out, txn->gid);
319 : 7 : }
320 : :
321 : : /*
322 : : * Read transaction ROLLBACK PREPARED from the stream.
323 : : */
324 : : void
325 : 5 : logicalrep_read_rollback_prepared(StringInfo in,
326 : : LogicalRepRollbackPreparedTxnData *rollback_data)
327 : : {
328 : : /* read flags */
329 : 5 : uint8 flags = pq_getmsgbyte(in);
330 : :
331 [ - + ]: 5 : if (flags != 0)
1617 akapila@postgresql.o 332 [ # # ]:UBC 0 : elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
333 : :
334 : : /* read fields */
1617 akapila@postgresql.o 335 :CBC 5 : rollback_data->prepare_end_lsn = pq_getmsgint64(in);
41 alvherre@kurilemu.de 336 [ - + ]:GNC 5 : if (!XLogRecPtrIsValid(rollback_data->prepare_end_lsn))
1617 akapila@postgresql.o 337 [ # # ]:UBC 0 : elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
1617 akapila@postgresql.o 338 :CBC 5 : rollback_data->rollback_end_lsn = pq_getmsgint64(in);
41 alvherre@kurilemu.de 339 [ - + ]:GNC 5 : if (!XLogRecPtrIsValid(rollback_data->rollback_end_lsn))
1617 akapila@postgresql.o 340 [ # # ]:UBC 0 : elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
1617 akapila@postgresql.o 341 :CBC 5 : rollback_data->prepare_time = pq_getmsgint64(in);
342 : 5 : rollback_data->rollback_time = pq_getmsgint64(in);
343 : 5 : rollback_data->xid = pq_getmsgint(in, 4);
344 : :
345 : : /* read gid (copy it into a pre-allocated buffer) */
1611 346 : 5 : strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
1617 347 : 5 : }
348 : :
349 : : /*
350 : : * Write STREAM PREPARE to the output stream.
351 : : */
352 : : void
1596 353 : 11 : logicalrep_write_stream_prepare(StringInfo out,
354 : : ReorderBufferTXN *txn,
355 : : XLogRecPtr prepare_lsn)
356 : : {
357 : 11 : logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
358 : : txn, prepare_lsn);
359 : 11 : }
360 : :
361 : : /*
362 : : * Read STREAM PREPARE from the stream.
363 : : */
364 : : void
365 : 11 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
366 : : {
367 : 11 : logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
368 : 11 : }
369 : :
370 : : /*
371 : : * Write ORIGIN to the output stream.
372 : : */
373 : : void
3254 peter_e@gmx.net 374 : 8 : logicalrep_write_origin(StringInfo out, const char *origin,
375 : : XLogRecPtr origin_lsn)
376 : : {
1871 akapila@postgresql.o 377 : 8 : pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
378 : :
379 : : /* fixed fields */
3254 peter_e@gmx.net 380 : 8 : pq_sendint64(out, origin_lsn);
381 : :
382 : : /* origin string */
383 : 8 : pq_sendstring(out, origin);
384 : 8 : }
385 : :
386 : : /*
387 : : * Read ORIGIN from the output stream.
388 : : */
389 : : char *
3254 peter_e@gmx.net 390 :UBC 0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
391 : : {
392 : : /* fixed fields */
393 : 0 : *origin_lsn = pq_getmsgint64(in);
394 : :
395 : : /* return origin */
396 : 0 : return pstrdup(pq_getmsgstring(in));
397 : : }
398 : :
399 : : /*
400 : : * Write INSERT to the output stream.
401 : : */
402 : : void
1931 akapila@postgresql.o 403 :CBC 105918 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
404 : : TupleTableSlot *newslot, bool binary,
405 : : Bitmapset *columns,
406 : : PublishGencolsType include_gencols_type)
407 : : {
1871 408 : 105918 : pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
409 : :
410 : : /* transaction ID (if not valid, we're not streaming) */
1931 411 [ + + ]: 105918 : if (TransactionIdIsValid(xid))
412 : 100075 : pq_sendint32(out, xid);
413 : :
414 : : /* use Oid as relation identifier */
2989 andres@anarazel.de 415 : 105918 : pq_sendint32(out, RelationGetRelid(rel));
416 : :
3254 peter_e@gmx.net 417 : 105918 : pq_sendbyte(out, 'N'); /* new tuple follows */
328 akapila@postgresql.o 418 : 105918 : logicalrep_write_tuple(out, rel, newslot, binary, columns,
419 : : include_gencols_type);
3254 peter_e@gmx.net 420 : 105918 : }
421 : :
422 : : /*
423 : : * Read INSERT from stream.
424 : : *
425 : : * Fills the new tuple.
426 : : */
427 : : LogicalRepRelId
428 : 75847 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
429 : : {
430 : : char action;
431 : : LogicalRepRelId relid;
432 : :
433 : : /* read the relation id */
434 : 75847 : relid = pq_getmsgint(in, 4);
435 : :
436 : 75847 : action = pq_getmsgbyte(in);
437 [ - + ]: 75847 : if (action != 'N')
3254 peter_e@gmx.net 438 [ # # ]:UBC 0 : elog(ERROR, "expected new tuple but got %d",
439 : : action);
440 : :
3254 peter_e@gmx.net 441 :CBC 75847 : logicalrep_read_tuple(in, newtup);
442 : :
443 : 75847 : return relid;
444 : : }
445 : :
446 : : /*
447 : : * Write UPDATE to the output stream.
448 : : */
449 : : void
1931 akapila@postgresql.o 450 : 34440 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
451 : : TupleTableSlot *oldslot, TupleTableSlot *newslot,
452 : : bool binary, Bitmapset *columns,
453 : : PublishGencolsType include_gencols_type)
454 : : {
1871 455 : 34440 : pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
456 : :
3254 peter_e@gmx.net 457 [ + + + + : 34440 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
- + ]
458 : : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459 : : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
460 : :
461 : : /* transaction ID (if not valid, we're not streaming) */
1931 akapila@postgresql.o 462 [ + + ]: 34440 : if (TransactionIdIsValid(xid))
463 : 34226 : pq_sendint32(out, xid);
464 : :
465 : : /* use Oid as relation identifier */
2989 andres@anarazel.de 466 : 34440 : pq_sendint32(out, RelationGetRelid(rel));
467 : :
1394 akapila@postgresql.o 468 [ + + ]: 34440 : if (oldslot != NULL)
469 : : {
3254 peter_e@gmx.net 470 [ + + ]: 133 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
3101 tgl@sss.pgh.pa.us 471 : 63 : pq_sendbyte(out, 'O'); /* old tuple follows */
472 : : else
473 : 70 : pq_sendbyte(out, 'K'); /* old key follows */
405 akapila@postgresql.o 474 : 133 : logicalrep_write_tuple(out, rel, oldslot, binary, columns,
475 : : include_gencols_type);
476 : : }
477 : :
3254 peter_e@gmx.net 478 : 34440 : pq_sendbyte(out, 'N'); /* new tuple follows */
328 akapila@postgresql.o 479 : 34440 : logicalrep_write_tuple(out, rel, newslot, binary, columns,
480 : : include_gencols_type);
3254 peter_e@gmx.net 481 : 34440 : }
482 : :
483 : : /*
484 : : * Read UPDATE from stream.
485 : : */
486 : : LogicalRepRelId
487 : 31940 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
488 : : LogicalRepTupleData *oldtup,
489 : : LogicalRepTupleData *newtup)
490 : : {
491 : : char action;
492 : : LogicalRepRelId relid;
493 : :
494 : : /* read the relation id */
495 : 31940 : relid = pq_getmsgint(in, 4);
496 : :
497 : : /* read and verify action */
498 : 31940 : action = pq_getmsgbyte(in);
499 [ + + + + : 31940 : if (action != 'K' && action != 'O' && action != 'N')
- + ]
3254 peter_e@gmx.net 500 [ # # ]:UBC 0 : elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
501 : : action);
502 : :
503 : : /* check for old tuple */
3254 peter_e@gmx.net 504 [ + + + + ]:CBC 31940 : if (action == 'K' || action == 'O')
505 : : {
506 : 135 : logicalrep_read_tuple(in, oldtup);
507 : 135 : *has_oldtuple = true;
508 : :
509 : 135 : action = pq_getmsgbyte(in);
510 : : }
511 : : else
512 : 31805 : *has_oldtuple = false;
513 : :
514 : : /* check for new tuple */
515 [ - + ]: 31940 : if (action != 'N')
3254 peter_e@gmx.net 516 [ # # ]:UBC 0 : elog(ERROR, "expected action 'N', got %c",
517 : : action);
518 : :
3254 peter_e@gmx.net 519 :CBC 31940 : logicalrep_read_tuple(in, newtup);
520 : :
521 : 31940 : return relid;
522 : : }
523 : :
524 : : /*
525 : : * Write DELETE to the output stream.
526 : : */
527 : : void
1931 akapila@postgresql.o 528 : 41880 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
529 : : TupleTableSlot *oldslot, bool binary,
530 : : Bitmapset *columns,
531 : : PublishGencolsType include_gencols_type)
532 : : {
3254 peter_e@gmx.net 533 [ + + + + : 41880 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
- + ]
534 : : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535 : : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
536 : :
1871 akapila@postgresql.o 537 : 41880 : pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
538 : :
539 : : /* transaction ID (if not valid, we're not streaming) */
1931 540 [ + + ]: 41880 : if (TransactionIdIsValid(xid))
541 : 41620 : pq_sendint32(out, xid);
542 : :
543 : : /* use Oid as relation identifier */
2989 andres@anarazel.de 544 : 41880 : pq_sendint32(out, RelationGetRelid(rel));
545 : :
3254 peter_e@gmx.net 546 [ + + ]: 41880 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
547 : 128 : pq_sendbyte(out, 'O'); /* old tuple follows */
548 : : else
549 : 41752 : pq_sendbyte(out, 'K'); /* old key follows */
550 : :
328 akapila@postgresql.o 551 : 41880 : logicalrep_write_tuple(out, rel, oldslot, binary, columns,
552 : : include_gencols_type);
3254 peter_e@gmx.net 553 : 41880 : }
554 : :
555 : : /*
556 : : * Read DELETE from stream.
557 : : *
558 : : * Fills the old tuple.
559 : : */
560 : : LogicalRepRelId
561 : 40320 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
562 : : {
563 : : char action;
564 : : LogicalRepRelId relid;
565 : :
566 : : /* read the relation id */
567 : 40320 : relid = pq_getmsgint(in, 4);
568 : :
569 : : /* read and verify action */
570 : 40320 : action = pq_getmsgbyte(in);
571 [ + + - + ]: 40320 : if (action != 'K' && action != 'O')
3254 peter_e@gmx.net 572 [ # # ]:UBC 0 : elog(ERROR, "expected action 'O' or 'K', got %c", action);
573 : :
3254 peter_e@gmx.net 574 :CBC 40320 : logicalrep_read_tuple(in, oldtup);
575 : :
576 : 40320 : return relid;
577 : : }
578 : :
579 : : /*
580 : : * Write TRUNCATE to the output stream.
581 : : */
582 : : void
2811 583 : 11 : logicalrep_write_truncate(StringInfo out,
584 : : TransactionId xid,
585 : : int nrelids,
586 : : Oid relids[],
587 : : bool cascade, bool restart_seqs)
588 : : {
589 : : int i;
2792 tgl@sss.pgh.pa.us 590 : 11 : uint8 flags = 0;
591 : :
1871 akapila@postgresql.o 592 : 11 : pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
593 : :
594 : : /* transaction ID (if not valid, we're not streaming) */
1931 595 [ - + ]: 11 : if (TransactionIdIsValid(xid))
1931 akapila@postgresql.o 596 :UBC 0 : pq_sendint32(out, xid);
597 : :
2811 peter_e@gmx.net 598 :CBC 11 : pq_sendint32(out, nrelids);
599 : :
600 : : /* encode and send truncate flags */
601 [ - + ]: 11 : if (cascade)
2811 peter_e@gmx.net 602 :UBC 0 : flags |= TRUNCATE_CASCADE;
2811 peter_e@gmx.net 603 [ - + ]:CBC 11 : if (restart_seqs)
2811 peter_e@gmx.net 604 :UBC 0 : flags |= TRUNCATE_RESTART_SEQS;
2811 peter_e@gmx.net 605 :CBC 11 : pq_sendint8(out, flags);
606 : :
607 [ + + ]: 28 : for (i = 0; i < nrelids; i++)
608 : 17 : pq_sendint32(out, relids[i]);
609 : 11 : }
610 : :
611 : : /*
612 : : * Read TRUNCATE from stream.
613 : : */
614 : : List *
615 : 19 : logicalrep_read_truncate(StringInfo in,
616 : : bool *cascade, bool *restart_seqs)
617 : : {
618 : : int i;
619 : : int nrelids;
620 : 19 : List *relids = NIL;
621 : : uint8 flags;
622 : :
623 : 19 : nrelids = pq_getmsgint(in, 4);
624 : :
625 : : /* read and decode truncate flags */
626 : 19 : flags = pq_getmsgint(in, 1);
627 : 19 : *cascade = (flags & TRUNCATE_CASCADE) > 0;
628 : 19 : *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
629 : :
630 [ + + ]: 47 : for (i = 0; i < nrelids; i++)
631 : 28 : relids = lappend_oid(relids, pq_getmsgint(in, 4));
632 : :
633 : 19 : return relids;
634 : : }
635 : :
636 : : /*
637 : : * Write MESSAGE to stream
638 : : */
639 : : void
1716 akapila@postgresql.o 640 : 5 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
641 : : bool transactional, const char *prefix, Size sz,
642 : : const char *message)
643 : : {
644 : 5 : uint8 flags = 0;
645 : :
646 : 5 : pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
647 : :
648 : : /* encode and send message flags */
649 [ + + ]: 5 : if (transactional)
650 : 2 : flags |= MESSAGE_TRANSACTIONAL;
651 : :
652 : : /* transaction ID (if not valid, we're not streaming) */
653 [ - + ]: 5 : if (TransactionIdIsValid(xid))
1716 akapila@postgresql.o 654 :UBC 0 : pq_sendint32(out, xid);
655 : :
1716 akapila@postgresql.o 656 :CBC 5 : pq_sendint8(out, flags);
657 : 5 : pq_sendint64(out, lsn);
658 : 5 : pq_sendstring(out, prefix);
659 : 5 : pq_sendint32(out, sz);
660 : 5 : pq_sendbytes(out, message, sz);
661 : 5 : }
662 : :
663 : : /*
664 : : * Write relation description to the output stream.
665 : : */
666 : : void
1362 tomas.vondra@postgre 667 : 369 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
668 : : Bitmapset *columns,
669 : : PublishGencolsType include_gencols_type)
670 : : {
671 : : char *relname;
672 : :
1871 akapila@postgresql.o 673 : 369 : pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
674 : :
675 : : /* transaction ID (if not valid, we're not streaming) */
1931 676 [ + + ]: 369 : if (TransactionIdIsValid(xid))
677 : 68 : pq_sendint32(out, xid);
678 : :
679 : : /* use Oid as relation identifier */
2989 andres@anarazel.de 680 : 369 : pq_sendint32(out, RelationGetRelid(rel));
681 : :
682 : : /* send qualified relation name */
3254 peter_e@gmx.net 683 : 369 : logicalrep_write_namespace(out, RelationGetNamespace(rel));
684 : 369 : relname = RelationGetRelationName(rel);
685 : 369 : pq_sendstring(out, relname);
686 : :
687 : : /* send replica identity */
688 : 369 : pq_sendbyte(out, rel->rd_rel->relreplident);
689 : :
690 : : /* send the attribute info */
328 akapila@postgresql.o 691 : 369 : logicalrep_write_attrs(out, rel, columns, include_gencols_type);
3254 peter_e@gmx.net 692 : 369 : }
693 : :
694 : : /*
695 : : * Read the relation info from stream and return as LogicalRepRelation.
696 : : */
697 : : LogicalRepRelation *
698 : 427 : logicalrep_read_rel(StringInfo in)
699 : : {
7 michael@paquier.xyz 700 :GNC 427 : LogicalRepRelation *rel = palloc_object(LogicalRepRelation);
701 : :
3254 peter_e@gmx.net 702 :CBC 427 : rel->remoteid = pq_getmsgint(in, 4);
703 : :
704 : : /* Read relation name from stream */
705 : 427 : rel->nspname = pstrdup(logicalrep_read_namespace(in));
706 : 427 : rel->relname = pstrdup(pq_getmsgstring(in));
707 : :
708 : : /* Read the replica identity. */
709 : 427 : rel->replident = pq_getmsgbyte(in);
710 : :
711 : : /* relkind is not sent */
55 akapila@postgresql.o 712 :GNC 427 : rel->relkind = 0;
713 : :
714 : : /* Get attribute description */
3254 peter_e@gmx.net 715 :CBC 427 : logicalrep_read_attrs(in, rel);
716 : :
717 : 427 : return rel;
718 : : }
719 : :
720 : : /*
721 : : * Write type info to the output stream.
722 : : *
723 : : * This function will always write base type info.
724 : : */
725 : : void
1931 akapila@postgresql.o 726 : 18 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
727 : : {
3254 peter_e@gmx.net 728 : 18 : Oid basetypoid = getBaseType(typoid);
729 : : HeapTuple tup;
730 : : Form_pg_type typtup;
731 : :
1871 akapila@postgresql.o 732 : 18 : pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
733 : :
734 : : /* transaction ID (if not valid, we're not streaming) */
1931 735 [ - + ]: 18 : if (TransactionIdIsValid(xid))
1931 akapila@postgresql.o 736 :UBC 0 : pq_sendint32(out, xid);
737 : :
3254 peter_e@gmx.net 738 :CBC 18 : tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
739 [ - + ]: 18 : if (!HeapTupleIsValid(tup))
3254 peter_e@gmx.net 740 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for type %u", basetypoid);
3254 peter_e@gmx.net 741 :CBC 18 : typtup = (Form_pg_type) GETSTRUCT(tup);
742 : :
743 : : /* use Oid as type identifier */
2989 andres@anarazel.de 744 : 18 : pq_sendint32(out, typoid);
745 : :
746 : : /* send qualified type name */
3254 peter_e@gmx.net 747 : 18 : logicalrep_write_namespace(out, typtup->typnamespace);
748 : 18 : pq_sendstring(out, NameStr(typtup->typname));
749 : :
750 : 18 : ReleaseSysCache(tup);
751 : 18 : }
752 : :
753 : : /*
754 : : * Read type info from the output stream.
755 : : */
756 : : void
757 : 18 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
758 : : {
759 : 18 : ltyp->remoteid = pq_getmsgint(in, 4);
760 : :
761 : : /* Read type name from stream */
762 : 18 : ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
763 : 18 : ltyp->typname = pstrdup(pq_getmsgstring(in));
764 : 18 : }
765 : :
766 : : /*
767 : : * Write a tuple to the outputstream, in the most efficient format possible.
768 : : */
769 : : static void
1394 akapila@postgresql.o 770 : 182371 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
771 : : bool binary, Bitmapset *columns,
772 : : PublishGencolsType include_gencols_type)
773 : : {
774 : : TupleDesc desc;
775 : : Datum *values;
776 : : bool *isnull;
777 : : int i;
3254 peter_e@gmx.net 778 : 182371 : uint16 nliveatts = 0;
779 : :
780 : 182371 : desc = RelationGetDescr(rel);
781 : :
782 [ + + ]: 548600 : for (i = 0; i < desc->natts; i++)
783 : : {
1362 tomas.vondra@postgre 784 : 366229 : Form_pg_attribute att = TupleDescAttr(desc, i);
785 : :
328 akapila@postgresql.o 786 [ + + ]: 366229 : if (!logicalrep_should_publish_column(att, columns,
787 : : include_gencols_type))
3254 peter_e@gmx.net 788 : 141 : continue;
789 : :
790 : 366088 : nliveatts++;
791 : : }
2989 andres@anarazel.de 792 : 182371 : pq_sendint16(out, nliveatts);
793 : :
1394 akapila@postgresql.o 794 : 182371 : slot_getallattrs(slot);
795 : 182371 : values = slot->tts_values;
796 : 182371 : isnull = slot->tts_isnull;
797 : :
798 : : /* Write the values */
3254 peter_e@gmx.net 799 [ + + ]: 548600 : for (i = 0; i < desc->natts; i++)
800 : : {
801 : : HeapTuple typtup;
802 : : Form_pg_type typclass;
3041 andres@anarazel.de 803 : 366229 : Form_pg_attribute att = TupleDescAttr(desc, i);
804 : :
328 akapila@postgresql.o 805 [ + + ]: 366229 : if (!logicalrep_should_publish_column(att, columns,
806 : : include_gencols_type))
1362 tomas.vondra@postgre 807 : 141 : continue;
808 : :
3254 peter_e@gmx.net 809 [ + + ]: 366088 : if (isnull[i])
810 : : {
1978 tgl@sss.pgh.pa.us 811 : 51920 : pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
3254 peter_e@gmx.net 812 : 51920 : continue;
813 : : }
814 : :
134 peter@eisentraut.org 815 [ + + + + ]:GNC 314168 : if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(values[i])))
816 : : {
817 : : /*
818 : : * Unchanged toasted datum. (Note that we don't promise to detect
819 : : * unchanged data in general; this is just a cheap check to avoid
820 : : * sending large values unnecessarily.)
821 : : */
1978 tgl@sss.pgh.pa.us 822 :CBC 3 : pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
3254 peter_e@gmx.net 823 : 3 : continue;
824 : : }
825 : :
826 : 314165 : typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
827 [ - + ]: 314165 : if (!HeapTupleIsValid(typtup))
3254 peter_e@gmx.net 828 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for type %u", att->atttypid);
3254 peter_e@gmx.net 829 :CBC 314165 : typclass = (Form_pg_type) GETSTRUCT(typtup);
830 : :
831 : : /*
832 : : * Send in binary if requested and type has suitable send function.
833 : : */
1975 tgl@sss.pgh.pa.us 834 [ + + + + ]: 314165 : if (binary && OidIsValid(typclass->typsend))
1978 835 : 115043 : {
836 : : bytea *outputbytes;
837 : : int len;
838 : :
839 : 115043 : pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
840 : 115043 : outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
841 : 115043 : len = VARSIZE(outputbytes) - VARHDRSZ;
842 : 115043 : pq_sendint(out, len, 4); /* length */
843 : 115043 : pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
844 : 115043 : pfree(outputbytes);
845 : : }
846 : : else
847 : : {
848 : : char *outputstr;
849 : :
850 : 199122 : pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
851 : 199122 : outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
653 heikki.linnakangas@i 852 : 199122 : pq_sendcountedtext(out, outputstr, strlen(outputstr));
1978 tgl@sss.pgh.pa.us 853 : 199122 : pfree(outputstr);
854 : : }
855 : :
3254 peter_e@gmx.net 856 : 314165 : ReleaseSysCache(typtup);
857 : : }
858 : 182371 : }
859 : :
860 : : /*
861 : : * Read tuple in logical replication format from stream.
862 : : */
863 : : static void
864 : 148242 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
865 : : {
866 : : int i;
867 : : int natts;
868 : :
869 : : /* Get number of attributes */
870 : 148242 : natts = pq_getmsgint(in, 2);
871 : :
872 : : /* Allocate space for per-column values; zero out unused StringInfoDatas */
1978 tgl@sss.pgh.pa.us 873 : 148242 : tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
7 michael@paquier.xyz 874 :GNC 148242 : tuple->colstatus = palloc_array(char, natts);
1976 tgl@sss.pgh.pa.us 875 :CBC 148242 : tuple->ncols = natts;
876 : :
877 : : /* Read the data */
3254 peter_e@gmx.net 878 [ + + ]: 451163 : for (i = 0; i < natts; i++)
879 : : {
880 : : char *buff;
881 : : char kind;
882 : : int len;
1978 tgl@sss.pgh.pa.us 883 : 302921 : StringInfo value = &tuple->colvalues[i];
884 : :
3254 peter_e@gmx.net 885 : 302921 : kind = pq_getmsgbyte(in);
1978 tgl@sss.pgh.pa.us 886 : 302921 : tuple->colstatus[i] = kind;
887 : :
3254 peter_e@gmx.net 888 [ + + + - ]: 302921 : switch (kind)
889 : : {
1978 tgl@sss.pgh.pa.us 890 : 50371 : case LOGICALREP_COLUMN_NULL:
891 : : /* nothing more to do */
3254 peter_e@gmx.net 892 : 50371 : break;
1978 tgl@sss.pgh.pa.us 893 : 3 : case LOGICALREP_COLUMN_UNCHANGED:
894 : : /* we don't receive the value of an unchanged column */
3254 peter_e@gmx.net 895 : 3 : break;
1978 tgl@sss.pgh.pa.us 896 : 252547 : case LOGICALREP_COLUMN_TEXT:
897 : : case LOGICALREP_COLUMN_BINARY:
898 : 252547 : len = pq_getmsgint(in, 4); /* read length */
899 : :
900 : : /* and data */
783 drowley@postgresql.o 901 : 252547 : buff = palloc(len + 1);
902 : 252547 : pq_copymsgbytes(in, buff, len);
903 : :
904 : : /*
905 : : * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
906 : : * as input functions require that. For
907 : : * LOGICALREP_COLUMN_BINARY it's not technically required, but
908 : : * it's harmless.
909 : : */
910 : 252547 : buff[len] = '\0';
911 : :
912 : 252547 : initStringInfoFromString(value, buff, len);
3254 peter_e@gmx.net 913 : 252547 : break;
3254 peter_e@gmx.net 914 :UBC 0 : default:
3148 tgl@sss.pgh.pa.us 915 [ # # ]: 0 : elog(ERROR, "unrecognized data representation type '%c'", kind);
916 : : }
917 : : }
3254 peter_e@gmx.net 918 :CBC 148242 : }
919 : :
920 : : /*
921 : : * Write relation attribute metadata to the stream.
922 : : */
923 : : static void
405 akapila@postgresql.o 924 : 369 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
925 : : PublishGencolsType include_gencols_type)
926 : : {
927 : : TupleDesc desc;
928 : : int i;
3254 peter_e@gmx.net 929 : 369 : uint16 nliveatts = 0;
930 : 369 : Bitmapset *idattrs = NULL;
931 : : bool replidentfull;
932 : :
933 : 369 : desc = RelationGetDescr(rel);
934 : :
935 : : /* send number of live attributes */
936 [ + + ]: 1137 : for (i = 0; i < desc->natts; i++)
937 : : {
1362 tomas.vondra@postgre 938 : 768 : Form_pg_attribute att = TupleDescAttr(desc, i);
939 : :
328 akapila@postgresql.o 940 [ + + ]: 768 : if (!logicalrep_should_publish_column(att, columns,
941 : : include_gencols_type))
1362 tomas.vondra@postgre 942 : 71 : continue;
943 : :
3254 peter_e@gmx.net 944 : 697 : nliveatts++;
945 : : }
2989 andres@anarazel.de 946 : 369 : pq_sendint16(out, nliveatts);
947 : :
948 : : /* fetch bitmap of REPLICATION IDENTITY attributes */
3254 peter_e@gmx.net 949 : 369 : replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
950 [ + + ]: 369 : if (!replidentfull)
1695 akapila@postgresql.o 951 : 313 : idattrs = RelationGetIdentityKeyBitmap(rel);
952 : :
953 : : /* send the attributes */
3254 peter_e@gmx.net 954 [ + + ]: 1137 : for (i = 0; i < desc->natts; i++)
955 : : {
3041 andres@anarazel.de 956 : 768 : Form_pg_attribute att = TupleDescAttr(desc, i);
3136 bruce@momjian.us 957 : 768 : uint8 flags = 0;
958 : :
328 akapila@postgresql.o 959 [ + + ]: 768 : if (!logicalrep_should_publish_column(att, columns,
960 : : include_gencols_type))
1362 tomas.vondra@postgre 961 : 71 : continue;
962 : :
963 : : /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
3254 peter_e@gmx.net 964 [ + + + + ]: 1311 : if (replidentfull ||
965 : 614 : bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
966 : : idattrs))
967 : 336 : flags |= LOGICALREP_IS_REPLICA_IDENTITY;
968 : :
969 : 697 : pq_sendbyte(out, flags);
970 : :
971 : : /* attribute name */
972 : 697 : pq_sendstring(out, NameStr(att->attname));
973 : :
974 : : /* attribute type id */
2989 andres@anarazel.de 975 : 697 : pq_sendint32(out, (int) att->atttypid);
976 : :
977 : : /* attribute mode */
978 : 697 : pq_sendint32(out, att->atttypmod);
979 : : }
980 : :
3254 peter_e@gmx.net 981 : 369 : bms_free(idattrs);
982 : 369 : }
983 : :
984 : : /*
985 : : * Read relation attribute metadata from the stream.
986 : : */
987 : : static void
988 : 427 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
989 : : {
990 : : int i;
991 : : int natts;
992 : : char **attnames;
993 : : Oid *atttyps;
994 : 427 : Bitmapset *attkeys = NULL;
995 : :
996 : 427 : natts = pq_getmsgint(in, 2);
7 michael@paquier.xyz 997 :GNC 427 : attnames = palloc_array(char *, natts);
998 : 427 : atttyps = palloc_array(Oid, natts);
999 : :
1000 : : /* read the attributes */
3254 peter_e@gmx.net 1001 [ + + ]:CBC 1227 : for (i = 0; i < natts; i++)
1002 : : {
1003 : : uint8 flags;
1004 : :
1005 : : /* Check for replica identity column */
1006 : 800 : flags = pq_getmsgbyte(in);
1007 [ + + ]: 800 : if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
1008 : 378 : attkeys = bms_add_member(attkeys, i);
1009 : :
1010 : : /* attribute name */
1011 : 800 : attnames[i] = pstrdup(pq_getmsgstring(in));
1012 : :
1013 : : /* attribute type id */
1014 : 800 : atttyps[i] = (Oid) pq_getmsgint(in, 4);
1015 : :
1016 : : /* we ignore attribute mode for now */
1017 : 800 : (void) pq_getmsgint(in, 4);
1018 : : }
1019 : :
1020 : 427 : rel->attnames = attnames;
1021 : 427 : rel->atttyps = atttyps;
1022 : 427 : rel->attkeys = attkeys;
1023 : 427 : rel->natts = natts;
1024 : 427 : }
1025 : :
1026 : : /*
1027 : : * Write the namespace name or empty string for pg_catalog (to save space).
1028 : : */
1029 : : static void
1030 : 387 : logicalrep_write_namespace(StringInfo out, Oid nspid)
1031 : : {
1032 [ + + ]: 387 : if (nspid == PG_CATALOG_NAMESPACE)
1033 : 1 : pq_sendbyte(out, '\0');
1034 : : else
1035 : : {
3136 bruce@momjian.us 1036 : 386 : char *nspname = get_namespace_name(nspid);
1037 : :
3254 peter_e@gmx.net 1038 [ - + ]: 386 : if (nspname == NULL)
3254 peter_e@gmx.net 1039 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for namespace %u",
1040 : : nspid);
1041 : :
3254 peter_e@gmx.net 1042 :CBC 386 : pq_sendstring(out, nspname);
1043 : : }
1044 : 387 : }
1045 : :
1046 : : /*
1047 : : * Read the namespace name while treating empty string as pg_catalog.
1048 : : */
1049 : : static const char *
1050 : 445 : logicalrep_read_namespace(StringInfo in)
1051 : : {
1052 : 445 : const char *nspname = pq_getmsgstring(in);
1053 : :
1054 [ + + ]: 445 : if (nspname[0] == '\0')
1055 : 1 : nspname = "pg_catalog";
1056 : :
1057 : 445 : return nspname;
1058 : : }
1059 : :
1060 : : /*
1061 : : * Write the information for the start stream message to the output stream.
1062 : : */
1063 : : void
1931 akapila@postgresql.o 1064 : 617 : logicalrep_write_stream_start(StringInfo out,
1065 : : TransactionId xid, bool first_segment)
1066 : : {
1871 1067 : 617 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1068 : :
1931 1069 [ - + ]: 617 : Assert(TransactionIdIsValid(xid));
1070 : :
1071 : : /* transaction ID (we're starting to stream, so must be valid) */
1072 : 617 : pq_sendint32(out, xid);
1073 : :
1074 : : /* 1 if this is the first streaming segment for this xid */
1075 : 617 : pq_sendbyte(out, first_segment ? 1 : 0);
1076 : 617 : }
1077 : :
1078 : : /*
1079 : : * Read the information about the start stream message from output stream.
1080 : : */
1081 : : TransactionId
1082 : 838 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
1083 : : {
1084 : : TransactionId xid;
1085 : :
1086 [ - + ]: 838 : Assert(first_segment);
1087 : :
1088 : 838 : xid = pq_getmsgint(in, 4);
1089 : 838 : *first_segment = (pq_getmsgbyte(in) == 1);
1090 : :
1091 : 838 : return xid;
1092 : : }
1093 : :
1094 : : /*
1095 : : * Write the stop stream message to the output stream.
1096 : : */
1097 : : void
1098 : 617 : logicalrep_write_stream_stop(StringInfo out)
1099 : : {
1581 1100 : 617 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
1931 1101 : 617 : }
1102 : :
1103 : : /*
1104 : : * Write STREAM COMMIT to the output stream.
1105 : : */
1106 : : void
1107 : 45 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
1108 : : XLogRecPtr commit_lsn)
1109 : : {
1110 : 45 : uint8 flags = 0;
1111 : :
1871 1112 : 45 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1113 : :
1931 1114 [ - + ]: 45 : Assert(TransactionIdIsValid(txn->xid));
1115 : :
1116 : : /* transaction ID */
1117 : 45 : pq_sendint32(out, txn->xid);
1118 : :
1119 : : /* send the flags field (unused for now) */
1120 : 45 : pq_sendbyte(out, flags);
1121 : :
1122 : : /* send fields */
1123 : 45 : pq_sendint64(out, commit_lsn);
1124 : 45 : pq_sendint64(out, txn->end_lsn);
78 peter@eisentraut.org 1125 :GNC 45 : pq_sendint64(out, txn->commit_time);
1931 akapila@postgresql.o 1126 :CBC 45 : }
1127 : :
1128 : : /*
1129 : : * Read STREAM COMMIT from the output stream.
1130 : : */
1131 : : TransactionId
1132 : 61 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
1133 : : {
1134 : : TransactionId xid;
1135 : : uint8 flags;
1136 : :
1137 : 61 : xid = pq_getmsgint(in, 4);
1138 : :
1139 : : /* read flags (unused for now) */
1140 : 61 : flags = pq_getmsgbyte(in);
1141 : :
1142 [ - + ]: 61 : if (flags != 0)
1931 akapila@postgresql.o 1143 [ # # ]:UBC 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
1144 : :
1145 : : /* read fields */
1931 akapila@postgresql.o 1146 :CBC 61 : commit_data->commit_lsn = pq_getmsgint64(in);
1147 : 61 : commit_data->end_lsn = pq_getmsgint64(in);
1148 : 61 : commit_data->committime = pq_getmsgint64(in);
1149 : :
1150 : 61 : return xid;
1151 : : }
1152 : :
1153 : : /*
1154 : : * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1155 : : * same for the top-level transaction abort.
1156 : : *
1157 : : * If write_abort_info is true, send the abort_lsn and abort_time fields,
1158 : : * otherwise don't.
1159 : : */
1160 : : void
1161 : 26 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
1162 : : TransactionId subxid, XLogRecPtr abort_lsn,
1163 : : TimestampTz abort_time, bool write_abort_info)
1164 : : {
1871 1165 : 26 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1166 : :
1931 1167 [ + - - + ]: 26 : Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
1168 : :
1169 : : /* transaction ID */
1170 : 26 : pq_sendint32(out, xid);
1171 : 26 : pq_sendint32(out, subxid);
1172 : :
1073 1173 [ + + ]: 26 : if (write_abort_info)
1174 : : {
1175 : 12 : pq_sendint64(out, abort_lsn);
1176 : 12 : pq_sendint64(out, abort_time);
1177 : : }
1931 1178 : 26 : }
1179 : :
1180 : : /*
1181 : : * Read STREAM ABORT from the output stream.
1182 : : *
1183 : : * If read_abort_info is true, read the abort_lsn and abort_time fields,
1184 : : * otherwise don't.
1185 : : */
1186 : : void
1073 1187 : 38 : logicalrep_read_stream_abort(StringInfo in,
1188 : : LogicalRepStreamAbortData *abort_data,
1189 : : bool read_abort_info)
1190 : : {
1191 [ - + ]: 38 : Assert(abort_data);
1192 : :
1193 : 38 : abort_data->xid = pq_getmsgint(in, 4);
1194 : 38 : abort_data->subxid = pq_getmsgint(in, 4);
1195 : :
1196 [ + + ]: 38 : if (read_abort_info)
1197 : : {
1198 : 24 : abort_data->abort_lsn = pq_getmsgint64(in);
1199 : 24 : abort_data->abort_time = pq_getmsgint64(in);
1200 : : }
1201 : : else
1202 : : {
1203 : 14 : abort_data->abort_lsn = InvalidXLogRecPtr;
1204 : 14 : abort_data->abort_time = 0;
1205 : : }
1931 1206 : 38 : }
1207 : :
1208 : : /*
1209 : : * Get string representing LogicalRepMsgType.
1210 : : */
1211 : : const char *
1573 1212 : 547 : logicalrep_message_type(LogicalRepMsgType action)
1213 : : {
1214 : : static char err_unknown[20];
1215 : :
1216 [ + + - + : 547 : switch (action)
+ + - + -
- + + - -
+ + + + +
- ]
1217 : : {
1218 : 1 : case LOGICAL_REP_MSG_BEGIN:
1219 : 1 : return "BEGIN";
1220 : 7 : case LOGICAL_REP_MSG_COMMIT:
1221 : 7 : return "COMMIT";
1573 akapila@postgresql.o 1222 :UBC 0 : case LOGICAL_REP_MSG_ORIGIN:
1223 : 0 : return "ORIGIN";
1573 akapila@postgresql.o 1224 :CBC 203 : case LOGICAL_REP_MSG_INSERT:
1225 : 203 : return "INSERT";
1226 : 22 : case LOGICAL_REP_MSG_UPDATE:
1227 : 22 : return "UPDATE";
1228 : 13 : case LOGICAL_REP_MSG_DELETE:
1229 : 13 : return "DELETE";
1573 akapila@postgresql.o 1230 :UBC 0 : case LOGICAL_REP_MSG_TRUNCATE:
1231 : 0 : return "TRUNCATE";
1573 akapila@postgresql.o 1232 :CBC 2 : case LOGICAL_REP_MSG_RELATION:
1233 : 2 : return "RELATION";
1573 akapila@postgresql.o 1234 :UBC 0 : case LOGICAL_REP_MSG_TYPE:
1235 : 0 : return "TYPE";
1236 : 0 : case LOGICAL_REP_MSG_MESSAGE:
1237 : 0 : return "MESSAGE";
1573 akapila@postgresql.o 1238 :CBC 1 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
1239 : 1 : return "BEGIN PREPARE";
1240 : 2 : case LOGICAL_REP_MSG_PREPARE:
1241 : 2 : return "PREPARE";
1573 akapila@postgresql.o 1242 :UBC 0 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
1243 : 0 : return "COMMIT PREPARED";
1244 : 0 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
1245 : 0 : return "ROLLBACK PREPARED";
1573 akapila@postgresql.o 1246 :CBC 13 : case LOGICAL_REP_MSG_STREAM_START:
1247 : 13 : return "STREAM START";
1248 : 231 : case LOGICAL_REP_MSG_STREAM_STOP:
1249 : 231 : return "STREAM STOP";
1250 : 31 : case LOGICAL_REP_MSG_STREAM_COMMIT:
1251 : 31 : return "STREAM COMMIT";
1252 : 19 : case LOGICAL_REP_MSG_STREAM_ABORT:
1253 : 19 : return "STREAM ABORT";
1254 : 2 : case LOGICAL_REP_MSG_STREAM_PREPARE:
1255 : 2 : return "STREAM PREPARE";
1256 : : }
1257 : :
1258 : : /*
1259 : : * This message provides context in the error raised when applying a
1260 : : * logical message. So we can't throw an error here. Return an unknown
1261 : : * indicator value so that the original error is still reported.
1262 : : */
876 akapila@postgresql.o 1263 :UBC 0 : snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1264 : :
1265 : 0 : return err_unknown;
1266 : : }
1267 : :
1268 : : /*
1269 : : * Check if the column 'att' of a table should be published.
1270 : : *
1271 : : * 'columns' represents the publication column list (if any) for that table.
1272 : : *
1273 : : * 'include_gencols_type' value indicates whether generated columns should be
1274 : : * published when there is no column list. Typically, this will have the same
1275 : : * value as the 'publish_generated_columns' publication parameter.
1276 : : *
1277 : : * Note that generated columns can be published only when present in a
1278 : : * publication column list, or when include_gencols_type is
1279 : : * PUBLISH_GENCOLS_STORED.
1280 : : */
1281 : : bool
405 akapila@postgresql.o 1282 :CBC 734762 : logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
1283 : : PublishGencolsType include_gencols_type)
1284 : : {
413 1285 [ + + ]: 734762 : if (att->attisdropped)
1286 : 51 : return false;
1287 : :
1288 : : /* If a column list is provided, publish only the cols in that list. */
405 1289 [ + + ]: 734711 : if (columns)
1290 : 949 : return bms_is_member(att->attnum, columns);
1291 : :
1292 : : /* All non-generated columns are always published. */
328 1293 [ + + ]: 733762 : if (!att->attgenerated)
1294 : 733705 : return true;
1295 : :
1296 : : /*
1297 : : * Stored generated columns are only published when the user sets
1298 : : * publish_generated_columns as stored.
1299 : : */
1300 [ + + ]: 57 : if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1301 : 33 : return include_gencols_type == PUBLISH_GENCOLS_STORED;
1302 : :
1303 : 24 : return false;
1304 : : }
|