Age Owner Branch data TLA Line data Source code
/*-------------------------------------------------------------------------
 *
 * walreceiver.h
 *	  Exports from replication/walreceiver.c and
 *	  replication/walreceiverfuncs.c.
 *
 * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
 *
 * src/include/replication/walreceiver.h
 *
 *-------------------------------------------------------------------------
 */
12 : : #ifndef _WALRECEIVER_H
13 : : #define _WALRECEIVER_H
14 : :
15 : : #include <netdb.h>
16 : :
17 : : #include "access/xlog.h"
18 : : #include "access/xlogdefs.h"
19 : : #include "pgtime.h"
20 : : #include "port/atomics.h"
21 : : #include "replication/logicalproto.h"
22 : : #include "replication/walsender.h"
23 : : #include "storage/condition_variable.h"
24 : : #include "storage/spin.h"
25 : : #include "utils/tuplestore.h"
26 : :
27 : : /* user-settable parameters */
28 : : extern PGDLLIMPORT int wal_receiver_status_interval;
29 : : extern PGDLLIMPORT int wal_receiver_timeout;
30 : : extern PGDLLIMPORT bool hot_standby_feedback;
31 : :
/*
 * MAXCONNINFO: maximum size of a connection string.
 *
 * This is the size in bytes of the fixed buffer that holds the connection
 * string in shared memory (WalRcvData.conninfo).
 *
 * XXX: Should this move to pg_config_manual.h?
 */
#define MAXCONNINFO		1024
38 : :
/*
 * Can we allow the standby to accept replication connections from another
 * standby?
 *
 * True only when EnableHotStandby is set and max_wal_senders is greater
 * than zero.
 */
#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
41 : :
/*
 * Values for WalRcv->walRcvState.
 *
 * The enumerators carry no explicit values, so they number from 0 in
 * declaration order.
 */
