Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pgrepack.c
4 : : * Logical Replication output plugin for REPACK command
5 : : *
6 : : * Copyright (c) 2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/pgrepack/pgrepack.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "access/detoast.h"
16 : : #include "commands/repack_internal.h"
17 : : #include "replication/snapbuild.h"
18 : : #include "utils/memutils.h"
19 : :
54 alvherre@kurilemu.de 20 :GNC 7 : PG_MODULE_MAGIC;
21 : :
22 : : static void repack_startup(LogicalDecodingContext *ctx,
23 : : OutputPluginOptions *opt, bool is_init);
24 : : static void repack_shutdown(LogicalDecodingContext *ctx);
25 : : static void repack_begin_txn(LogicalDecodingContext *ctx,
26 : : ReorderBufferTXN *txn);
27 : : static void repack_commit_txn(LogicalDecodingContext *ctx,
28 : : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
29 : : static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
30 : : Relation relation, ReorderBufferChange *change);
31 : : static void repack_store_change(LogicalDecodingContext *ctx, Relation relation,
32 : : ConcurrentChangeKind kind, HeapTuple tuple);
33 : :
34 : : void
35 : 7 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
36 : : {
37 : 7 : cb->startup_cb = repack_startup;
38 : 7 : cb->begin_cb = repack_begin_txn;
39 : 7 : cb->change_cb = repack_process_change;
40 : 7 : cb->commit_cb = repack_commit_txn;
41 : 7 : cb->shutdown_cb = repack_shutdown;
42 : 7 : }
43 : :
44 : :
45 : : /* initialize this plugin */
46 : : static void
47 : 7 : repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
48 : : bool is_init)
49 : : {
50 : 7 : ctx->output_plugin_private = NULL;
51 : :
52 : : /* Probably unnecessary, as we don't use the SQL interface ... */
53 : 7 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
54 : :
55 [ - + ]: 7 : if (ctx->output_plugin_options != NIL)
56 : : {
54 alvherre@kurilemu.de 57 [ # # ]:UNC 0 : ereport(ERROR,
58 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
59 : : errmsg("this plugin does not expect any options"));
60 : : }
54 alvherre@kurilemu.de 61 :GNC 7 : }
62 : :
63 : : static void
64 : 7 : repack_shutdown(LogicalDecodingContext *ctx)
65 : : {
66 : 7 : }
67 : :
68 : : /*
69 : : * As we don't release the slot during processing of particular table, there's
70 : : * no room for SQL interface, even for debugging purposes. Therefore we need
71 : : * neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the plugin
72 : : * callbacks. (Although we might want to write custom callbacks, this API
73 : : * seems to be unnecessarily generic for our purposes.)
74 : : */
75 : :
76 : : /* BEGIN callback */
77 : : static void
78 : 11 : repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
79 : : {
80 : 11 : }
81 : :
82 : : /* COMMIT callback */
83 : : static void
84 : 11 : repack_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
85 : : XLogRecPtr commit_lsn)
86 : : {
87 : 11 : }
88 : :
89 : : /*
90 : : * Callback for individual changed tuples
91 : : */
92 : : static void
93 : 32 : repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
94 : : Relation relation, ReorderBufferChange *change)
95 : : {
96 : 32 : RepackDecodingState *private PG_USED_FOR_ASSERTS_ONLY =
97 : : (RepackDecodingState *) ctx->output_writer_private;
98 : :
99 : : /* Changes of other relation should not have been decoded. */
100 [ - + ]: 32 : Assert(RelationGetRelid(relation) == private->relid);
101 : :
102 : : /* Decode entry depending on its type */
103 [ + + + - ]: 32 : switch (change->action)
104 : : {
105 : 7 : case REORDER_BUFFER_CHANGE_INSERT:
106 : : {
107 : : HeapTuple newtuple;
108 : :
109 : 7 : newtuple = change->data.tp.newtuple;
110 : :
111 : : /*
112 : : * Identity checks in the main function should have made this
113 : : * impossible.
114 : : */
115 [ - + ]: 7 : if (newtuple == NULL)
54 alvherre@kurilemu.de 116 [ # # ]:UNC 0 : elog(ERROR, "incomplete insert info");
117 : :
54 alvherre@kurilemu.de 118 :GNC 7 : repack_store_change(ctx, relation, CHANGE_INSERT, newtuple);
119 : : }
120 : 7 : break;
121 : 22 : case REORDER_BUFFER_CHANGE_UPDATE:
122 : : {
123 : : HeapTuple oldtuple,
124 : : newtuple;
125 : :
126 : 22 : oldtuple = change->data.tp.oldtuple;
127 : 22 : newtuple = change->data.tp.newtuple;
128 : :
129 [ - + ]: 22 : if (newtuple == NULL)
54 alvherre@kurilemu.de 130 [ # # ]:UNC 0 : elog(ERROR, "incomplete update info");
131 : :
54 alvherre@kurilemu.de 132 [ + + ]:GNC 22 : if (oldtuple != NULL)
133 : 8 : repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple);
134 : :
135 : 22 : repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple);
136 : : }
137 : 22 : break;
138 : 3 : case REORDER_BUFFER_CHANGE_DELETE:
139 : : {
140 : : HeapTuple oldtuple;
141 : :
142 : 3 : oldtuple = change->data.tp.oldtuple;
143 : :
144 [ - + ]: 3 : if (oldtuple == NULL)
54 alvherre@kurilemu.de 145 [ # # ]:UNC 0 : elog(ERROR, "incomplete delete info");
146 : :
54 alvherre@kurilemu.de 147 :GNC 3 : repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple);
148 : : }
149 : 3 : break;
54 alvherre@kurilemu.de 150 :UNC 0 : default:
151 : :
152 : : /*
153 : : * Should not come here. This includes TRUNCATE of the table being
154 : : * processed. heap_decode() cannot check the file locator easily,
155 : : * but we assume that TRUNCATE uses AccessExclusiveLock on the
156 : : * table so it should not occur during REPACK (CONCURRENTLY).
157 : : */
158 : 0 : Assert(false);
159 : : break;
160 : : }
54 alvherre@kurilemu.de 161 :GNC 32 : }
162 : :
163 : : /*
164 : : * Write the given tuple, with the given change kind, to the repack spill
165 : : * file. Later, the repack decoding worker can read these and replay
166 : : * the operations on the new copy of the table.
167 : : *
168 : : * For each change affecting the table being repacked, we store enough
169 : : * information about each tuple in it, so that it can be replayed in the
170 : : * new copy of the table.
171 : : */
172 : : static void
173 : 40 : repack_store_change(LogicalDecodingContext *ctx, Relation relation,
174 : : ConcurrentChangeKind kind, HeapTuple tuple)
175 : : {
176 : : RepackDecodingState *dstate;
177 : : MemoryContext oldcxt;
178 : : BufFile *file;
179 : 40 : List *attrs_ext = NIL;
180 : : int natt_ext;
181 : :
182 : 40 : dstate = (RepackDecodingState *) ctx->output_writer_private;
183 : 40 : file = dstate->file;
184 : :
185 : : /* Store the change kind. */
186 : 40 : BufFileWrite(file, &kind, 1);
187 : :
188 : : /* Use a frequently-reset context to avoid dealing with leaks manually */
189 : 40 : oldcxt = MemoryContextSwitchTo(dstate->change_cxt);
190 : :
191 : : /*
192 : : * If the tuple contains "external indirect" attributes, we need to write
193 : : * the contents to the file because we have no control over that memory.
194 : : */
195 [ + + ]: 40 : if (HeapTupleHasExternal(tuple))
196 : : {
197 : 13 : TupleDesc desc = RelationGetDescr(relation);
198 : : TupleTableSlot *slot;
199 : :
200 : : /* Initialize the slot, if not done already */
201 [ + + ]: 13 : if (dstate->slot == NULL)
202 : : {
203 : : ResourceOwner saveResourceOwner;
204 : :
205 : 1 : MemoryContextSwitchTo(dstate->worker_cxt);
206 : 1 : saveResourceOwner = CurrentResourceOwner;
207 : 1 : CurrentResourceOwner = dstate->worker_resowner;
208 : 1 : dstate->slot = MakeSingleTupleTableSlot(desc, &TTSOpsHeapTuple);
209 : 1 : MemoryContextSwitchTo(dstate->change_cxt);
210 : 1 : CurrentResourceOwner = saveResourceOwner;
211 : : }
212 : :
213 : 13 : slot = dstate->slot;
214 : 13 : ExecStoreHeapTuple(tuple, slot, false);
215 : :
216 : : /*
217 : : * Loop over all attributes, and find out which ones we need to spill
218 : : * separately, to wit: each one that's a non-null varlena and stored
219 : : * out of line.
220 : : */
221 [ + + ]: 78 : for (int i = 0; i < desc->natts; i++)
222 : : {
223 : 65 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
224 : : varlena *varlen;
225 : :
226 [ + + + + : 91 : if (attr->attisdropped || attr->attlen != -1 ||
- + ]
227 : 26 : slot_attisnull(slot, i + 1))
228 : 39 : continue;
229 : :
230 : 26 : slot_getsomeattrs(slot, i + 1);
231 : :
232 : : /*
233 : : * This is a non-null varlena datum, but we only care if it's
234 : : * out-of-line
235 : : */
236 : 26 : varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
237 [ + + ]: 26 : if (!VARATT_IS_EXTERNAL(varlen))
238 : 9 : continue;
239 : :
240 : : /*
241 : : * We spill any indirect-external attributes separately from the
242 : : * heap tuple. Anything else is written as is.
243 : : */
244 [ + + ]: 17 : if (VARATT_IS_EXTERNAL_INDIRECT(varlen))
245 : 15 : attrs_ext = lappend(attrs_ext, varlen);
246 : : else
247 : : {
248 : : /*
249 : : * Logical decoding should not produce "external expanded"
250 : : * attributes (those actually should never appear on disk), so
251 : : * only TOASTed attribute can be seen here.
252 : : *
253 : : * We get here if the table has external values but only
254 : : * in-line values are being updated now.
255 : : */
256 [ - + ]: 2 : Assert(VARATT_IS_EXTERNAL_ONDISK(varlen));
257 : : }
258 : : }
259 : :
260 : 13 : ExecClearTuple(slot);
261 : : }
262 : :
263 : : /*
264 : : * First, write the original heap tuple, prefixed by its length. Note
265 : : * that the external-toast tag for each toasted attribute will be present
266 : : * in what we write, so that we know where to restore each one later.
267 : : */
268 : 40 : BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len));
269 : 40 : BufFileWrite(file, tuple->t_data, tuple->t_len);
270 : :
271 : : /* Then, write the number of external attributes we found. */
272 : 40 : natt_ext = list_length(attrs_ext);
273 : 40 : BufFileWrite(file, &natt_ext, sizeof(natt_ext));
274 : :
275 : : /* Finally, the attributes themselves, if any */
276 [ + + + + : 95 : foreach_ptr(varlena, attr_val, attrs_ext)
+ + ]
277 : : {
278 : 15 : attr_val = detoast_external_attr(attr_val);
279 : 15 : BufFileWrite(file, attr_val, VARSIZE_ANY(attr_val));
280 : : /* These attributes could be large, so free them right away */
281 : 15 : pfree(attr_val);
282 : : }
283 : :
284 : : /* Cleanup. */
285 : 40 : MemoryContextSwitchTo(oldcxt);
286 : 40 : MemoryContextReset(dstate->change_cxt);
287 : 40 : }
|