Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * astreamer_lz4.c
4 : : *
5 : : * Archive streamers that deal with data compressed using lz4.
6 : : * astreamer_lz4_compressor applies lz4 compression to the input stream,
7 : : * and astreamer_lz4_decompressor does the reverse.
8 : : *
9 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
10 : : *
11 : : * IDENTIFICATION
12 : : * src/fe_utils/astreamer_lz4.c
13 : : *-------------------------------------------------------------------------
14 : : */
15 : :
16 : : #include "postgres_fe.h"
17 : :
18 : : #include <unistd.h>
19 : :
20 : : #ifdef USE_LZ4
21 : : #include <lz4frame.h>
22 : : #endif
23 : :
24 : : #include "common/logging.h"
25 : : #include "fe_utils/astreamer.h"
26 : :
27 : : #ifdef USE_LZ4
28 : : typedef struct astreamer_lz4_frame
29 : : {
30 : : astreamer base;
31 : :
32 : : LZ4F_compressionContext_t cctx;
33 : : LZ4F_decompressionContext_t dctx;
34 : : LZ4F_preferences_t prefs;
35 : :
36 : : size_t bytes_written;
37 : : bool header_written;
38 : : } astreamer_lz4_frame;
39 : :
40 : : static void astreamer_lz4_compressor_content(astreamer *streamer,
41 : : astreamer_member *member,
42 : : const char *data, int len,
43 : : astreamer_archive_context context);
44 : : static void astreamer_lz4_compressor_finalize(astreamer *streamer);
45 : : static void astreamer_lz4_compressor_free(astreamer *streamer);
46 : :
47 : : static const astreamer_ops astreamer_lz4_compressor_ops = {
48 : : .content = astreamer_lz4_compressor_content,
49 : : .finalize = astreamer_lz4_compressor_finalize,
50 : : .free = astreamer_lz4_compressor_free
51 : : };
52 : :
53 : : static void astreamer_lz4_decompressor_content(astreamer *streamer,
54 : : astreamer_member *member,
55 : : const char *data, int len,
56 : : astreamer_archive_context context);
57 : : static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
58 : : static void astreamer_lz4_decompressor_free(astreamer *streamer);
59 : :
60 : : static const astreamer_ops astreamer_lz4_decompressor_ops = {
61 : : .content = astreamer_lz4_decompressor_content,
62 : : .finalize = astreamer_lz4_decompressor_finalize,
63 : : .free = astreamer_lz4_decompressor_free
64 : : };
65 : : #endif
66 : :
67 : : /*
68 : : * Create a new base backup streamer that performs lz4 compression of tar
69 : : * blocks.
70 : : */
71 : : astreamer *
397 rhaas@postgresql.org 72 :CBC 2 : astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
73 : : {
74 : : #ifdef USE_LZ4
75 : : astreamer_lz4_frame *streamer;
76 : : LZ4F_errorCode_t ctxError;
77 : : LZ4F_preferences_t *prefs;
78 : :
1303 79 [ - + ]: 2 : Assert(next != NULL);
80 : :
397 81 : 2 : streamer = palloc0(sizeof(astreamer_lz4_frame));
82 : 2 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
83 : : &astreamer_lz4_compressor_ops;
84 : :
1303 85 : 2 : streamer->base.bbs_next = next;
86 : 2 : initStringInfo(&streamer->base.bbs_buffer);
87 : 2 : streamer->header_written = false;
88 : :
89 : : /* Initialize stream compression preferences */
90 : 2 : prefs = &streamer->prefs;
91 : 2 : memset(prefs, 0, sizeof(LZ4F_preferences_t));
92 : 2 : prefs->frameInfo.blockSizeID = LZ4F_max256KB;
1088 michael@paquier.xyz 93 : 2 : prefs->compressionLevel = compress->level;
94 : :
1303 rhaas@postgresql.org 95 : 2 : ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
96 [ - + ]: 2 : if (LZ4F_isError(ctxError))
1247 tgl@sss.pgh.pa.us 97 :UBC 0 : pg_log_error("could not create lz4 compression context: %s",
98 : : LZ4F_getErrorName(ctxError));
99 : :
1303 rhaas@postgresql.org 100 :CBC 2 : return &streamer->base;
101 : : #else
102 : : pg_fatal("this build does not support compression with %s", "LZ4");
103 : : return NULL; /* keep compiler quiet */
104 : : #endif
105 : : }
106 : :
107 : : #ifdef USE_LZ4
108 : : /*
109 : : * Compress the input data to output buffer.
110 : : *
111 : : * Find out the compression bound based on input data length for each
112 : : * invocation to make sure that output buffer has enough capacity to
113 : : * accommodate the compressed data. In case if the output buffer
114 : : * capacity falls short of compression bound then forward the content
115 : : * of output buffer to next streamer and empty the buffer.
116 : : */
117 : : static void
397 118 : 5482 : astreamer_lz4_compressor_content(astreamer *streamer,
119 : : astreamer_member *member,
120 : : const char *data, int len,
121 : : astreamer_archive_context context)
122 : : {
123 : : astreamer_lz4_frame *mystreamer;
124 : : uint8 *next_in,
125 : : *next_out;
126 : : size_t out_bound,
127 : : compressed_size,
128 : : avail_out;
129 : :
130 : 5482 : mystreamer = (astreamer_lz4_frame *) streamer;
1303 131 : 5482 : next_in = (uint8 *) data;
132 : :
133 : : /* Write header before processing the first input chunk. */
134 [ + + ]: 5482 : if (!mystreamer->header_written)
135 : : {
136 : 2 : compressed_size = LZ4F_compressBegin(mystreamer->cctx,
137 : 2 : (uint8 *) mystreamer->base.bbs_buffer.data,
138 : 2 : mystreamer->base.bbs_buffer.maxlen,
139 : 2 : &mystreamer->prefs);
140 : :
141 [ - + ]: 2 : if (LZ4F_isError(compressed_size))
1303 rhaas@postgresql.org 142 :UBC 0 : pg_log_error("could not write lz4 header: %s",
143 : : LZ4F_getErrorName(compressed_size));
144 : :
1303 rhaas@postgresql.org 145 :CBC 2 : mystreamer->bytes_written += compressed_size;
146 : 2 : mystreamer->header_written = true;
147 : : }
148 : :
149 : : /*
150 : : * Update the offset and capacity of output buffer based on number of
151 : : * bytes written to output buffer.
152 : : */
153 : 5482 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
154 : 5482 : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
155 : :
156 : : /*
157 : : * Find out the compression bound and make sure that output buffer has the
158 : : * required capacity for the success of LZ4F_compressUpdate. If needed
159 : : * forward the content to next streamer and empty the buffer.
160 : : */
161 : 5482 : out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
1278 162 [ + + ]: 5482 : if (avail_out < out_bound)
163 : : {
397 164 : 34 : astreamer_content(mystreamer->base.bbs_next, member,
165 : 34 : mystreamer->base.bbs_buffer.data,
166 : 34 : mystreamer->bytes_written,
167 : : context);
168 : :
169 : : /* Enlarge buffer if it falls short of out bound. */
1213 tgl@sss.pgh.pa.us 170 [ + + ]: 34 : if (mystreamer->base.bbs_buffer.maxlen < out_bound)
171 : 2 : enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
172 : :
173 : 34 : avail_out = mystreamer->base.bbs_buffer.maxlen;
174 : 34 : mystreamer->bytes_written = 0;
175 : 34 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
176 : : }
177 : :
178 : : /*
179 : : * This call compresses the data starting at next_in and generates the
180 : : * output starting at next_out. It expects the caller to provide the size
181 : : * of input buffer and capacity of output buffer by providing parameters
182 : : * len and avail_out.
183 : : *
184 : : * It returns the number of bytes compressed to output buffer.
185 : : */
1303 rhaas@postgresql.org 186 : 5482 : compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
187 : : next_out, avail_out,
188 : : next_in, len, NULL);
189 : :
190 [ - + ]: 5482 : if (LZ4F_isError(compressed_size))
1303 rhaas@postgresql.org 191 :UBC 0 : pg_log_error("could not compress data: %s",
192 : : LZ4F_getErrorName(compressed_size));
193 : :
1303 rhaas@postgresql.org 194 :CBC 5482 : mystreamer->bytes_written += compressed_size;
195 : 5482 : }
196 : :
197 : : /*
198 : : * End-of-stream processing.
199 : : */
200 : : static void
397 201 : 2 : astreamer_lz4_compressor_finalize(astreamer *streamer)
202 : : {
203 : : astreamer_lz4_frame *mystreamer;
204 : : uint8 *next_out;
205 : : size_t footer_bound,
206 : : compressed_size,
207 : : avail_out;
208 : :
209 : 2 : mystreamer = (astreamer_lz4_frame *) streamer;
210 : :
211 : : /* Find out the footer bound and update the output buffer. */
1303 212 : 2 : footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
1278 213 [ - + ]: 2 : if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
214 : : footer_bound)
215 : : {
397 rhaas@postgresql.org 216 :UBC 0 : astreamer_content(mystreamer->base.bbs_next, NULL,
217 : 0 : mystreamer->base.bbs_buffer.data,
218 : 0 : mystreamer->bytes_written,
219 : : ASTREAMER_UNKNOWN);
220 : :
221 : : /* Enlarge buffer if it falls short of footer bound. */
1213 tgl@sss.pgh.pa.us 222 [ # # ]: 0 : if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
223 : 0 : enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
224 : :
225 : 0 : avail_out = mystreamer->base.bbs_buffer.maxlen;
226 : 0 : mystreamer->bytes_written = 0;
227 : 0 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
228 : : }
229 : : else
230 : : {
1303 rhaas@postgresql.org 231 :CBC 2 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
232 : 2 : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
233 : : }
234 : :
235 : : /*
236 : : * Finalize the frame and flush whatever data remaining in compression
237 : : * context.
238 : : */
239 : 2 : compressed_size = LZ4F_compressEnd(mystreamer->cctx,
240 : : next_out, avail_out, NULL);
241 : :
242 [ - + ]: 2 : if (LZ4F_isError(compressed_size))
1303 rhaas@postgresql.org 243 :UBC 0 : pg_log_error("could not end lz4 compression: %s",
244 : : LZ4F_getErrorName(compressed_size));
245 : :
1303 rhaas@postgresql.org 246 :CBC 2 : mystreamer->bytes_written += compressed_size;
247 : :
397 248 : 2 : astreamer_content(mystreamer->base.bbs_next, NULL,
249 : 2 : mystreamer->base.bbs_buffer.data,
250 : 2 : mystreamer->bytes_written,
251 : : ASTREAMER_UNKNOWN);
252 : :
253 : 2 : astreamer_finalize(mystreamer->base.bbs_next);
1303 254 : 2 : }
255 : :
256 : : /*
257 : : * Free memory.
258 : : */
259 : : static void
397 260 : 2 : astreamer_lz4_compressor_free(astreamer *streamer)
261 : : {
262 : : astreamer_lz4_frame *mystreamer;
263 : :
264 : 2 : mystreamer = (astreamer_lz4_frame *) streamer;
265 : 2 : astreamer_free(streamer->bbs_next);
1303 266 : 2 : LZ4F_freeCompressionContext(mystreamer->cctx);
267 : 2 : pfree(streamer->bbs_buffer.data);
268 : 2 : pfree(streamer);
269 : 2 : }
270 : : #endif
271 : :
272 : : /*
273 : : * Create a new base backup streamer that performs decompression of lz4
274 : : * compressed blocks.
275 : : */
276 : : astreamer *
397 277 : 7 : astreamer_lz4_decompressor_new(astreamer *next)
278 : : {
279 : : #ifdef USE_LZ4
280 : : astreamer_lz4_frame *streamer;
281 : : LZ4F_errorCode_t ctxError;
282 : :
1303 283 [ - + ]: 7 : Assert(next != NULL);
284 : :
397 285 : 7 : streamer = palloc0(sizeof(astreamer_lz4_frame));
286 : 7 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
287 : : &astreamer_lz4_decompressor_ops;
288 : :
1303 289 : 7 : streamer->base.bbs_next = next;
290 : 7 : initStringInfo(&streamer->base.bbs_buffer);
291 : :
292 : : /* Initialize internal stream state for decompression */
293 : 7 : ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
294 [ - + ]: 7 : if (LZ4F_isError(ctxError))
1247 tgl@sss.pgh.pa.us 295 :UBC 0 : pg_fatal("could not initialize compression library: %s",
296 : : LZ4F_getErrorName(ctxError));
297 : :
1303 rhaas@postgresql.org 298 :CBC 7 : return &streamer->base;
299 : : #else
300 : : pg_fatal("this build does not support compression with %s", "LZ4");
301 : : return NULL; /* keep compiler quiet */
302 : : #endif
303 : : }
304 : :
305 : : #ifdef USE_LZ4
306 : : /*
307 : : * Decompress the input data to output buffer until we run out of input
308 : : * data. Each time the output buffer is full, pass on the decompressed data
309 : : * to the next streamer.
310 : : */
311 : : static void
397 312 : 255 : astreamer_lz4_decompressor_content(astreamer *streamer,
313 : : astreamer_member *member,
314 : : const char *data, int len,
315 : : astreamer_archive_context context)
316 : : {
317 : : astreamer_lz4_frame *mystreamer;
318 : : uint8 *next_in,
319 : : *next_out;
320 : : size_t avail_in,
321 : : avail_out;
322 : :
323 : 255 : mystreamer = (astreamer_lz4_frame *) streamer;
1303 324 : 255 : next_in = (uint8 *) data;
66 michael@paquier.xyz 325 : 255 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
1303 rhaas@postgresql.org 326 : 255 : avail_in = len;
66 michael@paquier.xyz 327 : 255 : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
328 : :
1303 rhaas@postgresql.org 329 [ + + ]: 205691 : while (avail_in > 0)
330 : : {
331 : : size_t ret,
332 : : read_size,
333 : : out_size;
334 : :
335 : 205436 : read_size = avail_in;
336 : 205436 : out_size = avail_out;
337 : :
338 : : /*
339 : : * This call decompresses the data starting at next_in and generates
340 : : * the output data starting at next_out. It expects the caller to
341 : : * provide size of the input buffer and total capacity of the output
342 : : * buffer by providing the read_size and out_size parameters
343 : : * respectively.
344 : : *
345 : : * Per the documentation of LZ4, parameters read_size and out_size
346 : : * behaves as dual parameters. On return, the number of bytes consumed
347 : : * from the input buffer will be written back to read_size and the
348 : : * number of bytes decompressed to output buffer will be written back
349 : : * to out_size respectively.
350 : : */
351 : 205436 : ret = LZ4F_decompress(mystreamer->dctx,
352 : : next_out, &out_size,
353 : : next_in, &read_size, NULL);
354 : :
355 [ - + ]: 205436 : if (LZ4F_isError(ret))
1303 rhaas@postgresql.org 356 :UBC 0 : pg_log_error("could not decompress data: %s",
357 : : LZ4F_getErrorName(ret));
358 : :
359 : : /* Update input buffer based on number of bytes consumed */
1303 rhaas@postgresql.org 360 :CBC 205436 : avail_in -= read_size;
361 : 205436 : next_in += read_size;
362 : :
363 : 205436 : mystreamer->bytes_written += out_size;
364 : :
365 : : /*
366 : : * If output buffer is full then forward the content to next streamer
367 : : * and update the output buffer.
368 : : */
369 [ + + ]: 205436 : if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
370 : : {
397 371 : 205284 : astreamer_content(mystreamer->base.bbs_next, member,
372 : 205284 : mystreamer->base.bbs_buffer.data,
373 : : mystreamer->base.bbs_buffer.maxlen,
374 : : context);
375 : :
1303 376 : 205284 : avail_out = mystreamer->base.bbs_buffer.maxlen;
377 : 205284 : mystreamer->bytes_written = 0;
378 : 205284 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
379 : : }
380 : : else
381 : : {
382 : 152 : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
383 : 152 : next_out += mystreamer->bytes_written;
384 : : }
385 : : }
386 : 255 : }
387 : :
388 : : /*
389 : : * End-of-stream processing.
390 : : */
391 : : static void
397 392 : 7 : astreamer_lz4_decompressor_finalize(astreamer *streamer)
393 : : {
394 : : astreamer_lz4_frame *mystreamer;
395 : :
396 : 7 : mystreamer = (astreamer_lz4_frame *) streamer;
397 : :
398 : : /*
399 : : * End of the stream, if there is some pending data in output buffers then
400 : : * we must forward it to next streamer.
401 : : */
402 : 7 : astreamer_content(mystreamer->base.bbs_next, NULL,
403 : 7 : mystreamer->base.bbs_buffer.data,
404 : : mystreamer->base.bbs_buffer.maxlen,
405 : : ASTREAMER_UNKNOWN);
406 : :
407 : 7 : astreamer_finalize(mystreamer->base.bbs_next);
1303 408 : 7 : }
409 : :
410 : : /*
411 : : * Free memory.
412 : : */
413 : : static void
397 414 : 7 : astreamer_lz4_decompressor_free(astreamer *streamer)
415 : : {
416 : : astreamer_lz4_frame *mystreamer;
417 : :
418 : 7 : mystreamer = (astreamer_lz4_frame *) streamer;
419 : 7 : astreamer_free(streamer->bbs_next);
1303 420 : 7 : LZ4F_freeDecompressionContext(mystreamer->dctx);
421 : 7 : pfree(streamer->bbs_buffer.data);
422 : 7 : pfree(streamer);
423 : 7 : }
424 : : #endif
|