Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * basebackup_throttle.c
4 : : * Basebackup sink implementing throttling. Data is forwarded to the
5 : : * next base backup sink in the chain at a rate no greater than the
6 : : * configured maximum.
7 : : *
8 : : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/backup/basebackup_throttle.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : : #include "postgres.h"
16 : :
17 : : #include "backup/basebackup_sink.h"
18 : : #include "miscadmin.h"
19 : : #include "pgstat.h"
20 : : #include "storage/latch.h"
21 : : #include "utils/timestamp.h"
22 : : #include "utils/wait_event.h"
23 : :
24 : : typedef struct bbsink_throttle
25 : : {
26 : : /* Common information for all types of sink. */
27 : : bbsink base;
28 : :
29 : : /* The actual number of bytes, transfer of which may cause sleep. */
30 : : uint64 throttling_sample;
31 : :
32 : : /* Amount of data already transferred but not yet throttled. */
33 : : int64 throttling_counter;
34 : :
35 : : /* The minimum time required to transfer throttling_sample bytes. */
36 : : TimeOffset elapsed_min_unit;
37 : :
38 : : /* The last check of the transfer rate. */
39 : : TimestampTz throttled_last;
40 : : } bbsink_throttle;
41 : :
42 : : static void bbsink_throttle_begin_backup(bbsink *sink);
43 : : static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
44 : : static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
45 : : static void throttle(bbsink_throttle *sink, size_t increment);
46 : :
47 : : static const bbsink_ops bbsink_throttle_ops = {
48 : : .begin_backup = bbsink_throttle_begin_backup,
49 : : .begin_archive = bbsink_forward_begin_archive,
50 : : .archive_contents = bbsink_throttle_archive_contents,
51 : : .end_archive = bbsink_forward_end_archive,
52 : : .begin_manifest = bbsink_forward_begin_manifest,
53 : : .manifest_contents = bbsink_throttle_manifest_contents,
54 : : .end_manifest = bbsink_forward_end_manifest,
55 : : .end_backup = bbsink_forward_end_backup,
56 : : .cleanup = bbsink_forward_cleanup
57 : : };
58 : :
59 : : /*
60 : : * How frequently to throttle, as a fraction of the specified rate-second.
61 : : */
62 : : #define THROTTLING_FREQUENCY 8
63 : :
64 : : /*
65 : : * Create a new basebackup sink that performs throttling and forwards data
66 : : * to a successor sink.
67 : : */
68 : : bbsink *
1591 rhaas@postgresql.org 69 :CBC 1 : bbsink_throttle_new(bbsink *next, uint32 maxrate)
70 : : {
71 : : bbsink_throttle *sink;
72 : :
73 [ - + ]: 1 : Assert(next != NULL);
74 [ - + ]: 1 : Assert(maxrate > 0);
75 : :
95 michael@paquier.xyz 76 :GNC 1 : sink = palloc0_object(bbsink_throttle);
1591 rhaas@postgresql.org 77 :CBC 1 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
78 : 1 : sink->base.bbs_next = next;
79 : :
80 : 1 : sink->throttling_sample =
81 : 1 : (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
82 : :
83 : : /*
84 : : * The minimum amount of time for throttling_sample bytes to be
85 : : * transferred.
86 : : */
87 : 1 : sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
88 : :
89 : 1 : return &sink->base;
90 : : }
91 : :
92 : : /*
93 : : * There's no real work to do here, but we need to record the current time so
94 : : * that it can be used for future calculations.
95 : : */
96 : : static void
97 : 1 : bbsink_throttle_begin_backup(bbsink *sink)
98 : : {
99 : 1 : bbsink_throttle *mysink = (bbsink_throttle *) sink;
100 : :
101 : 1 : bbsink_forward_begin_backup(sink);
102 : :
103 : : /* The 'real data' starts now (header was ignored). */
104 : 1 : mysink->throttled_last = GetCurrentTimestamp();
105 : 1 : }
106 : :
107 : : /*
108 : : * First throttle, and then pass archive contents to next sink.
109 : : */
110 : : static void
111 : 26 : bbsink_throttle_archive_contents(bbsink *sink, size_t len)
112 : : {
113 : 26 : throttle((bbsink_throttle *) sink, len);
114 : :
115 : 26 : bbsink_forward_archive_contents(sink, len);
116 : 26 : }
117 : :
118 : : /*
119 : : * First throttle, and then pass manifest contents to next sink.
120 : : */
121 : : static void
1591 rhaas@postgresql.org 122 :UBC 0 : bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
123 : : {
124 : 0 : throttle((bbsink_throttle *) sink, len);
125 : :
1581 126 : 0 : bbsink_forward_manifest_contents(sink, len);
1591 127 : 0 : }
128 : :
129 : : /*
130 : : * Increment the network transfer counter by the given number of bytes,
131 : : * and sleep if necessary to comply with the requested network transfer
132 : : * rate.
133 : : */
134 : : static void
1591 rhaas@postgresql.org 135 :CBC 26 : throttle(bbsink_throttle *sink, size_t increment)
136 : : {
137 : : TimeOffset elapsed_min;
138 : :
139 [ - + ]: 26 : Assert(sink->throttling_counter >= 0);
140 : :
141 : 26 : sink->throttling_counter += increment;
142 [ + + ]: 26 : if (sink->throttling_counter < sink->throttling_sample)
143 : 22 : return;
144 : :
145 : : /* How much time should have elapsed at minimum? */
146 : 4 : elapsed_min = sink->elapsed_min_unit *
147 : 4 : (sink->throttling_counter / sink->throttling_sample);
148 : :
149 : : /*
150 : : * Since the latch could be set repeatedly because of concurrently WAL
151 : : * activity, sleep in a loop to ensure enough time has passed.
152 : : */
153 : : for (;;)
1591 rhaas@postgresql.org 154 :UBC 0 : {
155 : : TimeOffset elapsed,
156 : : sleep;
157 : : int wait_result;
158 : :
159 : : /* Time elapsed since the last measurement (and possible wake up). */
1591 rhaas@postgresql.org 160 :CBC 4 : elapsed = GetCurrentTimestamp() - sink->throttled_last;
161 : :
162 : : /* sleep if the transfer is faster than it should be */
163 : 4 : sleep = elapsed_min - elapsed;
164 [ - + ]: 4 : if (sleep <= 0)
1591 rhaas@postgresql.org 165 :UBC 0 : break;
166 : :
1591 rhaas@postgresql.org 167 :CBC 4 : ResetLatch(MyLatch);
168 : :
169 : : /* We're eating a potentially set latch, so check for interrupts */
170 [ - + ]: 4 : CHECK_FOR_INTERRUPTS();
171 : :
172 : : /*
173 : : * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
174 : : * the maximum time to sleep. Thus the cast to long is safe.
175 : : */
176 : 4 : wait_result = WaitLatch(MyLatch,
177 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
178 : 4 : (long) (sleep / 1000),
179 : : WAIT_EVENT_BASE_BACKUP_THROTTLE);
180 : :
181 [ - + ]: 4 : if (wait_result & WL_LATCH_SET)
1591 rhaas@postgresql.org 182 [ # # ]:UBC 0 : CHECK_FOR_INTERRUPTS();
183 : :
184 : : /* Done waiting? */
1591 rhaas@postgresql.org 185 [ + - ]:CBC 4 : if (wait_result & WL_TIMEOUT)
186 : 4 : break;
187 : : }
188 : :
189 : : /*
190 : : * As we work with integers, only whole multiple of throttling_sample was
191 : : * processed. The rest will be done during the next call of this function.
192 : : */
193 : 4 : sink->throttling_counter %= sink->throttling_sample;
194 : :
195 : : /*
196 : : * Time interval for the remaining amount and possible next increments
197 : : * starts now.
198 : : */
199 : 4 : sink->throttled_last = GetCurrentTimestamp();
200 : : }
|