typedef enum
{
	WALRCV_STOPPED,				/* stopped and mustn't start up again */
	WALRCV_STARTING,			/* launched, but the process hasn't
								 * initialized yet */
	WALRCV_STREAMING,			/* walreceiver is streaming */
	WALRCV_WAITING,				/* stopped streaming, waiting for orders */
	WALRCV_RESTARTING,			/* asked to restart streaming */
	WALRCV_STOPPING,			/* requested to stop, but still running */
} WalRcvState;
55 : :
56 : : /* Shared memory area for management of walreceiver process */
57 : : typedef struct
58 : : {
59 : : /*
60 : : * Currently active walreceiver process's proc number and PID.
61 : : *
62 : : * The startup process uses the proc number to wake it up after telling it
63 : : * where to start streaming (after setting receiveStart and
64 : : * receiveStartTLI), and also to tell it to send apply feedback to the
65 : : * primary whenever specially marked commit records are applied.
66 : : */
67 : : ProcNumber procno;
68 : : pid_t pid;
69 : :
70 : : /* Its current state */
71 : : WalRcvState walRcvState;
72 : : ConditionVariable walRcvStoppedCV;
73 : :
74 : : /*
75 : : * Its start time (actually, the time at which it was requested to be
76 : : * started).
77 : : */
78 : : pg_time_t startTime;
79 : :
80 : : /*
81 : : * receiveStart and receiveStartTLI indicate the first byte position and
82 : : * timeline that will be received. When startup process starts the
83 : : * walreceiver, it sets these to the point where it wants the streaming to
84 : : * begin.
85 : : */
86 : : XLogRecPtr receiveStart;
87 : : TimeLineID receiveStartTLI;
88 : :
89 : : /*
90 : : * flushedUpto-1 is the last byte position that has already been received,
91 : : * and receivedTLI is the timeline it came from. At the first startup of
92 : : * walreceiver, these are set to receiveStart and receiveStartTLI. After
93 : : * that, walreceiver updates these whenever it flushes the received WAL to
94 : : * disk.
95 : : */
96 : : XLogRecPtr flushedUpto;
97 : : TimeLineID receivedTLI;
98 : :
99 : : /*
100 : : * latestChunkStart is the starting byte position of the current "batch"
101 : : * of received WAL. It's actually the same as the previous value of
102 : : * flushedUpto before the last flush to disk. Startup process can use
103 : : * this to detect whether it's keeping up or not.
104 : : */
105 : : XLogRecPtr latestChunkStart;
106 : :
107 : : /*
108 : : * Time of send and receive of any message received.
109 : : */
110 : : TimestampTz lastMsgSendTime;
111 : : TimestampTz lastMsgReceiptTime;
112 : :
113 : : /*
114 : : * Latest reported end of WAL on the sender
115 : : */
116 : : XLogRecPtr latestWalEnd;
117 : : TimestampTz latestWalEndTime;
118 : :
119 : : /*
120 : : * connection string; initially set to connect to the primary, and later
121 : : * clobbered to hide security-sensitive fields.
122 : : */
123 : : char conninfo[MAXCONNINFO];
124 : :
125 : : /*
126 : : * Host name (this can be a host name, an IP address, or a directory path)
127 : : * and port number of the active replication connection.
128 : : */
129 : : char sender_host[NI_MAXHOST];
130 : : int sender_port;
131 : :
132 : : /*
133 : : * replication slot name; is also used for walreceiver to connect with the
134 : : * primary
135 : : */
136 : : char slotname[NAMEDATALEN];
137 : :
138 : : /*
139 : : * If it's a temporary replication slot, it needs to be recreated when
140 : : * connecting.
141 : : */
142 : : bool is_temp_slot;
143 : :
144 : : /* set true once conninfo is ready to display (obfuscated pwds etc) */
145 : : bool ready_to_display;
146 : :
147 : : slock_t mutex; /* locks shared variables shown above */
148 : :
149 : : /*
150 : : * Like flushedUpto, but advanced after writing and before flushing,
151 : : * without the need to acquire the spin lock. Data can be read by another
152 : : * process up to this point, but shouldn't be used for data integrity
153 : : * purposes.
154 : : */
155 : : pg_atomic_uint64 writtenUpto;
156 : :
157 : : /*
158 : : * force walreceiver reply? This doesn't need to be locked; memory
159 : : * barriers for ordering are sufficient. But we do need atomic fetch and
160 : : * store semantics, so use sig_atomic_t.
161 : : */
162 : : sig_atomic_t force_reply; /* used as a bool */
163 : : } WalRcvData;
164 : :
165 : : extern PGDLLIMPORT WalRcvData *WalRcv;
166 : :
167 : : typedef struct
168 : : {
169 : : bool logical; /* True if this is logical replication stream,
170 : : * false if physical stream. */
171 : : char *slotname; /* Name of the replication slot or NULL. */
172 : : XLogRecPtr startpoint; /* LSN of starting point. */
173 : :
174 : : union
175 : : {
176 : : struct
177 : : {
178 : : TimeLineID startpointTLI; /* Starting timeline */
179 : : } physical;
180 : : struct
181 : : {
182 : : uint32 proto_version; /* Logical protocol version */
183 : : List *publication_names; /* String list of publications */
184 : : bool binary; /* Ask publisher to use binary */
185 : : char *streaming_str; /* Streaming of large transactions */
186 : : bool twophase; /* Streaming of two-phase transactions at
187 : : * prepare time */
188 : : char *origin; /* Only publish data originating from the
189 : : * specified origin */
190 : : } logical;
191 : : } proto;
192 : : } WalRcvStreamOptions;
193 : :
/*
 * Opaque connection handle.  Only the forward declaration is visible here,
 * so users of this header can hold and pass WalReceiverConn pointers but
 * cannot look inside the struct.
 */
struct WalReceiverConn;
typedef struct WalReceiverConn WalReceiverConn;
196 : :
/*
 * Status of walreceiver query execution.
 *
 * We only define statuses that are currently used.  The enumerators carry
 * no explicit values, so they number from 0 in declaration order.
 */
