Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * compress_lz4.c
4 : : * Routines for archivers to write a LZ4 compressed data stream.
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/bin/pg_dump/compress_lz4.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : : #include "postgres_fe.h"
15 : : #include <unistd.h>
16 : :
17 : : #include "compress_lz4.h"
18 : : #include "pg_backup_utils.h"
19 : :
20 : : #ifdef USE_LZ4
21 : : #include <lz4frame.h>
22 : :
23 : : /*
24 : : * LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
25 : : * Redefine it for installations with a lesser version.
26 : : */
27 : : #ifndef LZ4F_HEADER_SIZE_MAX
28 : : #define LZ4F_HEADER_SIZE_MAX 32
29 : : #endif
30 : :
31 : : /*---------------------------------
32 : : * Common to both compression APIs
33 : : *---------------------------------
34 : : */
35 : :
36 : : /*
37 : : * (de)compression state used by both the Compressor and Stream APIs.
38 : : */
39 : : typedef struct LZ4State
40 : : {
41 : : /*
42 : : * Used by the Stream API to keep track of the file stream.
43 : : */
44 : : FILE *fp;
45 : :
46 : : LZ4F_preferences_t prefs;
47 : :
48 : : LZ4F_compressionContext_t ctx;
49 : : LZ4F_decompressionContext_t dtx;
50 : :
51 : : /*
52 : : * Used by the Stream API's lazy initialization.
53 : : */
54 : : bool inited;
55 : :
56 : : /*
57 : : * Used by the Stream API to distinguish between compression and
58 : : * decompression operations.
59 : : */
60 : : bool compressing;
61 : :
62 : : /*
63 : : * Used by the Compressor API to mark if the compression headers have been
64 : : * written after initialization.
65 : : */
66 : : bool needs_header_flush;
67 : :
68 : : size_t buflen;
69 : : char *buffer;
70 : :
71 : : /*
72 : : * Used by the Stream API to store already uncompressed data that the
73 : : * caller has not consumed.
74 : : */
75 : : size_t overflowalloclen;
76 : : size_t overflowlen;
77 : : char *overflowbuf;
78 : :
79 : : /*
80 : : * Used by both APIs to keep track of the compressed data length stored in
81 : : * the buffer.
82 : : */
83 : : size_t compressedlen;
84 : :
85 : : /*
86 : : * Used by both APIs to keep track of error codes.
87 : : */
88 : : size_t errcode;
89 : : } LZ4State;
90 : :
91 : : /*
92 : : * LZ4State_compression_init
93 : : * Initialize the required LZ4State members for compression.
94 : : *
95 : : * Write the LZ4 frame header in a buffer keeping track of its length. Users of
96 : : * this function can choose when and how to write the header to a file stream.
97 : : *
98 : : * Returns true on success. In case of a failure returns false, and stores the
99 : : * error code in state->errcode.
100 : : */
101 : : static bool
889 tomas.vondra@postgre 102 :CBC 80 : LZ4State_compression_init(LZ4State *state)
103 : : {
104 : : size_t status;
105 : :
106 : 80 : state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
107 : :
108 : : /*
109 : : * LZ4F_compressBegin requires a buffer that is greater or equal to
110 : : * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
111 : : */
112 [ - + ]: 80 : if (state->buflen < LZ4F_HEADER_SIZE_MAX)
889 tomas.vondra@postgre 113 :UBC 0 : state->buflen = LZ4F_HEADER_SIZE_MAX;
114 : :
889 tomas.vondra@postgre 115 :CBC 80 : status = LZ4F_createCompressionContext(&state->ctx, LZ4F_VERSION);
116 [ - + ]: 80 : if (LZ4F_isError(status))
117 : : {
889 tomas.vondra@postgre 118 :UBC 0 : state->errcode = status;
119 : 0 : return false;
120 : : }
121 : :
889 tomas.vondra@postgre 122 :CBC 80 : state->buffer = pg_malloc(state->buflen);
123 : 80 : status = LZ4F_compressBegin(state->ctx,
124 : 80 : state->buffer, state->buflen,
125 : 80 : &state->prefs);
126 [ - + ]: 80 : if (LZ4F_isError(status))
127 : : {
889 tomas.vondra@postgre 128 :UBC 0 : state->errcode = status;
129 : 0 : return false;
130 : : }
131 : :
889 tomas.vondra@postgre 132 :CBC 80 : state->compressedlen = status;
133 : :
134 : 80 : return true;
135 : : }
136 : :
137 : : /*----------------------
138 : : * Compressor API
139 : : *----------------------
140 : : */
141 : :
142 : : /* Private routines that support LZ4 compressed data I/O */
143 : :
144 : : static void
926 145 : 40 : ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
146 : : {
147 : : size_t r;
148 : : size_t readbuflen;
149 : : char *outbuf;
150 : : char *readbuf;
889 151 : 40 : LZ4F_decompressionContext_t ctx = NULL;
152 : : LZ4F_decompressOptions_t dec_opt;
153 : : LZ4F_errorCode_t status;
154 : :
155 : 40 : memset(&dec_opt, 0, sizeof(dec_opt));
156 : 40 : status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
157 [ - + ]: 40 : if (LZ4F_isError(status))
889 tomas.vondra@postgre 158 :UBC 0 : pg_fatal("could not create LZ4 decompression context: %s",
159 : : LZ4F_getErrorName(status));
160 : :
889 tomas.vondra@postgre 161 :CBC 40 : outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
162 : 40 : readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
163 : 40 : readbuflen = DEFAULT_IO_BUFFER_SIZE;
164 [ + + ]: 120 : while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
165 : : {
166 : : char *readp;
167 : : char *readend;
168 : :
169 : : /* Process one chunk */
170 : 80 : readp = readbuf;
171 : 80 : readend = readbuf + r;
172 [ + + ]: 163 : while (readp < readend)
173 : : {
174 : 83 : size_t out_size = DEFAULT_IO_BUFFER_SIZE;
175 : 83 : size_t read_size = readend - readp;
176 : :
177 : 83 : memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE);
178 : 83 : status = LZ4F_decompress(ctx, outbuf, &out_size,
179 : : readp, &read_size, &dec_opt);
180 [ - + ]: 83 : if (LZ4F_isError(status))
889 tomas.vondra@postgre 181 :UBC 0 : pg_fatal("could not decompress: %s",
182 : : LZ4F_getErrorName(status));
183 : :
889 tomas.vondra@postgre 184 :CBC 83 : ahwrite(outbuf, 1, out_size, AH);
185 : 83 : readp += read_size;
186 : : }
187 : : }
188 : :
189 : 40 : pg_free(outbuf);
190 : 40 : pg_free(readbuf);
191 : :
192 : 40 : status = LZ4F_freeDecompressionContext(ctx);
193 [ - + ]: 40 : if (LZ4F_isError(status))
889 tomas.vondra@postgre 194 :UBC 0 : pg_fatal("could not free LZ4 decompression context: %s",
195 : : LZ4F_getErrorName(status));
926 tomas.vondra@postgre 196 :CBC 40 : }
197 : :
198 : : static void
199 : 169 : WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
200 : : const void *data, size_t dLen)
201 : : {
889 202 : 169 : LZ4State *state = (LZ4State *) cs->private_data;
203 : 169 : size_t remaining = dLen;
204 : : size_t status;
205 : : size_t chunk;
206 : :
207 : : /* Write the header if not yet written. */
208 [ + + ]: 169 : if (state->needs_header_flush)
209 : : {
210 : 39 : cs->writeF(AH, state->buffer, state->compressedlen);
211 : 39 : state->needs_header_flush = false;
212 : : }
213 : :
214 [ + + ]: 341 : while (remaining > 0)
215 : : {
216 : :
217 [ + + ]: 172 : if (remaining > DEFAULT_IO_BUFFER_SIZE)
218 : 3 : chunk = DEFAULT_IO_BUFFER_SIZE;
219 : : else
220 : 169 : chunk = remaining;
221 : :
222 : 172 : remaining -= chunk;
223 : 172 : status = LZ4F_compressUpdate(state->ctx,
224 : 172 : state->buffer, state->buflen,
225 : : data, chunk, NULL);
226 : :
227 [ - + ]: 172 : if (LZ4F_isError(status))
841 peter@eisentraut.org 228 :UBC 0 : pg_fatal("could not compress data: %s",
229 : : LZ4F_getErrorName(status));
230 : :
889 tomas.vondra@postgre 231 :CBC 172 : cs->writeF(AH, state->buffer, status);
232 : :
233 : 172 : data = ((char *) data) + chunk;
234 : : }
926 235 : 169 : }
236 : :
237 : : static void
238 : 80 : EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
239 : : {
889 240 : 80 : LZ4State *state = (LZ4State *) cs->private_data;
241 : : size_t status;
242 : :
243 : : /* Nothing needs to be done */
244 [ + + ]: 80 : if (!state)
245 : 40 : return;
246 : :
247 : : /*
248 : : * Write the header if not yet written. The caller is not required to call
249 : : * writeData if the relation does not contain any data. Thus it is
250 : : * possible to reach here without having flushed the header. Do it before
251 : : * ending the compression.
252 : : */
253 [ + + ]: 40 : if (state->needs_header_flush)
254 : 1 : cs->writeF(AH, state->buffer, state->compressedlen);
255 : :
256 : 40 : status = LZ4F_compressEnd(state->ctx,
257 : 40 : state->buffer, state->buflen,
258 : : NULL);
259 [ - + ]: 40 : if (LZ4F_isError(status))
841 peter@eisentraut.org 260 :UBC 0 : pg_fatal("could not end compression: %s",
261 : : LZ4F_getErrorName(status));
262 : :
889 tomas.vondra@postgre 263 :CBC 40 : cs->writeF(AH, state->buffer, status);
264 : :
265 : 40 : status = LZ4F_freeCompressionContext(state->ctx);
266 [ - + ]: 40 : if (LZ4F_isError(status))
841 peter@eisentraut.org 267 :UBC 0 : pg_fatal("could not end compression: %s",
268 : : LZ4F_getErrorName(status));
269 : :
889 tomas.vondra@postgre 270 :CBC 40 : pg_free(state->buffer);
271 : 40 : pg_free(state);
272 : :
273 : 40 : cs->private_data = NULL;
274 : : }
275 : :
276 : : /*
277 : : * Public routines that support LZ4 compressed data I/O
278 : : */
279 : : void
926 280 : 80 : InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
281 : : {
282 : : LZ4State *state;
283 : :
284 : 80 : cs->readData = ReadDataFromArchiveLZ4;
285 : 80 : cs->writeData = WriteDataToArchiveLZ4;
286 : 80 : cs->end = EndCompressorLZ4;
287 : :
288 : 80 : cs->compression_spec = compression_spec;
289 : :
290 : : /*
291 : : * Read operations have access to the whole input. No state needs to be
292 : : * carried between calls.
293 : : */
889 294 [ + + ]: 80 : if (cs->readF)
295 : 40 : return;
296 : :
297 : 40 : state = pg_malloc0(sizeof(*state));
298 [ + - ]: 40 : if (cs->compression_spec.level >= 0)
299 : 40 : state->prefs.compressionLevel = cs->compression_spec.level;
300 : :
301 [ - + ]: 40 : if (!LZ4State_compression_init(state))
889 tomas.vondra@postgre 302 :UBC 0 : pg_fatal("could not initialize LZ4 compression: %s",
303 : : LZ4F_getErrorName(state->errcode));
304 : :
305 : : /* Remember that the header has not been written. */
889 tomas.vondra@postgre 306 :CBC 40 : state->needs_header_flush = true;
307 : 40 : cs->private_data = state;
308 : : }
309 : :
310 : : /*----------------------
311 : : * Compress Stream API
312 : : *----------------------
313 : : */
314 : :
315 : :
316 : : /*
317 : : * LZ4 equivalent to feof() or gzeof(). Return true iff there is no
318 : : * decompressed output in the overflow buffer and the end of the backing file
319 : : * is reached.
320 : : */
321 : : static bool
889 tomas.vondra@postgre 322 :UBC 0 : LZ4Stream_eof(CompressFileHandle *CFH)
323 : : {
324 : 0 : LZ4State *state = (LZ4State *) CFH->private_data;
325 : :
326 [ # # # # ]: 0 : return state->overflowlen == 0 && feof(state->fp);
327 : : }
328 : :
329 : : static const char *
330 : 0 : LZ4Stream_get_error(CompressFileHandle *CFH)
331 : : {
332 : 0 : LZ4State *state = (LZ4State *) CFH->private_data;
333 : : const char *errmsg;
334 : :
335 [ # # ]: 0 : if (LZ4F_isError(state->errcode))
336 : 0 : errmsg = LZ4F_getErrorName(state->errcode);
337 : : else
926 338 : 0 : errmsg = strerror(errno);
339 : :
340 : 0 : return errmsg;
341 : : }
342 : :
343 : : /*
344 : : * Initialize an already alloc'ed LZ4State struct for subsequent calls.
345 : : *
346 : : * Creates the necessary contexts for either compression or decompression. When
347 : : * compressing data (indicated by compressing=true), it additionally writes the
348 : : * LZ4 header in the output stream.
349 : : *
350 : : * Returns true on success. In case of a failure returns false, and stores the
351 : : * error code in state->errcode.
352 : : */
353 : : static bool
889 tomas.vondra@postgre 354 :CBC 2459 : LZ4Stream_init(LZ4State *state, int size, bool compressing)
355 : : {
356 : : size_t status;
357 : :
358 [ + + ]: 2459 : if (state->inited)
898 359 : 2379 : return true;
360 : :
889 361 : 80 : state->compressing = compressing;
362 : :
363 : : /* When compressing, write LZ4 header to the output stream. */
364 [ + + ]: 80 : if (state->compressing)
365 : : {
366 : :
367 [ - + ]: 40 : if (!LZ4State_compression_init(state))
898 tomas.vondra@postgre 368 :UBC 0 : return false;
369 : :
8 dgustafsson@postgres 370 :CBC 40 : errno = 0;
889 tomas.vondra@postgre 371 [ - + ]: 40 : if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
372 : : {
926 tomas.vondra@postgre 373 [ # # ]:UBC 0 : errno = (errno) ? errno : ENOSPC;
898 374 : 0 : return false;
375 : : }
376 : : }
377 : : else
378 : : {
889 tomas.vondra@postgre 379 :CBC 40 : status = LZ4F_createDecompressionContext(&state->dtx, LZ4F_VERSION);
926 380 [ - + ]: 40 : if (LZ4F_isError(status))
381 : : {
889 tomas.vondra@postgre 382 :UBC 0 : state->errcode = status;
898 383 : 0 : return false;
384 : : }
385 : :
889 tomas.vondra@postgre 386 :CBC 40 : state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE);
387 : 40 : state->buffer = pg_malloc(state->buflen);
388 : :
389 : 40 : state->overflowalloclen = state->buflen;
390 : 40 : state->overflowbuf = pg_malloc(state->overflowalloclen);
391 : 40 : state->overflowlen = 0;
392 : : }
393 : :
8 dgustafsson@postgres 394 : 80 : state->inited = true;
898 tomas.vondra@postgre 395 : 80 : return true;
396 : : }
397 : :
398 : : /*
399 : : * Read already decompressed content from the overflow buffer into 'ptr' up to
400 : : * 'size' bytes, if available. If the eol_flag is set, then stop at the first
401 : : * occurrence of the newline char prior to 'size' bytes.
402 : : *
403 : : * Any unread content in the overflow buffer is moved to the beginning.
404 : : *
405 : : * Returns the number of bytes read from the overflow buffer (and copied into
406 : : * the 'ptr' buffer), or 0 if the overflow buffer is empty.
407 : : */
408 : : static int
889 409 : 82 : LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag)
410 : : {
411 : : char *p;
926 412 : 82 : int readlen = 0;
413 : :
889 414 [ + + ]: 82 : if (state->overflowlen == 0)
926 415 : 79 : return 0;
416 : :
889 417 [ + + ]: 3 : if (state->overflowlen >= size)
926 418 : 2 : readlen = size;
419 : : else
889 420 : 1 : readlen = state->overflowlen;
421 : :
422 [ - + - - ]: 3 : if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen)))
423 : : /* Include the line terminating char */
889 tomas.vondra@postgre 424 :UBC 0 : readlen = p - state->overflowbuf + 1;
425 : :
889 tomas.vondra@postgre 426 :CBC 3 : memcpy(ptr, state->overflowbuf, readlen);
427 : 3 : state->overflowlen -= readlen;
428 : :
429 [ + + ]: 3 : if (state->overflowlen > 0)
430 : 2 : memmove(state->overflowbuf, state->overflowbuf + readlen, state->overflowlen);
431 : :
926 432 : 3 : return readlen;
433 : : }
434 : :
435 : : /*
436 : : * The workhorse for reading decompressed content out of an LZ4 compressed
437 : : * stream.
438 : : *
439 : : * It will read up to 'ptrsize' decompressed content, or up to the new line
440 : : * char if found first when the eol_flag is set. It is possible that the
441 : : * decompressed output generated by reading any compressed input via the
442 : : * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
443 : : * at an overflow buffer within LZ4State. Of course, when the function is
444 : : * called, it will first try to consume any decompressed content already
445 : : * present in the overflow buffer, before decompressing new content.
446 : : *
447 : : * Returns the number of bytes of decompressed data copied into the ptr
448 : : * buffer, or -1 in case of error.
449 : : */
450 : : static int
889 451 : 82 : LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
452 : : {
898 453 : 82 : int dsize = 0;
454 : : int rsize;
455 : 82 : int size = ptrsize;
926 456 : 82 : bool eol_found = false;
457 : :
458 : : void *readbuf;
459 : :
460 : : /* Lazy init */
889 461 [ - + ]: 82 : if (!LZ4Stream_init(state, size, false /* decompressing */ ))
462 : : {
8 dgustafsson@postgres 463 :UBC 0 : pg_log_error("unable to initialize LZ4 library: %s",
464 : : LZ4F_getErrorName(state->errcode));
926 tomas.vondra@postgre 465 : 0 : return -1;
466 : : }
467 : :
468 : : /* No work needs to be done for a zero-sized output buffer */
843 tomas.vondra@postgre 469 [ - + ]:CBC 82 : if (size <= 0)
843 tomas.vondra@postgre 470 :UBC 0 : return 0;
471 : :
472 : : /* Verify that there is enough space in the outbuf */
889 tomas.vondra@postgre 473 [ - + ]:CBC 82 : if (size > state->buflen)
474 : : {
889 tomas.vondra@postgre 475 :UBC 0 : state->buflen = size;
476 : 0 : state->buffer = pg_realloc(state->buffer, size);
477 : : }
478 : :
479 : : /* use already decompressed content if available */
889 tomas.vondra@postgre 480 :CBC 82 : dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag);
926 481 [ + + - + : 82 : if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
- - ]
482 : 2 : return dsize;
483 : :
484 : 80 : readbuf = pg_malloc(size);
485 : :
486 : : do
487 : : {
488 : : char *rp;
489 : : char *rend;
490 : :
889 491 : 83 : rsize = fread(readbuf, 1, size, state->fp);
492 [ + + - + ]: 83 : if (rsize < size && !feof(state->fp))
493 : : {
8 dgustafsson@postgres 494 :UBC 0 : pg_log_error("could not read from input file: %m");
926 tomas.vondra@postgre 495 : 0 : return -1;
496 : : }
497 : :
926 tomas.vondra@postgre 498 :CBC 83 : rp = (char *) readbuf;
499 : 83 : rend = (char *) readbuf + rsize;
500 : :
501 [ + + ]: 128 : while (rp < rend)
502 : : {
503 : : size_t status;
889 504 : 45 : size_t outlen = state->buflen;
926 505 : 45 : size_t read_remain = rend - rp;
506 : :
889 507 : 45 : memset(state->buffer, 0, outlen);
508 : 45 : status = LZ4F_decompress(state->dtx, state->buffer, &outlen,
509 : : rp, &read_remain, NULL);
926 510 [ - + ]: 45 : if (LZ4F_isError(status))
511 : : {
889 tomas.vondra@postgre 512 :UBC 0 : state->errcode = status;
8 dgustafsson@postgres 513 : 0 : pg_log_error("could not read from input file: %s",
514 : : LZ4F_getErrorName(state->errcode));
926 tomas.vondra@postgre 515 : 0 : return -1;
516 : : }
517 : :
926 tomas.vondra@postgre 518 :CBC 45 : rp += read_remain;
519 : :
520 : : /*
521 : : * fill in what space is available in ptr if the eol flag is set,
522 : : * either skip if one already found or fill up to EOL if present
523 : : * in the outbuf
524 : : */
525 [ + + + + : 45 : if (outlen > 0 && dsize < size && eol_found == false)
+ - ]
526 : : {
527 : : char *p;
528 [ + - ]: 39 : size_t lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
529 : 39 : size_t len = outlen < lib ? outlen : lib;
530 : :
531 [ - + ]: 39 : if (eol_flag &&
889 tomas.vondra@postgre 532 [ # # ]:UBC 0 : (p = memchr(state->buffer, '\n', outlen)) &&
533 [ # # ]: 0 : (size_t) (p - state->buffer + 1) <= len)
534 : : {
535 : 0 : len = p - state->buffer + 1;
926 536 : 0 : eol_found = true;
537 : : }
538 : :
889 tomas.vondra@postgre 539 :CBC 39 : memcpy((char *) ptr + dsize, state->buffer, len);
926 540 : 39 : dsize += len;
541 : :
542 : : /* move what did not fit, if any, at the beginning of the buf */
543 [ - + ]: 39 : if (len < outlen)
889 tomas.vondra@postgre 544 :UBC 0 : memmove(state->buffer, state->buffer + len, outlen - len);
926 tomas.vondra@postgre 545 :CBC 39 : outlen -= len;
546 : : }
547 : :
548 : : /* if there is available output, save it */
549 [ + + ]: 45 : if (outlen > 0)
550 : : {
889 551 [ + + ]: 5 : while (state->overflowlen + outlen > state->overflowalloclen)
552 : : {
553 : 2 : state->overflowalloclen *= 2;
554 : 2 : state->overflowbuf = pg_realloc(state->overflowbuf,
555 : : state->overflowalloclen);
556 : : }
557 : :
558 : 3 : memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen);
559 : 3 : state->overflowlen += outlen;
560 : : }
561 : : }
898 562 [ + + + - : 83 : } while (rsize == size && dsize < size && eol_found == false);
+ - ]
563 : :
926 564 : 80 : pg_free(readbuf);
565 : :
898 566 : 80 : return dsize;
567 : : }
568 : :
569 : : /*
570 : : * Compress size bytes from ptr and write them to the stream.
571 : : */
572 : : static void
889 573 : 2377 : LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
574 : : {
575 : 2377 : LZ4State *state = (LZ4State *) CFH->private_data;
576 : : size_t status;
926 577 : 2377 : int remaining = size;
578 : :
579 : : /* Lazy init */
889 580 [ - + ]: 2377 : if (!LZ4Stream_init(state, size, true))
8 dgustafsson@postgres 581 :UBC 0 : pg_fatal("unable to initialize LZ4 library: %s",
582 : : LZ4F_getErrorName(state->errcode));
583 : :
926 tomas.vondra@postgre 584 [ + + ]:CBC 4760 : while (remaining > 0)
585 : : {
898 586 : 2383 : int chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE);
587 : :
926 588 : 2383 : remaining -= chunk;
589 : :
889 590 : 2383 : status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
591 : : ptr, chunk, NULL);
926 592 [ - + ]: 2383 : if (LZ4F_isError(status))
8 dgustafsson@postgres 593 :UBC 0 : pg_fatal("error during writing: %s", LZ4F_getErrorName(status));
594 : :
8 dgustafsson@postgres 595 :CBC 2383 : errno = 0;
889 tomas.vondra@postgre 596 [ - + ]: 2383 : if (fwrite(state->buffer, 1, status, state->fp) != status)
597 : : {
926 tomas.vondra@postgre 598 [ # # ]:UBC 0 : errno = (errno) ? errno : ENOSPC;
8 dgustafsson@postgres 599 : 0 : pg_fatal("error during writing: %m");
600 : : }
601 : :
843 tomas.vondra@postgre 602 :CBC 2383 : ptr = ((const char *) ptr) + chunk;
603 : : }
926 604 : 2377 : }
605 : :
606 : : /*
607 : : * fread() equivalent implementation for LZ4 compressed files.
608 : : */
609 : : static size_t
8 dgustafsson@postgres 610 : 82 : LZ4Stream_read(void *ptr, size_t size, CompressFileHandle *CFH)
611 : : {
889 tomas.vondra@postgre 612 : 82 : LZ4State *state = (LZ4State *) CFH->private_data;
613 : : int ret;
614 : :
615 [ - + ]: 82 : if ((ret = LZ4Stream_read_internal(state, ptr, size, false)) < 0)
889 tomas.vondra@postgre 616 :UBC 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
617 : :
8 dgustafsson@postgres 618 :CBC 82 : return (size_t) ret;
619 : : }
620 : :
621 : : /*
622 : : * fgetc() equivalent implementation for LZ4 compressed files.
623 : : */
624 : : static int
889 tomas.vondra@postgre 625 :UBC 0 : LZ4Stream_getc(CompressFileHandle *CFH)
626 : : {
627 : 0 : LZ4State *state = (LZ4State *) CFH->private_data;
628 : : unsigned char c;
629 : :
630 [ # # ]: 0 : if (LZ4Stream_read_internal(state, &c, 1, false) <= 0)
631 : : {
632 [ # # ]: 0 : if (!LZ4Stream_eof(CFH))
633 : 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
634 : : else
926 635 : 0 : pg_fatal("could not read from input file: end of file");
636 : : }
637 : :
638 : 0 : return c;
639 : : }
640 : :
641 : : /*
642 : : * fgets() equivalent implementation for LZ4 compressed files.
643 : : */
644 : : static char *
889 645 : 0 : LZ4Stream_gets(char *ptr, int size, CompressFileHandle *CFH)
646 : : {
647 : 0 : LZ4State *state = (LZ4State *) CFH->private_data;
648 : : int ret;
649 : :
843 650 : 0 : ret = LZ4Stream_read_internal(state, ptr, size - 1, true);
651 : :
652 : : /*
653 : : * LZ4Stream_read_internal returning 0 or -1 means that it was either an
654 : : * EOF or an error, but gets_func is defined to return NULL in either case
655 : : * so we can treat both the same here.
656 : : */
8 dgustafsson@postgres 657 [ # # ]: 0 : if (ret <= 0)
926 tomas.vondra@postgre 658 : 0 : return NULL;
659 : :
660 : : /*
661 : : * Our caller expects the return string to be NULL terminated and we know
662 : : * that ret is greater than zero.
663 : : */
843 664 : 0 : ptr[ret - 1] = '\0';
665 : :
926 666 : 0 : return ptr;
667 : : }
668 : :
669 : : /*
670 : : * Finalize (de)compression of a stream. When compressing it will write any
671 : : * remaining content and/or generated footer from the LZ4 API.
672 : : */
673 : : static bool
889 tomas.vondra@postgre 674 :CBC 81 : LZ4Stream_close(CompressFileHandle *CFH)
675 : : {
676 : : FILE *fp;
677 : 81 : LZ4State *state = (LZ4State *) CFH->private_data;
678 : : size_t status;
679 : : int ret;
680 : :
681 : 81 : fp = state->fp;
682 [ + + ]: 81 : if (state->inited)
683 : : {
684 [ + + ]: 80 : if (state->compressing)
685 : : {
686 : 40 : status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
926 687 [ - + ]: 40 : if (LZ4F_isError(status))
688 : : {
8 dgustafsson@postgres 689 :UBC 0 : pg_log_error("could not end compression: %s",
690 : : LZ4F_getErrorName(status));
691 : : }
692 : : else
693 : : {
8 dgustafsson@postgres 694 :CBC 40 : errno = 0;
695 [ - + ]: 40 : if (fwrite(state->buffer, 1, status, state->fp) != status)
696 : : {
8 dgustafsson@postgres 697 [ # # ]:UBC 0 : errno = (errno) ? errno : ENOSPC;
698 : 0 : pg_log_error("could not write to output file: %m");
699 : : }
700 : : }
701 : :
889 tomas.vondra@postgre 702 :CBC 40 : status = LZ4F_freeCompressionContext(state->ctx);
926 703 [ - + ]: 40 : if (LZ4F_isError(status))
8 dgustafsson@postgres 704 :UBC 0 : pg_log_error("could not end compression: %s",
705 : : LZ4F_getErrorName(status));
706 : : }
707 : : else
708 : : {
889 tomas.vondra@postgre 709 :CBC 40 : status = LZ4F_freeDecompressionContext(state->dtx);
926 710 [ - + ]: 40 : if (LZ4F_isError(status))
8 dgustafsson@postgres 711 :UBC 0 : pg_log_error("could not end decompression: %s",
712 : : LZ4F_getErrorName(status));
889 tomas.vondra@postgre 713 :CBC 40 : pg_free(state->overflowbuf);
714 : : }
715 : :
716 : 80 : pg_free(state->buffer);
717 : : }
718 : :
719 : 81 : pg_free(state);
8 dgustafsson@postgres 720 : 81 : CFH->private_data = NULL;
721 : :
722 : 81 : errno = 0;
723 : 81 : ret = fclose(fp);
724 [ - + ]: 81 : if (ret != 0)
725 : : {
8 dgustafsson@postgres 726 :UBC 0 : pg_log_error("could not close file: %m");
727 : 0 : return false;
728 : : }
729 : :
8 dgustafsson@postgres 730 :CBC 81 : return true;
731 : : }
732 : :
733 : : static bool
889 tomas.vondra@postgre 734 : 81 : LZ4Stream_open(const char *path, int fd, const char *mode,
735 : : CompressFileHandle *CFH)
736 : : {
737 : 81 : LZ4State *state = (LZ4State *) CFH->private_data;
738 : :
926 739 [ - + ]: 81 : if (fd >= 0)
8 dgustafsson@postgres 740 :UBC 0 : state->fp = fdopen(dup(fd), mode);
741 : : else
8 dgustafsson@postgres 742 :CBC 81 : state->fp = fopen(path, mode);
743 [ - + ]: 81 : if (state->fp == NULL)
744 : : {
889 tomas.vondra@postgre 745 :UBC 0 : state->errcode = errno;
898 746 : 0 : return false;
747 : : }
748 : :
898 tomas.vondra@postgre 749 :CBC 81 : return true;
750 : : }
751 : :
752 : : static bool
889 753 : 40 : LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
754 : : {
755 : : char *fname;
756 : : int save_errno;
757 : : bool ret;
758 : :
926 759 : 40 : fname = psprintf("%s.lz4", path);
760 : 40 : ret = CFH->open_func(fname, -1, mode, CFH);
761 : :
898 762 : 40 : save_errno = errno;
926 763 : 40 : pg_free(fname);
898 764 : 40 : errno = save_errno;
765 : :
926 766 : 40 : return ret;
767 : : }
768 : :
769 : : /*
770 : : * Public routines
771 : : */
772 : : void
773 : 81 : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
774 : : const pg_compress_specification compression_spec)
775 : : {
776 : : LZ4State *state;
777 : :
889 778 : 81 : CFH->open_func = LZ4Stream_open;
779 : 81 : CFH->open_write_func = LZ4Stream_open_write;
780 : 81 : CFH->read_func = LZ4Stream_read;
781 : 81 : CFH->write_func = LZ4Stream_write;
782 : 81 : CFH->gets_func = LZ4Stream_gets;
783 : 81 : CFH->getc_func = LZ4Stream_getc;
784 : 81 : CFH->eof_func = LZ4Stream_eof;
785 : 81 : CFH->close_func = LZ4Stream_close;
786 : 81 : CFH->get_error_func = LZ4Stream_get_error;
787 : :
926 788 : 81 : CFH->compression_spec = compression_spec;
889 789 : 81 : state = pg_malloc0(sizeof(*state));
926 790 [ + - ]: 81 : if (CFH->compression_spec.level >= 0)
889 791 : 81 : state->prefs.compressionLevel = CFH->compression_spec.level;
792 : :
793 : 81 : CFH->private_data = state;
926 794 : 81 : }
795 : : #else /* USE_LZ4 */
796 : : void
797 : : InitCompressorLZ4(CompressorState *cs,
798 : : const pg_compress_specification compression_spec)
799 : : {
800 : : pg_fatal("this build does not support compression with %s", "LZ4");
801 : : }
802 : :
803 : : void
804 : : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
805 : : const pg_compress_specification compression_spec)
806 : : {
807 : : pg_fatal("this build does not support compression with %s", "LZ4");
808 : : }
809 : : #endif /* USE_LZ4 */
|