Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * astreamer_zstd.c
4 : : *
5 : : * Archive streamers that deal with data compressed using zstd.
6 : : * astreamer_zstd_compressor applies zstd compression to the input stream,
7 : : * and astreamer_zstd_decompressor does the reverse.
8 : : *
9 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
10 : : *
11 : : * IDENTIFICATION
12 : : * src/fe_utils/astreamer_zstd.c
13 : : *-------------------------------------------------------------------------
14 : : */
15 : :
16 : : #include "postgres_fe.h"
17 : :
18 : : #include <unistd.h>
19 : :
20 : : #ifdef USE_ZSTD
21 : : #include <zstd.h>
22 : : #endif
23 : :
24 : : #include "common/logging.h"
25 : : #include "fe_utils/astreamer.h"
26 : :
27 : : #ifdef USE_ZSTD
28 : :
29 : : typedef struct astreamer_zstd_frame
30 : : {
31 : : astreamer base;
32 : :
33 : : ZSTD_CCtx *cctx;
34 : : ZSTD_DCtx *dctx;
35 : : ZSTD_outBuffer zstd_outBuf;
36 : : } astreamer_zstd_frame;
37 : :
38 : : static void astreamer_zstd_compressor_content(astreamer *streamer,
39 : : astreamer_member *member,
40 : : const char *data, int len,
41 : : astreamer_archive_context context);
42 : : static void astreamer_zstd_compressor_finalize(astreamer *streamer);
43 : : static void astreamer_zstd_compressor_free(astreamer *streamer);
44 : :
45 : : static const astreamer_ops astreamer_zstd_compressor_ops = {
46 : : .content = astreamer_zstd_compressor_content,
47 : : .finalize = astreamer_zstd_compressor_finalize,
48 : : .free = astreamer_zstd_compressor_free
49 : : };
50 : :
51 : : static void astreamer_zstd_decompressor_content(astreamer *streamer,
52 : : astreamer_member *member,
53 : : const char *data, int len,
54 : : astreamer_archive_context context);
55 : : static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
56 : : static void astreamer_zstd_decompressor_free(astreamer *streamer);
57 : :
58 : : static const astreamer_ops astreamer_zstd_decompressor_ops = {
59 : : .content = astreamer_zstd_decompressor_content,
60 : : .finalize = astreamer_zstd_decompressor_finalize,
61 : : .free = astreamer_zstd_decompressor_free
62 : : };
63 : : #endif
64 : :
65 : : /*
66 : : * Create a new base backup streamer that performs zstd compression of tar
67 : : * blocks.
68 : : */
69 : : astreamer *
587 rhaas@postgresql.org 70 :CBC 3 : astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
71 : : {
72 : : #ifdef USE_ZSTD
73 : : astreamer_zstd_frame *streamer;
74 : : size_t ret;
75 : :
1469 76 [ - + ]: 3 : Assert(next != NULL);
77 : :
96 michael@paquier.xyz 78 :GNC 3 : streamer = palloc0_object(astreamer_zstd_frame);
79 : :
587 rhaas@postgresql.org 80 :CBC 3 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
81 : : &astreamer_zstd_compressor_ops;
82 : :
1469 83 : 3 : streamer->base.bbs_next = next;
84 : 3 : initStringInfo(&streamer->base.bbs_buffer);
85 : 3 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
86 : :
87 : 3 : streamer->cctx = ZSTD_createCCtx();
88 [ - + ]: 3 : if (!streamer->cctx)
1437 tgl@sss.pgh.pa.us 89 :UBC 0 : pg_fatal("could not create zstd compression context");
90 : :
91 : : /* Set compression level */
1278 michael@paquier.xyz 92 :CBC 3 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
93 : : compress->level);
94 [ - + ]: 3 : if (ZSTD_isError(ret))
1278 michael@paquier.xyz 95 :UBC 0 : pg_fatal("could not set zstd compression level to %d: %s",
96 : : compress->level, ZSTD_getErrorName(ret));
97 : :
98 : : /* Set # of workers, if specified */
1433 michael@paquier.xyz 99 [ + + ]:CBC 3 : if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
100 : : {
101 : : /*
102 : : * On older versions of libzstd, this option does not exist, and
103 : : * trying to set it will fail. Similarly for newer versions if they
104 : : * are compiled without threading support.
105 : : */
1446 rhaas@postgresql.org 106 : 1 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
107 : : compress->workers);
108 [ - + ]: 1 : if (ZSTD_isError(ret))
1437 tgl@sss.pgh.pa.us 109 :UBC 0 : pg_fatal("could not set compression worker count to %d: %s",
110 : : compress->workers, ZSTD_getErrorName(ret));
111 : : }
112 : :
1074 tomas.vondra@postgre 113 [ + + ]:CBC 3 : if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
114 : : {
115 : 1 : ret = ZSTD_CCtx_setParameter(streamer->cctx,
116 : : ZSTD_c_enableLongDistanceMatching,
117 : 1 : compress->long_distance);
118 [ - + ]: 1 : if (ZSTD_isError(ret))
10 tgl@sss.pgh.pa.us 119 :UBC 0 : pg_fatal("could not enable long-distance mode: %s",
120 : : ZSTD_getErrorName(ret));
121 : : }
122 : :
123 : : /* Initialize the ZSTD output buffer. */
1469 rhaas@postgresql.org 124 :CBC 3 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
125 : 3 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
126 : 3 : streamer->zstd_outBuf.pos = 0;
127 : :
128 : 3 : return &streamer->base;
129 : : #else
130 : : pg_fatal("this build does not support compression with %s", "ZSTD");
131 : : return NULL; /* keep compiler quiet */
132 : : #endif
133 : : }
134 : :
135 : : #ifdef USE_ZSTD
136 : : /*
137 : : * Compress the input data to output buffer.
138 : : *
139 : : * Find out the compression bound based on input data length for each
140 : : * invocation to make sure that output buffer has enough capacity to
141 : : * accommodate the compressed data. In case if the output buffer
142 : : * capacity falls short of compression bound then forward the content
143 : : * of output buffer to next streamer and empty the buffer.
144 : : */
145 : : static void
587 146 : 8214 : astreamer_zstd_compressor_content(astreamer *streamer,
147 : : astreamer_member *member,
148 : : const char *data, int len,
149 : : astreamer_archive_context context)
150 : : {
151 : 8214 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
1469 152 : 8214 : ZSTD_inBuffer inBuf = {data, len, 0};
153 : :
154 [ + + ]: 16434 : while (inBuf.pos < inBuf.size)
155 : : {
156 : : size_t yet_to_flush;
157 : 8220 : size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
158 : :
159 : : /*
160 : : * If the output buffer is not left with enough space, send the
161 : : * compressed bytes to the next streamer, and empty the buffer.
162 : : */
163 [ + + ]: 8220 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
164 : : max_needed)
165 : : {
587 166 : 25 : astreamer_content(mystreamer->base.bbs_next, member,
167 : 25 : mystreamer->zstd_outBuf.dst,
168 : 25 : mystreamer->zstd_outBuf.pos,
169 : : context);
170 : :
171 : : /* Reset the ZSTD output buffer. */
1469 172 : 25 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
173 : 25 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
174 : 25 : mystreamer->zstd_outBuf.pos = 0;
175 : : }
176 : :
177 : : yet_to_flush =
178 : 8220 : ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
179 : : &inBuf, ZSTD_e_continue);
180 : :
181 [ - + ]: 8220 : if (ZSTD_isError(yet_to_flush))
10 tgl@sss.pgh.pa.us 182 :UBC 0 : pg_fatal("could not compress data: %s",
183 : : ZSTD_getErrorName(yet_to_flush));
184 : : }
1469 rhaas@postgresql.org 185 :CBC 8214 : }
186 : :
187 : : /*
188 : : * End-of-stream processing.
189 : : */
190 : : static void
587 191 : 3 : astreamer_zstd_compressor_finalize(astreamer *streamer)
192 : : {
193 : 3 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
194 : : size_t yet_to_flush;
195 : :
196 : : do
197 : : {
1469 198 : 7 : ZSTD_inBuffer in = {NULL, 0, 0};
199 : 7 : size_t max_needed = ZSTD_compressBound(0);
200 : :
201 : : /*
202 : : * If the output buffer is not left with enough space, send the
203 : : * compressed bytes to the next streamer, and empty the buffer.
204 : : */
205 [ + + ]: 7 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
206 : : max_needed)
207 : : {
587 rhaas@postgresql.org 208 :GBC 4 : astreamer_content(mystreamer->base.bbs_next, NULL,
209 : 4 : mystreamer->zstd_outBuf.dst,
210 : 4 : mystreamer->zstd_outBuf.pos,
211 : : ASTREAMER_UNKNOWN);
212 : :
213 : : /* Reset the ZSTD output buffer. */
1469 214 : 4 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
215 : 4 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
216 : 4 : mystreamer->zstd_outBuf.pos = 0;
217 : : }
218 : :
1469 rhaas@postgresql.org 219 :CBC 7 : yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
220 : : &mystreamer->zstd_outBuf,
221 : : &in, ZSTD_e_end);
222 : :
223 [ - + ]: 7 : if (ZSTD_isError(yet_to_flush))
10 tgl@sss.pgh.pa.us 224 :UBC 0 : pg_fatal("could not compress data: %s",
225 : : ZSTD_getErrorName(yet_to_flush));
226 : :
1469 rhaas@postgresql.org 227 [ + + ]:CBC 7 : } while (yet_to_flush > 0);
228 : :
229 : : /* Make sure to pass any remaining bytes to the next streamer. */
230 [ + - ]: 3 : if (mystreamer->zstd_outBuf.pos > 0)
587 231 : 3 : astreamer_content(mystreamer->base.bbs_next, NULL,
232 : 3 : mystreamer->zstd_outBuf.dst,
233 : 3 : mystreamer->zstd_outBuf.pos,
234 : : ASTREAMER_UNKNOWN);
235 : :
236 : 3 : astreamer_finalize(mystreamer->base.bbs_next);
1469 237 : 3 : }
238 : :
239 : : /*
240 : : * Free memory.
241 : : */
242 : : static void
587 243 : 3 : astreamer_zstd_compressor_free(astreamer *streamer)
244 : : {
245 : 3 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
246 : :
247 : 3 : astreamer_free(streamer->bbs_next);
1469 248 : 3 : ZSTD_freeCCtx(mystreamer->cctx);
249 : 3 : pfree(streamer->bbs_buffer.data);
250 : 3 : pfree(streamer);
251 : 3 : }
252 : : #endif
253 : :
254 : : /*
255 : : * Create a new base backup streamer that performs decompression of zstd
256 : : * compressed blocks.
257 : : */
258 : : astreamer *
587 259 : 9 : astreamer_zstd_decompressor_new(astreamer *next)
260 : : {
261 : : #ifdef USE_ZSTD
262 : : astreamer_zstd_frame *streamer;
263 : :
1469 264 [ - + ]: 9 : Assert(next != NULL);
265 : :
96 michael@paquier.xyz 266 :GNC 9 : streamer = palloc0_object(astreamer_zstd_frame);
587 rhaas@postgresql.org 267 :CBC 9 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
268 : : &astreamer_zstd_decompressor_ops;
269 : :
1469 270 : 9 : streamer->base.bbs_next = next;
271 : 9 : initStringInfo(&streamer->base.bbs_buffer);
272 : 9 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
273 : :
274 : 9 : streamer->dctx = ZSTD_createDCtx();
275 [ - + ]: 9 : if (!streamer->dctx)
1437 tgl@sss.pgh.pa.us 276 :UBC 0 : pg_fatal("could not create zstd decompression context");
277 : :
278 : : /* Initialize the ZSTD output buffer. */
1469 rhaas@postgresql.org 279 :CBC 9 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
280 : 9 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
281 : 9 : streamer->zstd_outBuf.pos = 0;
282 : :
283 : 9 : return &streamer->base;
284 : : #else
285 : : pg_fatal("this build does not support compression with %s", "ZSTD");
286 : : return NULL; /* keep compiler quiet */
287 : : #endif
288 : : }
289 : :
290 : : #ifdef USE_ZSTD
291 : : /*
292 : : * Decompress the input data to output buffer until we run out of input
293 : : * data. Each time the output buffer is full, pass on the decompressed data
294 : : * to the next streamer.
295 : : */
296 : : static void
587 297 : 320 : astreamer_zstd_decompressor_content(astreamer *streamer,
298 : : astreamer_member *member,
299 : : const char *data, int len,
300 : : astreamer_archive_context context)
301 : : {
302 : 320 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
1469 303 : 320 : ZSTD_inBuffer inBuf = {data, len, 0};
304 : :
305 [ + + ]: 1581 : while (inBuf.pos < inBuf.size)
306 : : {
307 : : size_t ret;
308 : :
309 : : /*
310 : : * If output buffer is full then forward the content to next streamer
311 : : * and update the output buffer.
312 : : */
313 [ + + ]: 1261 : if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
314 : : {
587 315 : 1114 : astreamer_content(mystreamer->base.bbs_next, member,
316 : 1114 : mystreamer->zstd_outBuf.dst,
317 : 1114 : mystreamer->zstd_outBuf.pos,
318 : : context);
319 : :
320 : : /* Reset the ZSTD output buffer. */
1469 321 : 1114 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
322 : 1114 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
323 : 1114 : mystreamer->zstd_outBuf.pos = 0;
324 : : }
325 : :
326 : 1261 : ret = ZSTD_decompressStream(mystreamer->dctx,
327 : : &mystreamer->zstd_outBuf, &inBuf);
328 : :
329 [ - + ]: 1261 : if (ZSTD_isError(ret))
10 tgl@sss.pgh.pa.us 330 :UBC 0 : pg_fatal("could not decompress data: %s",
331 : : ZSTD_getErrorName(ret));
332 : : }
1469 rhaas@postgresql.org 333 :CBC 320 : }
334 : :
335 : : /*
336 : : * End-of-stream processing.
337 : : */
338 : : static void
587 339 : 9 : astreamer_zstd_decompressor_finalize(astreamer *streamer)
340 : : {
341 : 9 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
342 : :
343 : : /*
344 : : * End of the stream, if there is some pending data in output buffers then
345 : : * we must forward it to next streamer.
346 : : */
1469 347 [ + - ]: 9 : if (mystreamer->zstd_outBuf.pos > 0)
587 348 : 9 : astreamer_content(mystreamer->base.bbs_next, NULL,
349 : 9 : mystreamer->base.bbs_buffer.data,
350 : : mystreamer->base.bbs_buffer.maxlen,
351 : : ASTREAMER_UNKNOWN);
352 : :
353 : 9 : astreamer_finalize(mystreamer->base.bbs_next);
1469 354 : 9 : }
355 : :
356 : : /*
357 : : * Free memory.
358 : : */
359 : : static void
587 360 : 9 : astreamer_zstd_decompressor_free(astreamer *streamer)
361 : : {
362 : 9 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
363 : :
364 : 9 : astreamer_free(streamer->bbs_next);
1469 365 : 9 : ZSTD_freeDCtx(mystreamer->dctx);
366 : 9 : pfree(streamer->bbs_buffer.data);
367 : 9 : pfree(streamer);
368 : 9 : }
369 : : #endif
|