Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * astreamer_gzip.c
4 : : *
5 : : * Archive streamers that deal with data compressed using gzip.
6 : : * astreamer_gzip_writer applies gzip compression to the input data
7 : : * and writes the result to a file. astreamer_gzip_decompressor assumes
8 : : * that the input stream is compressed using gzip and decompresses it.
9 : : *
10 : : * Note that the code in this file is asymmetric with what we do for
11 : : * other compression types: for lz4 and zstd, there is a compressor and
12 : : * a decompressor, rather than a writer and a decompressor. The approach
13 : : * taken here is less flexible, because a writer can only write to a file,
14 : : * while a compressor can write to a subsequent astreamer which is free
15 : : * to do whatever it likes. The reason it's like this is because this
16 : : * code was adapted from old, less-modular pg_basebackup code that used
17 : : * the same APIs that astreamer_gzip_writer now uses, and it didn't seem
18 : : * necessary to change anything at the time.
19 : : *
20 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
21 : : *
22 : : * IDENTIFICATION
23 : : * src/fe_utils/astreamer_gzip.c
24 : : *-------------------------------------------------------------------------
25 : : */
26 : :
27 : : #include "postgres_fe.h"
28 : :
29 : : #include <unistd.h>
30 : :
31 : : #ifdef HAVE_LIBZ
32 : : #include <zlib.h>
33 : : #endif
34 : :
35 : : #include "common/logging.h"
36 : : #include "fe_utils/astreamer.h"
37 : :
38 : : #ifdef HAVE_LIBZ
39 : : typedef struct astreamer_gzip_writer
40 : : {
41 : : astreamer base;
42 : : char *pathname;
43 : : gzFile gzfile;
44 : : } astreamer_gzip_writer;
45 : :
46 : : typedef struct astreamer_gzip_decompressor
47 : : {
48 : : astreamer base;
49 : : z_stream zstream;
50 : : size_t bytes_written;
51 : : } astreamer_gzip_decompressor;
52 : :
53 : : static void astreamer_gzip_writer_content(astreamer *streamer,
54 : : astreamer_member *member,
55 : : const char *data, int len,
56 : : astreamer_archive_context context);
57 : : static void astreamer_gzip_writer_finalize(astreamer *streamer);
58 : : static void astreamer_gzip_writer_free(astreamer *streamer);
59 : : static const char *get_gz_error(gzFile gzf);
60 : :
61 : : static const astreamer_ops astreamer_gzip_writer_ops = {
62 : : .content = astreamer_gzip_writer_content,
63 : : .finalize = astreamer_gzip_writer_finalize,
64 : : .free = astreamer_gzip_writer_free
65 : : };
66 : :
67 : : static void astreamer_gzip_decompressor_content(astreamer *streamer,
68 : : astreamer_member *member,
69 : : const char *data, int len,
70 : : astreamer_archive_context context);
71 : : static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
72 : : static void astreamer_gzip_decompressor_free(astreamer *streamer);
73 : : static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
74 : : static void gzip_pfree(void *opaque, void *address);
75 : :
76 : : static const astreamer_ops astreamer_gzip_decompressor_ops = {
77 : : .content = astreamer_gzip_decompressor_content,
78 : : .finalize = astreamer_gzip_decompressor_finalize,
79 : : .free = astreamer_gzip_decompressor_free
80 : : };
81 : : #endif
82 : :
83 : : /*
84 : : * Create a astreamer that just compresses data using gzip, and then writes
85 : : * it to a file.
86 : : *
87 : : * The caller must specify a pathname and may specify a file. The pathname is
88 : : * used for error-reporting purposes either way. If file is NULL, the pathname
89 : : * also identifies the file to which the data should be written: it is opened
90 : : * for writing and closed when done. If file is not NULL, the data is written
91 : : * there.
92 : : *
93 : : * Note that zlib does not use the FILE interface, but operates directly on
94 : : * a duplicate of the underlying fd. Hence, callers must take care if they
95 : : * plan to write any other data to the same FILE, either before or after using
96 : : * this.
97 : : */
98 : : astreamer *
397 rhaas@postgresql.org 99 :CBC 4 : astreamer_gzip_writer_new(char *pathname, FILE *file,
100 : : pg_compress_specification *compress)
101 : : {
102 : : #ifdef HAVE_LIBZ
103 : : astreamer_gzip_writer *streamer;
104 : :
105 : 4 : streamer = palloc0(sizeof(astreamer_gzip_writer));
106 : 4 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
107 : : &astreamer_gzip_writer_ops;
108 : :
1317 109 : 4 : streamer->pathname = pstrdup(pathname);
110 : :
111 [ + - ]: 4 : if (file == NULL)
112 : : {
113 : 4 : streamer->gzfile = gzopen(pathname, "wb");
114 [ - + ]: 4 : if (streamer->gzfile == NULL)
1247 tgl@sss.pgh.pa.us 115 :UBC 0 : pg_fatal("could not create compressed file \"%s\": %m",
116 : : pathname);
117 : : }
118 : : else
119 : : {
120 : : /*
121 : : * We must dup the file handle so that gzclose doesn't break the
122 : : * caller's FILE. See comment for astreamer_gzip_writer_finalize.
123 : : */
1317 rhaas@postgresql.org 124 : 0 : int fd = dup(fileno(file));
125 : :
126 [ # # ]: 0 : if (fd < 0)
1247 tgl@sss.pgh.pa.us 127 : 0 : pg_fatal("could not duplicate stdout: %m");
128 : :
1317 rhaas@postgresql.org 129 : 0 : streamer->gzfile = gzdopen(fd, "wb");
130 [ # # ]: 0 : if (streamer->gzfile == NULL)
1247 tgl@sss.pgh.pa.us 131 : 0 : pg_fatal("could not open output file: %m");
132 : : }
133 : :
1088 michael@paquier.xyz 134 [ - + ]:CBC 4 : if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
1247 tgl@sss.pgh.pa.us 135 :UBC 0 : pg_fatal("could not set compression level %d: %s",
136 : : compress->level, get_gz_error(streamer->gzfile));
137 : :
1317 rhaas@postgresql.org 138 :CBC 4 : return &streamer->base;
139 : : #else
140 : : pg_fatal("this build does not support compression with %s", "gzip");
141 : : return NULL; /* keep compiler quiet */
142 : : #endif
143 : : }
144 : :
145 : : #ifdef HAVE_LIBZ
146 : : /*
147 : : * Write archive content to gzip file.
148 : : */
149 : : static void
397 150 : 9371 : astreamer_gzip_writer_content(astreamer *streamer,
151 : : astreamer_member *member, const char *data,
152 : : int len, astreamer_archive_context context)
153 : : {
154 : : astreamer_gzip_writer *mystreamer;
155 : :
156 : 9371 : mystreamer = (astreamer_gzip_writer *) streamer;
157 : :
1317 158 [ - + ]: 9371 : if (len == 0)
1317 rhaas@postgresql.org 159 :UBC 0 : return;
160 : :
1317 rhaas@postgresql.org 161 :CBC 9371 : errno = 0;
162 [ - + ]: 9371 : if (gzwrite(mystreamer->gzfile, data, len) != len)
163 : : {
164 : : /* if write didn't set errno, assume problem is no disk space */
1317 rhaas@postgresql.org 165 [ # # ]:UBC 0 : if (errno == 0)
166 : 0 : errno = ENOSPC;
1247 tgl@sss.pgh.pa.us 167 : 0 : pg_fatal("could not write to compressed file \"%s\": %s",
168 : : mystreamer->pathname, get_gz_error(mystreamer->gzfile));
169 : : }
170 : : }
171 : :
172 : : /*
173 : : * End-of-archive processing when writing to a gzip file consists of just
174 : : * calling gzclose.
175 : : *
176 : : * It makes no difference whether we opened the file or the caller did it,
177 : : * because libz provides no way of avoiding a close on the underlying file
178 : : * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
179 : : * work around this issue, so that the behavior from the caller's viewpoint
180 : : * is the same as for astreamer_plain_writer.
181 : : */
182 : : static void
397 rhaas@postgresql.org 183 :CBC 4 : astreamer_gzip_writer_finalize(astreamer *streamer)
184 : : {
185 : : astreamer_gzip_writer *mystreamer;
186 : :
187 : 4 : mystreamer = (astreamer_gzip_writer *) streamer;
188 : :
1317 189 : 4 : errno = 0; /* in case gzclose() doesn't set it */
190 [ - + ]: 4 : if (gzclose(mystreamer->gzfile) != 0)
1247 tgl@sss.pgh.pa.us 191 :UBC 0 : pg_fatal("could not close compressed file \"%s\": %m",
192 : : mystreamer->pathname);
193 : :
1317 rhaas@postgresql.org 194 :CBC 4 : mystreamer->gzfile = NULL;
195 : 4 : }
196 : :
197 : : /*
198 : : * Free memory associated with this astreamer.
199 : : */
200 : : static void
397 201 : 4 : astreamer_gzip_writer_free(astreamer *streamer)
202 : : {
203 : : astreamer_gzip_writer *mystreamer;
204 : :
205 : 4 : mystreamer = (astreamer_gzip_writer *) streamer;
206 : :
1317 207 [ - + ]: 4 : Assert(mystreamer->base.bbs_next == NULL);
208 [ - + ]: 4 : Assert(mystreamer->gzfile == NULL);
209 : :
210 : 4 : pfree(mystreamer->pathname);
211 : 4 : pfree(mystreamer);
212 : 4 : }
213 : :
214 : : /*
215 : : * Helper function for libz error reporting.
216 : : */
217 : : static const char *
1317 rhaas@postgresql.org 218 :UBC 0 : get_gz_error(gzFile gzf)
219 : : {
220 : : int errnum;
221 : : const char *errmsg;
222 : :
223 : 0 : errmsg = gzerror(gzf, &errnum);
224 [ # # ]: 0 : if (errnum == Z_ERRNO)
225 : 0 : return strerror(errno);
226 : : else
227 : 0 : return errmsg;
228 : : }
229 : : #endif
230 : :
231 : : /*
232 : : * Create a new base backup streamer that performs decompression of gzip
233 : : * compressed blocks.
234 : : */
235 : : astreamer *
397 rhaas@postgresql.org 236 :CBC 4 : astreamer_gzip_decompressor_new(astreamer *next)
237 : : {
238 : : #ifdef HAVE_LIBZ
239 : : astreamer_gzip_decompressor *streamer;
240 : : z_stream *zs;
241 : :
1317 242 [ - + ]: 4 : Assert(next != NULL);
243 : :
397 244 : 4 : streamer = palloc0(sizeof(astreamer_gzip_decompressor));
245 : 4 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
246 : : &astreamer_gzip_decompressor_ops;
247 : :
1317 248 : 4 : streamer->base.bbs_next = next;
249 : 4 : initStringInfo(&streamer->base.bbs_buffer);
250 : :
251 : : /* Initialize internal stream state for decompression */
252 : 4 : zs = &streamer->zstream;
253 : 4 : zs->zalloc = gzip_palloc;
254 : 4 : zs->zfree = gzip_pfree;
255 : 4 : zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
256 : 4 : zs->avail_out = streamer->base.bbs_buffer.maxlen;
257 : :
258 : : /*
259 : : * Data compression was initialized using deflateInit2 to request a gzip
260 : : * header. Similarly, we are using inflateInit2 to initialize data
261 : : * decompression.
262 : : *
263 : : * Per the documentation for inflateInit2, the second argument is
264 : : * "windowBits" and its value must be greater than or equal to the value
265 : : * provided while compressing the data, so we are using the maximum
266 : : * possible value for safety.
267 : : */
268 [ - + ]: 4 : if (inflateInit2(zs, 15 + 16) != Z_OK)
1247 tgl@sss.pgh.pa.us 269 :UBC 0 : pg_fatal("could not initialize compression library");
270 : :
1317 rhaas@postgresql.org 271 :CBC 4 : return &streamer->base;
272 : : #else
273 : : pg_fatal("this build does not support compression with %s", "gzip");
274 : : return NULL; /* keep compiler quiet */
275 : : #endif
276 : : }
277 : :
278 : : #ifdef HAVE_LIBZ
279 : : /*
280 : : * Decompress the input data to output buffer until we run out of input
281 : : * data. Each time the output buffer is full, pass on the decompressed data
282 : : * to the next streamer.
283 : : */
284 : : static void
397 285 : 158 : astreamer_gzip_decompressor_content(astreamer *streamer,
286 : : astreamer_member *member,
287 : : const char *data, int len,
288 : : astreamer_archive_context context)
289 : : {
290 : : astreamer_gzip_decompressor *mystreamer;
291 : : z_stream *zs;
292 : :
293 : 158 : mystreamer = (astreamer_gzip_decompressor *) streamer;
294 : :
1317 295 : 158 : zs = &mystreamer->zstream;
761 peter@eisentraut.org 296 : 158 : zs->next_in = (const uint8 *) data;
1317 rhaas@postgresql.org 297 : 158 : zs->avail_in = len;
298 : :
299 : : /* Process the current chunk */
300 [ + + ]: 123382 : while (zs->avail_in > 0)
301 : : {
302 : : int res;
303 : :
304 [ - + ]: 123066 : Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
305 : :
306 : 123066 : zs->next_out = (uint8 *)
307 : 123066 : mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
308 : 123066 : zs->avail_out =
309 : 123066 : mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
310 : :
311 : : /*
312 : : * This call decompresses data starting at zs->next_in and updates
313 : : * zs->next_in * and zs->avail_in. It generates output data starting
314 : : * at zs->next_out and updates zs->next_out and zs->avail_out
315 : : * accordingly.
316 : : */
317 : 123066 : res = inflate(zs, Z_NO_FLUSH);
318 : :
319 [ - + ]: 123066 : if (res == Z_STREAM_ERROR)
1317 rhaas@postgresql.org 320 :UBC 0 : pg_log_error("could not decompress data: %s", zs->msg);
321 : :
1317 rhaas@postgresql.org 322 :CBC 123066 : mystreamer->bytes_written =
323 : 123066 : mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
324 : :
325 : : /* If output buffer is full then pass data to next streamer */
326 [ + + ]: 123066 : if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
327 : : {
397 328 : 122912 : astreamer_content(mystreamer->base.bbs_next, member,
329 : 122912 : mystreamer->base.bbs_buffer.data,
330 : : mystreamer->base.bbs_buffer.maxlen, context);
1317 331 : 122912 : mystreamer->bytes_written = 0;
332 : : }
333 : : }
334 : 158 : }
335 : :
336 : : /*
337 : : * End-of-stream processing.
338 : : */
339 : : static void
397 340 : 4 : astreamer_gzip_decompressor_finalize(astreamer *streamer)
341 : : {
342 : : astreamer_gzip_decompressor *mystreamer;
343 : :
344 : 4 : mystreamer = (astreamer_gzip_decompressor *) streamer;
345 : :
346 : : /*
347 : : * End of the stream, if there is some pending data in output buffers then
348 : : * we must forward it to next streamer.
349 : : */
350 : 4 : astreamer_content(mystreamer->base.bbs_next, NULL,
351 : 4 : mystreamer->base.bbs_buffer.data,
352 : : mystreamer->base.bbs_buffer.maxlen,
353 : : ASTREAMER_UNKNOWN);
354 : :
355 : 4 : astreamer_finalize(mystreamer->base.bbs_next);
1317 356 : 4 : }
357 : :
358 : : /*
359 : : * Free memory.
360 : : */
361 : : static void
397 362 : 4 : astreamer_gzip_decompressor_free(astreamer *streamer)
363 : : {
364 : 4 : astreamer_free(streamer->bbs_next);
1317 365 : 4 : pfree(streamer->bbs_buffer.data);
366 : 4 : pfree(streamer);
367 : 4 : }
368 : :
/*
 * Wrapper function to adjust the signature of palloc to match what libz
 * expects.
 *
 * libz asks for "items" objects of "size" bytes each.  The product is
 * computed in size_t so that it cannot wrap around in unsigned arithmetic.
 * If it would not fit in size_t at all, NULL is returned, which is how a
 * zlib allocator reports failure.  "opaque" is unused.
 */
static void *
gzip_palloc(void *opaque, unsigned items, unsigned size)
{
	if (size != 0 && items > SIZE_MAX / size)
		return NULL;

	return palloc((size_t) items * (size_t) size);
}
378 : :
/*
 * Adapter that gives pfree the signature libz requires of a deallocation
 * callback.  The "opaque" argument is ignored.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	(void) opaque;
	pfree(address);
}
388 : : #endif
|