Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * local_source.c
4 : : * Functions for using a local data directory as the source.
5 : : *
6 : : * Portions Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 : : *
8 : : *-------------------------------------------------------------------------
9 : : */
10 : : #include "postgres_fe.h"
11 : :
12 : : #include <fcntl.h>
13 : : #include <unistd.h>
14 : :
15 : : #include "common/logging.h"
16 : : #include "file_ops.h"
17 : : #include "rewind_source.h"
18 : :
19 : : typedef struct
20 : : {
21 : : rewind_source common; /* common interface functions */
22 : :
23 : : const char *datadir; /* path to the source data directory */
24 : : } local_source;
25 : :
26 : : static void local_traverse_files(rewind_source *source,
27 : : process_file_callback_t callback);
28 : : static char *local_fetch_file(rewind_source *source, const char *path,
29 : : size_t *filesize);
30 : : static void local_queue_fetch_file(rewind_source *source, const char *path,
31 : : size_t len);
32 : : static void local_queue_fetch_range(rewind_source *source, const char *path,
33 : : off_t off, size_t len);
34 : : static void local_finish_fetch(rewind_source *source);
35 : : static void local_destroy(rewind_source *source);
36 : :
37 : : rewind_source *
1767 heikki.linnakangas@i 38 :CBC 12 : init_local_source(const char *datadir)
39 : : {
40 : : local_source *src;
41 : :
42 : 12 : src = pg_malloc0(sizeof(local_source));
43 : :
44 : 12 : src->common.traverse_files = local_traverse_files;
45 : 12 : src->common.fetch_file = local_fetch_file;
1250 dgustafsson@postgres 46 : 12 : src->common.queue_fetch_file = local_queue_fetch_file;
47 : 12 : src->common.queue_fetch_range = local_queue_fetch_range;
1767 heikki.linnakangas@i 48 : 12 : src->common.finish_fetch = local_finish_fetch;
49 : 12 : src->common.get_current_wal_insert_lsn = NULL;
50 : 12 : src->common.destroy = local_destroy;
51 : :
52 : 12 : src->datadir = datadir;
53 : :
54 : 12 : return &src->common;
55 : : }
56 : :
57 : : static void
58 : 8 : local_traverse_files(rewind_source *source, process_file_callback_t callback)
59 : : {
858 dgustafsson@postgres 60 : 8 : traverse_datadir(((local_source *) source)->datadir, callback);
1767 heikki.linnakangas@i 61 : 8 : }
62 : :
63 : : static char *
64 : 26 : local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
65 : : {
66 : 26 : return slurpFile(((local_source *) source)->datadir, path, filesize);
67 : : }
68 : :
69 : : /*
70 : : * Copy a file from source to target.
71 : : *
72 : : * 'len' is the expected length of the file.
73 : : */
74 : : static void
1250 dgustafsson@postgres 75 : 2479 : local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
76 : : {
77 : 2479 : const char *datadir = ((local_source *) source)->datadir;
78 : : PGIOAlignedBlock buf;
79 : : char srcpath[MAXPGPATH];
80 : : int srcfd;
81 : : size_t written_len;
82 : :
83 : 2479 : snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
84 : :
85 : : /* Open source file for reading */
86 : 2479 : srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
87 [ - + ]: 2479 : if (srcfd < 0)
1250 dgustafsson@postgres 88 :UBC 0 : pg_fatal("could not open source file \"%s\": %m",
89 : : srcpath);
90 : :
91 : : /* Truncate and open the target file for writing */
1250 dgustafsson@postgres 92 :CBC 2479 : open_target_file(path, true);
93 : :
94 : 2479 : written_len = 0;
95 : : for (;;)
96 : 58453 : {
97 : : ssize_t read_len;
98 : :
99 : 60932 : read_len = read(srcfd, buf.data, sizeof(buf));
100 : :
101 [ - + ]: 60932 : if (read_len < 0)
1250 dgustafsson@postgres 102 :UBC 0 : pg_fatal("could not read file \"%s\": %m", srcpath);
1250 dgustafsson@postgres 103 [ + + ]:CBC 60932 : else if (read_len == 0)
104 : 2479 : break; /* EOF reached */
105 : :
106 : 58453 : write_target_range(buf.data, written_len, read_len);
107 : 58453 : written_len += read_len;
108 : : }
109 : :
110 : : /*
111 : : * A local source is not expected to change while we're rewinding, so
112 : : * check that the size of the file matches our earlier expectation.
113 : : */
114 [ + + ]: 2479 : if (written_len != len)
115 : 1 : pg_fatal("size of source file \"%s\" changed concurrently: %d bytes expected, %d copied",
116 : : srcpath, (int) len, (int) written_len);
117 : :
118 [ - + ]: 2478 : if (close(srcfd) != 0)
1250 dgustafsson@postgres 119 :UBC 0 : pg_fatal("could not close file \"%s\": %m", srcpath);
1250 dgustafsson@postgres 120 :CBC 2478 : }
121 : :
122 : : /*
123 : : * Copy a file from source to target, starting at 'off', for 'len' bytes.
124 : : */
125 : : static void
126 : 889 : local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
127 : : size_t len)
128 : : {
1767 heikki.linnakangas@i 129 : 889 : const char *datadir = ((local_source *) source)->datadir;
130 : : PGIOAlignedBlock buf;
131 : : char srcpath[MAXPGPATH];
132 : : int srcfd;
133 : 889 : off_t begin = off;
134 : 889 : off_t end = off + len;
135 : :
136 : 889 : snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
137 : :
138 : 889 : srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
139 [ - + ]: 889 : if (srcfd < 0)
1767 heikki.linnakangas@i 140 :UBC 0 : pg_fatal("could not open source file \"%s\": %m",
141 : : srcpath);
142 : :
1767 heikki.linnakangas@i 143 [ - + ]:CBC 889 : if (lseek(srcfd, begin, SEEK_SET) == -1)
1767 heikki.linnakangas@i 144 :UBC 0 : pg_fatal("could not seek in source file: %m");
145 : :
1767 heikki.linnakangas@i 146 :CBC 889 : open_target_file(path, false);
147 : :
148 [ + + ]: 2024 : while (end - begin > 0)
149 : : {
150 : : ssize_t readlen;
151 : : size_t thislen;
152 : :
153 [ + + ]: 1135 : if (end - begin > sizeof(buf))
1250 dgustafsson@postgres 154 : 246 : thislen = sizeof(buf);
155 : : else
156 : 889 : thislen = end - begin;
157 : :
158 : 1135 : readlen = read(srcfd, buf.data, thislen);
159 : :
1767 heikki.linnakangas@i 160 [ - + ]: 1135 : if (readlen < 0)
1767 heikki.linnakangas@i 161 :UBC 0 : pg_fatal("could not read file \"%s\": %m", srcpath);
1767 heikki.linnakangas@i 162 [ - + ]:CBC 1135 : else if (readlen == 0)
1767 heikki.linnakangas@i 163 :UBC 0 : pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);
164 : :
1767 heikki.linnakangas@i 165 :CBC 1135 : write_target_range(buf.data, begin, readlen);
166 : 1135 : begin += readlen;
167 : : }
168 : :
169 [ - + ]: 889 : if (close(srcfd) != 0)
1767 heikki.linnakangas@i 170 :UBC 0 : pg_fatal("could not close file \"%s\": %m", srcpath);
1767 heikki.linnakangas@i 171 :CBC 889 : }
172 : :
173 : : static void
174 : 7 : local_finish_fetch(rewind_source *source)
175 : : {
176 : : /*
177 : : * Nothing to do, local_queue_fetch_range() copies the ranges immediately.
178 : : */
179 : 7 : }
180 : :
181 : : static void
182 : 7 : local_destroy(rewind_source *source)
183 : : {
184 : 7 : pfree(source);
185 : 7 : }
|