typedef enum
{
	WALRCV_ERROR,				/* There was error when executing the query. */
	WALRCV_OK_COMMAND,			/* Query executed utility or replication
								 * command. */
	WALRCV_OK_TUPLES,			/* Query returned tuples. */
	WALRCV_OK_COPY_IN,			/* Query started COPY FROM. */
	WALRCV_OK_COPY_OUT,			/* Query started COPY TO. */
	WALRCV_OK_COPY_BOTH,		/* Query started COPY BOTH replication
								 * protocol. */
} WalRcvExecStatus;
213 : :
214 : : /*
215 : : * Return value for walrcv_exec, returns the status of the execution and
216 : : * tuples if any.
217 : : */
218 : : typedef struct WalRcvExecResult
219 : : {
220 : : WalRcvExecStatus status;
221 : : int sqlstate;
222 : : char *err;
223 : : Tuplestorestate *tuplestore;
224 : : TupleDesc tupledesc;
225 : : } WalRcvExecResult;
226 : :
227 : : /* WAL receiver - libpqwalreceiver hooks */
228 : :
229 : : /*
230 : : * walrcv_connect_fn
231 : : *
232 : : * Establish connection to a cluster. 'replication' is true if the
233 : : * connection is a replication connection, and false if it is a
234 : : * regular connection. If it is a replication connection, it could
235 : : * be either logical or physical based on input argument 'logical'.
236 : : * 'appname' is a name associated to the connection, to use for example
237 : : * with fallback_application_name or application_name. Returns the
238 : : * details about the connection established, as defined by
239 : : * WalReceiverConn for each WAL receiver module. On error, NULL is
240 : : * returned with 'err' including the error generated.
241 : : */
242 : : typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
243 : : bool replication,
244 : : bool logical,
245 : : bool must_use_password,
246 : : const char *appname,
247 : : char **err);
248 : :
/*
 * walrcv_check_conninfo_fn
 *
 * Parse and validate the connection string given as of 'conninfo'.
 * Returns nothing, so a validation failure has to be reported some other
 * way.  NOTE(review): presumably by raising an error -- confirm against
 * the implementation.
 */
typedef void (*walrcv_check_conninfo_fn) (const char *conninfo,
										  bool must_use_password);
256 : :
257 : : /*
258 : : * walrcv_get_conninfo_fn
259 : : *
260 : : * Returns a user-displayable conninfo string. Note that any
261 : : * security-sensitive fields should be obfuscated.
262 : : */
263 : : typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
264 : :
265 : : /*
266 : : * walrcv_get_senderinfo_fn
267 : : *
268 : : * Provide information of the WAL sender this WAL receiver is connected
269 : : * to, as of 'sender_host' for the host of the sender and 'sender_port'
270 : : * for its port.
271 : : */
272 : : typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
273 : : char **sender_host,
274 : : int *sender_port);
275 : :
276 : : /*
277 : : * walrcv_identify_system_fn
278 : : *
279 : : * Run IDENTIFY_SYSTEM on the cluster connected to and validate the
280 : : * identity of the cluster. Returns the system ID of the cluster
281 : : * connected to. 'primary_tli' is the timeline ID of the sender.
282 : : */
283 : : typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
284 : : TimeLineID *primary_tli);
285 : :
/*
 * walrcv_get_dbname_from_conninfo_fn
 *
 * Returns the database name found in the connection string 'conninfo'
 * (the primary_conninfo).
 */
typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo);
292 : :
293 : : /*
294 : : * walrcv_server_version_fn
295 : : *
296 : : * Returns the version number of the cluster connected to.
297 : : */
298 : : typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
299 : :
300 : : /*
301 : : * walrcv_readtimelinehistoryfile_fn
302 : : *
303 : : * Fetch from cluster the timeline history file for timeline 'tli'.
304 : : * Returns the name of the timeline history file as of 'filename', its
305 : : * contents as of 'content' and its 'size'.
306 : : */
307 : : typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
308 : : TimeLineID tli,
309 : : char **filename,
310 : : char **content,
311 : : int *size);
312 : :
313 : : /*
314 : : * walrcv_startstreaming_fn
315 : : *
316 : : * Start streaming WAL data from given streaming options. Returns true
317 : : * if the connection has switched successfully to copy-both mode and false
318 : : * if the server received the command and executed it successfully, but
319 : : * didn't switch to copy-mode.
320 : : */
321 : : typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
322 : : const WalRcvStreamOptions *options);
323 : :
324 : : /*
325 : : * walrcv_endstreaming_fn
326 : : *
327 : : * Stop streaming of WAL data. Returns the next timeline ID of the cluster
328 : : * connected to in 'next_tli', or 0 if there was no report.
329 : : */
330 : : typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
331 : : TimeLineID *next_tli);
332 : :
333 : : /*
334 : : * walrcv_receive_fn
335 : : *
336 : : * Receive a message available from the WAL stream. 'buffer' is a pointer
337 : : * to a buffer holding the message received. Returns the length of the data,
338 : : * 0 if no data is available yet ('wait_fd' is a socket descriptor which can
339 : : * be waited on before a retry), and -1 if the cluster ended the COPY.
340 : : */
341 : : typedef int (*walrcv_receive_fn) (WalReceiverConn *conn,
342 : : char **buffer,
343 : : pgsocket *wait_fd);
344 : :
345 : : /*
346 : : * walrcv_send_fn
347 : : *
348 : : * Send a message of size 'nbytes' to the WAL stream with 'buffer' as
349 : : * contents.
350 : : */
351 : : typedef void (*walrcv_send_fn) (WalReceiverConn *conn,
352 : : const char *buffer,
353 : : int nbytes);
354 : :
355 : : /*
356 : : * walrcv_create_slot_fn
357 : : *
358 : : * Create a new replication slot named 'slotname'. 'temporary' defines
359 : : * if the slot is temporary. 'snapshot_action' defines the behavior wanted
360 : : * for an exported snapshot (see replication protocol for more details).
361 : : * 'lsn' includes the LSN position at which the created slot became
362 : : * consistent. Returns the name of the exported snapshot for a logical
363 : : * slot, or NULL for a physical slot.
364 : : */
365 : : typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
366 : : const char *slotname,
367 : : bool temporary,
368 : : bool two_phase,
369 : : bool failover,
370 : : CRSSnapshotAction snapshot_action,
371 : : XLogRecPtr *lsn);
372 : :
373 : : /*
374 : : * walrcv_alter_slot_fn
375 : : *
376 : : * Change the definition of a replication slot. Currently, it supports
377 : : * changing the failover and two_phase properties of the slot.
378 : : */
379 : : typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
380 : : const char *slotname,
381 : : const bool *failover,
382 : : const bool *two_phase);
383 : :
384 : :
385 : : /*
386 : : * walrcv_get_backend_pid_fn
387 : : *
388 : : * Returns the PID of the remote backend process.
389 : : */
390 : : typedef pid_t (*walrcv_get_backend_pid_fn) (WalReceiverConn *conn);
391 : :
392 : : /*
393 : : * walrcv_exec_fn
394 : : *
395 : : * Send generic queries (and commands) to the remote cluster. 'nRetTypes'
396 : : * is the expected number of returned attributes, and 'retTypes' an array
397 : : * including their type OIDs. Returns the status of the execution and
398 : : * tuples if any.
399 : : */
400 : : typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
401 : : const char *query,
402 : : const int nRetTypes,
403 : : const Oid *retTypes);
404 : :
405 : : /*
406 : : * walrcv_disconnect_fn
407 : : *
408 : : * Disconnect with the cluster.
409 : : */
410 : : typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
411 : :
412 : : typedef struct WalReceiverFunctionsType
413 : : {
414 : : walrcv_connect_fn walrcv_connect;
415 : : walrcv_check_conninfo_fn walrcv_check_conninfo;
416 : : walrcv_get_conninfo_fn walrcv_get_conninfo;
417 : : walrcv_get_senderinfo_fn walrcv_get_senderinfo;
418 : : walrcv_identify_system_fn walrcv_identify_system;
419 : : walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo;
420 : : walrcv_server_version_fn walrcv_server_version;
421 : : walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
422 : : walrcv_startstreaming_fn walrcv_startstreaming;
423 : : walrcv_endstreaming_fn walrcv_endstreaming;
424 : : walrcv_receive_fn walrcv_receive;
425 : : walrcv_send_fn walrcv_send;
426 : : walrcv_create_slot_fn walrcv_create_slot;
427 : : walrcv_alter_slot_fn walrcv_alter_slot;
428 : : walrcv_get_backend_pid_fn walrcv_get_backend_pid;
429 : : walrcv_exec_fn walrcv_exec;
430 : : walrcv_disconnect_fn walrcv_disconnect;
431 : : } WalReceiverFunctionsType;
432 : :
433 : : extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
434 : :
/*
 * Convenience wrappers: each walrcv_xxx() macro calls the hook of the same
 * name through the WalReceiverFunctions table, passing its arguments
 * through unchanged.  The macros do not check WalReceiverFunctions (or the
 * individual hook) for NULL, so the table must be populated before use.
 */
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \
	WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_check_conninfo(conninfo, must_use_password) \
	WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_get_conninfo(conn) \
	WalReceiverFunctions->walrcv_get_conninfo(conn)
