Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * astreamer_gzip.c
4 : : *
5 : : * Archive streamers that deal with data compressed using gzip.
6 : : * astreamer_gzip_writer applies gzip compression to the input data
7 : : * and writes the result to a file. astreamer_gzip_decompressor assumes
8 : : * that the input stream is compressed using gzip and decompresses it.
9 : : *
10 : : * Note that the code in this file is asymmetric with what we do for
11 : : * other compression types: for lz4 and zstd, there is a compressor and
12 : : * a decompressor, rather than a writer and a decompressor. The approach
13 : : * taken here is less flexible, because a writer can only write to a file,
14 : : * while a compressor can write to a subsequent astreamer which is free
15 : : * to do whatever it likes. The reason it's like this is because this
16 : : * code was adapted from old, less-modular pg_basebackup code that used
17 : : * the same APIs that astreamer_gzip_writer now uses, and it didn't seem
18 : : * necessary to change anything at the time.
19 : : *
20 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
21 : : *
22 : : * IDENTIFICATION
23 : : * src/fe_utils/astreamer_gzip.c
24 : : *-------------------------------------------------------------------------
25 : : */
26 : :
27 : : #include "postgres_fe.h"
28 : :
29 : : #include <unistd.h>
30 : :
31 : : #ifdef HAVE_LIBZ
32 : : #include <zlib.h>
33 : : #endif
34 : :
35 : : #include "common/logging.h"
36 : : #include "fe_utils/astreamer.h"
37 : :
38 : : #ifdef HAVE_LIBZ
39 : : typedef struct astreamer_gzip_writer
40 : : {
41 : : astreamer base;
42 : : char *pathname;
43 : : gzFile gzfile;
44 : : } astreamer_gzip_writer;
45 : :
46 : : typedef struct astreamer_gzip_decompressor
47 : : {
48 : : astreamer base;
49 : : z_stream zstream;
50 : : size_t bytes_written;
51 : : } astreamer_gzip_decompressor;
52 : :
53 : : static void astreamer_gzip_writer_content(astreamer *streamer,
54 : : astreamer_member *member,
55 : : const char *data, int len,
56 : : astreamer_archive_context context);
57 : : static void astreamer_gzip_writer_finalize(astreamer *streamer);
58 : : static void astreamer_gzip_writer_free(astreamer *streamer);
59 : : static const char *get_gz_error(gzFile gzf);
60 : :
61 : : static const astreamer_ops astreamer_gzip_writer_ops = {
62 : : .content = astreamer_gzip_writer_content,
63 : : .finalize = astreamer_gzip_writer_finalize,
64 : : .free = astreamer_gzip_writer_free
65 : : };
66 : :
67 : : static void astreamer_gzip_decompressor_content(astreamer *streamer,
68 : : astreamer_member *member,
69 : : const char *data, int len,
70 : : astreamer_archive_context context);
71 : : static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
72 : : static void astreamer_gzip_decompressor_free(astreamer *streamer);
73 : : static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
74 : : static void gzip_pfree(void *opaque, void *address);
75 : :
76 : : static const astreamer_ops astreamer_gzip_decompressor_ops = {
77 : : .content = astreamer_gzip_decompressor_content,
78 : : .finalize = astreamer_gzip_decompressor_finalize,
79 : : .free = astreamer_gzip_decompressor_free
80 : : };
81 : : #endif
82 : :
83 : : /*
84 : : * Create a astreamer that just compresses data using gzip, and then writes
85 : : * it to a file.
86 : : *
87 : : * The caller must specify a pathname and may specify a file. The pathname is
88 : : * used for error-reporting purposes either way. If file is NULL, the pathname
89 : : * also identifies the file to which the data should be written: it is opened
90 : : * for writing and closed when done. If file is not NULL, the data is written
91 : : * there.
92 : : *
93 : : * Note that zlib does not use the FILE interface, but operates directly on
94 : : * a duplicate of the underlying fd. Hence, callers must take care if they
95 : : * plan to write any other data to the same FILE, either before or after using
96 : : * this.
97 : : */
98 : : astreamer *
638 rhaas@postgresql.org 99 :CBC 4 : astreamer_gzip_writer_new(char *pathname, FILE *file,
100 : : pg_compress_specification *compress)
101 : : {
102 : : #ifdef HAVE_LIBZ
103 : : astreamer_gzip_writer *streamer;
104 : :
147 michael@paquier.xyz 105 :GNC 4 : streamer = palloc0_object(astreamer_gzip_writer);
638 rhaas@postgresql.org 106 :CBC 4 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
107 : : &astreamer_gzip_writer_ops;
108 : :
1558 109 : 4 : streamer->pathname = pstrdup(pathname);
110 : :
111 [ + - ]: 4 : if (file == NULL)
112 : : {
113 : 4 : streamer->gzfile = gzopen(pathname, "wb");
114 [ - + ]: 4 : if (streamer->gzfile == NULL)
1488 tgl@sss.pgh.pa.us 115 :UBC 0 : pg_fatal("could not create compressed file \"%s\": %m",
116 : : pathname);
117 : : }
118 : : else
119 : : {
120 : : /*
121 : : * We must dup the file handle so that gzclose doesn't break the
122 : : * caller's FILE. See comment for astreamer_gzip_writer_finalize.
123 : : */
1558 rhaas@postgresql.org 124 : 0 : int fd = dup(fileno(file));
125 : :
126 [ # # ]: 0 : if (fd < 0)
1488 tgl@sss.pgh.pa.us 127 : 0 : pg_fatal("could not duplicate stdout: %m");
128 : :
1558 rhaas@postgresql.org 129 : 0 : streamer->gzfile = gzdopen(fd, "wb");
130 [ # # ]: 0 : if (streamer->gzfile == NULL)
1488 tgl@sss.pgh.pa.us 131 : 0 : pg_fatal("could not open output file: %m");
132 : : }
133 : :
1329 michael@paquier.xyz 134 [ - + ]:CBC 4 : if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
1488 tgl@sss.pgh.pa.us 135 :UBC 0 : pg_fatal("could not set compression level %d: %s",
136 : : compress->level, get_gz_error(streamer->gzfile));
137 : :
1558 rhaas@postgresql.org 138 :CBC 4 : return &streamer->base;
139 : : #else
140 : : pg_fatal("this build does not support compression with %s", "gzip");
141 : : return NULL; /* keep compiler quiet */
142 : : #endif
143 : : }
144 : :
145 : : #ifdef HAVE_LIBZ
146 : : /*
147 : : * Write archive content to gzip file.
148 : : */
149 : : static void
638 150 : 9815 : astreamer_gzip_writer_content(astreamer *streamer,
151 : : astreamer_member *member, const char *data,
152 : : int len, astreamer_archive_context context)
153 : : {
154 : : astreamer_gzip_writer *mystreamer;
155 : :
156 : 9815 : mystreamer = (astreamer_gzip_writer *) streamer;
157 : :
1558 158 [ - + ]: 9815 : if (len == 0)
1558 rhaas@postgresql.org 159 :UBC 0 : return;
160 : :
1558 rhaas@postgresql.org 161 :CBC 9815 : errno = 0;
162 [ - + ]: 9815 : if (gzwrite(mystreamer->gzfile, data, len) != len)
163 : : {
164 : : /* if write didn't set errno, assume problem is no disk space */
1558 rhaas@postgresql.org 165 [ # # ]:UBC 0 : if (errno == 0)
166 : 0 : errno = ENOSPC;
1488 tgl@sss.pgh.pa.us 167 : 0 : pg_fatal("could not write to compressed file \"%s\": %s",
168 : : mystreamer->pathname, get_gz_error(mystreamer->gzfile));
169 : : }
170 : : }
171 : :
172 : : /*
173 : : * End-of-archive processing when writing to a gzip file consists of just
174 : : * calling gzclose.
175 : : *
176 : : * It makes no difference whether we opened the file or the caller did it,
177 : : * because libz provides no way of avoiding a close on the underlying file
178 : : * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
179 : : * work around this issue, so that the behavior from the caller's viewpoint
180 : : * is the same as for astreamer_plain_writer.
181 : : */
182 : : static void
638 rhaas@postgresql.org 183 :CBC 4 : astreamer_gzip_writer_finalize(astreamer *streamer)
184 : : {
185 : : astreamer_gzip_writer *mystreamer;
186 : :
187 : 4 : mystreamer = (astreamer_gzip_writer *) streamer;
188 : :
1558 189 : 4 : errno = 0; /* in case gzclose() doesn't set it */
190 [ - + ]: 4 : if (gzclose(mystreamer->gzfile) != 0)
1488 tgl@sss.pgh.pa.us 191 :UBC 0 : pg_fatal("could not close compressed file \"%s\": %m",
192 : : mystreamer->pathname);
193 : :
1558 rhaas@postgresql.org 194 :CBC 4 : mystreamer->gzfile = NULL;
195 : 4 : }
196 : :
197 : : /*
198 : : * Free memory associated with this astreamer.
199 : : */
200 : : static void
638 201 : 4 : astreamer_gzip_writer_free(astreamer *streamer)
202 : : {
203 : : astreamer_gzip_writer *mystreamer;
204 : :
205 : 4 : mystreamer = (astreamer_gzip_writer *) streamer;
206 : :
1558 207 [ - + ]: 4 : Assert(mystreamer->base.bbs_next == NULL);
208 [ - + ]: 4 : Assert(mystreamer->gzfile == NULL);
209 : :
210 : 4 : pfree(mystreamer->pathname);
211 : 4 : pfree(mystreamer);
212 : 4 : }
213 : :
214 : : /*
215 : : * Helper function for libz error reporting.
216 : : */
217 : : static const char *
1558 rhaas@postgresql.org 218 :UBC 0 : get_gz_error(gzFile gzf)
219 : : {
220 : : int errnum;
221 : : const char *errmsg;
222 : :
223 : 0 : errmsg = gzerror(gzf, &errnum);
224 [ # # ]: 0 : if (errnum == Z_ERRNO)
225 : 0 : return strerror(errno);
226 : : else
227 : 0 : return errmsg;
228 : : }
229 : : #endif
230 : :
231 : : /*
232 : : * Create a new base backup streamer that performs decompression of gzip
233 : : * compressed blocks.
234 : : */
235 : : astreamer *
638 rhaas@postgresql.org 236 :CBC 21 : astreamer_gzip_decompressor_new(astreamer *next)
237 : : {
238 : : #ifdef HAVE_LIBZ
239 : : astreamer_gzip_decompressor *streamer;
240 : : z_stream *zs;
241 : :
1558 242 [ - + ]: 21 : Assert(next != NULL);
243 : :
147 michael@paquier.xyz 244 :GNC 21 : streamer = palloc0_object(astreamer_gzip_decompressor);
638 rhaas@postgresql.org 245 :CBC 21 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
246 : : &astreamer_gzip_decompressor_ops;
247 : :
1558 248 : 21 : streamer->base.bbs_next = next;
249 : 21 : initStringInfo(&streamer->base.bbs_buffer);
250 : : /* Use a buffer size comparable to the other decompressors */
42 tgl@sss.pgh.pa.us 251 :GNC 21 : enlargeStringInfo(&streamer->base.bbs_buffer, 256 * 1024 - 1);
252 : :
253 : : /* Initialize internal stream state for decompression */
1558 rhaas@postgresql.org 254 :CBC 21 : zs = &streamer->zstream;
255 : 21 : zs->zalloc = gzip_palloc;
256 : 21 : zs->zfree = gzip_pfree;
257 : 21 : zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
258 : 21 : zs->avail_out = streamer->base.bbs_buffer.maxlen;
259 : :
260 : : /*
261 : : * Data compression was initialized using deflateInit2 to request a gzip
262 : : * header. Similarly, we are using inflateInit2 to initialize data
263 : : * decompression.
264 : : *
265 : : * Per the documentation for inflateInit2, the second argument is
266 : : * "windowBits" and its value must be greater than or equal to the value
267 : : * provided while compressing the data, so we are using the maximum
268 : : * possible value for safety.
269 : : */
270 [ - + ]: 21 : if (inflateInit2(zs, 15 + 16) != Z_OK)
1488 tgl@sss.pgh.pa.us 271 :UBC 0 : pg_fatal("could not initialize compression library");
272 : :
1558 rhaas@postgresql.org 273 :CBC 21 : return &streamer->base;
274 : : #else
275 : : pg_fatal("this build does not support compression with %s", "gzip");
276 : : return NULL; /* keep compiler quiet */
277 : : #endif
278 : : }
279 : :
280 : : #ifdef HAVE_LIBZ
281 : : /*
282 : : * Decompress the input data to output buffer until we run out of input
283 : : * data. Each time the output buffer is full, pass on the decompressed data
284 : : * to the next streamer.
285 : : */
286 : : static void
638 287 : 594 : astreamer_gzip_decompressor_content(astreamer *streamer,
288 : : astreamer_member *member,
289 : : const char *data, int len,
290 : : astreamer_archive_context context)
291 : : {
292 : : astreamer_gzip_decompressor *mystreamer;
293 : : z_stream *zs;
294 : :
295 : 594 : mystreamer = (astreamer_gzip_decompressor *) streamer;
296 : :
1558 297 : 594 : zs = &mystreamer->zstream;
1002 peter@eisentraut.org 298 : 594 : zs->next_in = (const uint8 *) data;
1558 rhaas@postgresql.org 299 : 594 : zs->avail_in = len;
300 : :
301 : : /* Process the current chunk */
302 [ + + ]: 5377 : while (zs->avail_in > 0)
303 : : {
304 : : int res;
305 : :
306 [ - + ]: 4189 : Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
307 : :
308 : 4189 : zs->next_out = (uint8 *)
309 : 4189 : mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
310 : 4189 : zs->avail_out =
311 : 4189 : mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
312 : :
313 : : /*
314 : : * This call decompresses data starting at zs->next_in and updates
315 : : * zs->next_in * and zs->avail_in. It generates output data starting
316 : : * at zs->next_out and updates zs->next_out and zs->avail_out
317 : : * accordingly.
318 : : */
319 : 4189 : res = inflate(zs, Z_NO_FLUSH);
320 : :
43 andrew@dunslane.net 321 [ + + - + : 4189 : if (res != Z_OK && res != Z_STREAM_END && res != Z_BUF_ERROR)
- - ]
43 andrew@dunslane.net 322 [ # # ]:UBC 0 : pg_fatal("could not decompress data: %s",
323 : : zs->msg ? zs->msg : "unknown error");
324 : :
1558 rhaas@postgresql.org 325 :CBC 4189 : mystreamer->bytes_written =
326 : 4189 : mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
327 : :
328 : : /* If output buffer is full then pass data to next streamer */
329 [ + + ]: 4189 : if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
330 : : {
638 331 : 3595 : astreamer_content(mystreamer->base.bbs_next, member,
332 : 3595 : mystreamer->base.bbs_buffer.data,
333 : : mystreamer->base.bbs_buffer.maxlen, context);
1558 334 : 3595 : mystreamer->bytes_written = 0;
335 : : }
336 : : }
337 : 594 : }
338 : :
339 : : /*
340 : : * End-of-stream processing.
341 : : */
342 : : static void
638 343 : 4 : astreamer_gzip_decompressor_finalize(astreamer *streamer)
344 : : {
345 : : astreamer_gzip_decompressor *mystreamer;
346 : :
347 : 4 : mystreamer = (astreamer_gzip_decompressor *) streamer;
348 : :
349 : : /*
350 : : * End of the stream, if there is some pending data in output buffers then
351 : : * we must forward it to next streamer.
352 : : */
44 tgl@sss.pgh.pa.us 353 [ + - ]: 4 : if (mystreamer->bytes_written > 0)
354 : 4 : astreamer_content(mystreamer->base.bbs_next, NULL,
355 : 4 : mystreamer->base.bbs_buffer.data,
356 : 4 : mystreamer->bytes_written,
357 : : ASTREAMER_UNKNOWN);
358 : :
638 rhaas@postgresql.org 359 : 4 : astreamer_finalize(mystreamer->base.bbs_next);
1558 360 : 4 : }
361 : :
362 : : /*
363 : : * Free memory.
364 : : */
365 : : static void
638 366 : 19 : astreamer_gzip_decompressor_free(astreamer *streamer)
367 : : {
368 : : astreamer_gzip_decompressor *mystreamer;
369 : :
43 andrew@dunslane.net 370 : 19 : mystreamer = (astreamer_gzip_decompressor *) streamer;
371 : :
638 rhaas@postgresql.org 372 : 19 : astreamer_free(streamer->bbs_next);
43 andrew@dunslane.net 373 : 19 : inflateEnd(&mystreamer->zstream);
1558 rhaas@postgresql.org 374 : 19 : pfree(streamer->bbs_buffer.data);
375 : 19 : pfree(streamer);
376 : 19 : }
377 : :
/*
 * Wrapper function to adjust the signature of palloc to match what libz
 * expects: allocate room for "items" objects of "size" bytes each.
 *
 * "opaque" is unused.  The product is computed in size_t rather than in
 * unsigned so that a large request cannot wrap around in the narrower type
 * and yield a too-small allocation.
 */
static void *
gzip_palloc(void *opaque, unsigned items, unsigned size)
{
	return palloc((size_t) items * (size_t) size);
}
387 : :
/*
 * Adapter giving pfree the signature libz expects of its "zfree" hook.
 * "opaque" is unused; "address" is the chunk to release.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	pfree(address);
}
397 : : #endif
|