#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_identify_system(conn, primary_tli) \
	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
#define walrcv_get_dbname_from_conninfo(conninfo) \
	WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo)
#define walrcv_server_version(conn) \
	WalReceiverFunctions->walrcv_server_version(conn)
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
	WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
#define walrcv_startstreaming(conn, options) \
	WalReceiverFunctions->walrcv_startstreaming(conn, options)
#define walrcv_endstreaming(conn, next_tli) \
	WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
#define walrcv_receive(conn, buffer, wait_fd) \
	WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
	WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_alter_slot(conn, slotname, failover, two_phase) \
	WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_get_backend_pid(conn) \
	WalReceiverFunctions->walrcv_get_backend_pid(conn)
#define walrcv_exec(conn, query, nRetTypes, retTypes) \
	WalReceiverFunctions->walrcv_exec(conn, query, nRetTypes, retTypes)
#define walrcv_disconnect(conn) \
	WalReceiverFunctions->walrcv_disconnect(conn)
469 : :
470 : : static inline void
3089 peter_e@gmx.net 471 :CBC 1941 : walrcv_clear_result(WalRcvExecResult *walres)
472 : : {
473 [ - + ]: 1941 : if (!walres)
3089 peter_e@gmx.net 474 :UBC 0 : return;
475 : :
3089 peter_e@gmx.net 476 [ + + ]:CBC 1941 : if (walres->err)
477 : 2 : pfree(walres->err);
478 : :
479 [ + + ]: 1941 : if (walres->tuplestore)
480 : 1108 : tuplestore_end(walres->tuplestore);
481 : :
482 [ + + ]: 1941 : if (walres->tupledesc)
483 : 1108 : FreeTupleDesc(walres->tupledesc);
484 : :
485 : 1941 : pfree(walres);
486 : : }
487 : :
488 : : /* prototypes for functions in walreceiver.c */
489 : : pg_noreturn extern void WalReceiverMain(const void *startup_data, size_t startup_data_len);
490 : : extern void WalRcvForceReply(void);
491 : :
492 : : /* prototypes for functions in walreceiverfuncs.c */
493 : : extern Size WalRcvShmemSize(void);
494 : : extern void WalRcvShmemInit(void);
495 : : extern void ShutdownWalRcv(void);
496 : : extern bool WalRcvStreaming(void);
497 : : extern bool WalRcvRunning(void);
498 : : extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
499 : : const char *conninfo, const char *slotname,
500 : : bool create_temp_slot);
501 : : extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
502 : : extern XLogRecPtr GetWalRcvWriteRecPtr(void);
503 : : extern int GetReplicationApplyDelay(void);
504 : : extern int GetReplicationTransferLatency(void);
505 : :
506 : : #endif /* _WALRECEIVER_H */
|