Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * syncutils.c
3 : : * PostgreSQL logical replication: common synchronization code
4 : : *
5 : : * Copyright (c) 2025, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/syncutils.c
9 : : *
10 : : * NOTES
11 : : * This file contains code common for synchronization workers.
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "catalog/pg_subscription_rel.h"
18 : : #include "pgstat.h"
19 : : #include "replication/worker_internal.h"
20 : : #include "storage/ipc.h"
21 : : #include "utils/lsyscache.h"
22 : : #include "utils/memutils.h"
23 : :
24 : : /*
25 : : * Enum for phases of the subscription relations state.
26 : : *
27 : : * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations
28 : : * state is no longer valid, and the subscription relations should be rebuilt.
29 : : *
30 : : * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription
31 : : * relations state is being rebuilt.
32 : : *
33 : : * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is
34 : : * up-to-date and valid.
35 : : */
36 : : typedef enum
37 : : {
38 : : SYNC_RELATIONS_STATE_NEEDS_REBUILD,
39 : : SYNC_RELATIONS_STATE_REBUILD_STARTED,
40 : : SYNC_RELATIONS_STATE_VALID,
41 : : } SyncingRelationsState;
42 : :
43 : : static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
44 : :
45 : : /*
46 : : * Exit routine for synchronization worker.
47 : : */
48 : : pg_noreturn void
12 akapila@postgresql.o 49 :GNC 184 : FinishSyncWorker(void)
50 : : {
51 : : /*
52 : : * Commit any outstanding transaction. This is the usual case, unless
53 : : * there was nothing to do for the table.
54 : : */
55 [ + - ]: 184 : if (IsTransactionState())
56 : : {
57 : 184 : CommitTransactionCommand();
58 : 184 : pgstat_report_stat(true);
59 : : }
60 : :
61 : : /* And flush all writes. */
62 : 184 : XLogFlush(GetXLogWriteRecPtr());
63 : :
64 : 184 : StartTransactionCommand();
65 [ + - ]: 184 : ereport(LOG,
66 : : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
67 : : MySubscription->name,
68 : : get_rel_name(MyLogicalRepWorker->relid))));
69 : 184 : CommitTransactionCommand();
70 : :
71 : : /* Find the leader apply worker and signal it. */
72 : 184 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
73 : :
74 : : /* Stop gracefully */
75 : 184 : proc_exit(0);
76 : : }
77 : :
78 : : /*
79 : : * Callback from syscache invalidation.
80 : : */
81 : : void
82 : 1710 : InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
83 : : {
84 : 1710 : relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
85 : 1710 : }
86 : :
87 : : /*
88 : : * Process possible state change(s) of relations that are being synchronized.
89 : : */
90 : : void
91 : 3251 : ProcessSyncingRelations(XLogRecPtr current_lsn)
92 : : {
93 [ + + + - : 3251 : switch (MyLogicalRepWorker->type)
- ]
94 : : {
95 : 22 : case WORKERTYPE_PARALLEL_APPLY:
96 : :
97 : : /*
98 : : * Skip for parallel apply workers because they only operate on
99 : : * tables that are in a READY state. See pa_can_start() and
100 : : * should_apply_changes_for_rel().
101 : : */
102 : 22 : break;
103 : :
104 : 197 : case WORKERTYPE_TABLESYNC:
105 : 197 : ProcessSyncingTablesForSync(current_lsn);
106 : 13 : break;
107 : :
108 : 3032 : case WORKERTYPE_APPLY:
109 : 3032 : ProcessSyncingTablesForApply(current_lsn);
110 : 3024 : break;
111 : :
12 akapila@postgresql.o 112 :UNC 0 : case WORKERTYPE_UNKNOWN:
113 : : /* Should never happen. */
114 [ # # ]: 0 : elog(ERROR, "Unknown worker type");
115 : : }
12 akapila@postgresql.o 116 :GNC 3059 : }
117 : :
118 : : /*
119 : : * Common code to fetch the up-to-date sync state info into the static lists.
120 : : *
121 : : * Returns true if subscription has 1 or more tables, else false.
122 : : *
123 : : * Note: If this function started the transaction (indicated by the parameter)
124 : : * then it is the caller's responsibility to commit it.
125 : : */
126 : : bool
127 : 3297 : FetchRelationStates(bool *started_tx)
128 : : {
129 : : static bool has_subtables = false;
130 : :
131 : 3297 : *started_tx = false;
132 : :
133 [ + + ]: 3297 : if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
134 : : {
135 : : MemoryContext oldctx;
136 : : List *rstates;
137 : : ListCell *lc;
138 : : SubscriptionRelState *rstate;
139 : :
140 : 891 : relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
141 : :
142 : : /* Clean the old lists. */
143 : 891 : list_free_deep(table_states_not_ready);
144 : 891 : table_states_not_ready = NIL;
145 : :
146 [ + + ]: 891 : if (!IsTransactionState())
147 : : {
148 : 875 : StartTransactionCommand();
149 : 875 : *started_tx = true;
150 : : }
151 : :
152 : : /* Fetch tables that are in non-ready state. */
5 153 : 891 : rstates = GetSubscriptionRelations(MySubscription->oid, true, false,
154 : : true);
155 : :
156 : : /* Allocate the tracking info in a permanent memory context. */
12 157 : 891 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
158 [ + + + + : 2435 : foreach(lc, rstates)
+ + ]
159 : : {
160 : 1544 : rstate = palloc(sizeof(SubscriptionRelState));
161 : 1544 : memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
162 : 1544 : table_states_not_ready = lappend(table_states_not_ready, rstate);
163 : : }
164 : 891 : MemoryContextSwitchTo(oldctx);
165 : :
166 : : /*
167 : : * Does the subscription have tables?
168 : : *
169 : : * If there were not-READY tables found then we know it does. But if
170 : : * table_states_not_ready was empty we still need to check again to
171 : : * see if there are 0 tables.
172 : : */
173 [ + + + + ]: 1140 : has_subtables = (table_states_not_ready != NIL) ||
174 : 249 : HasSubscriptionTables(MySubscription->oid);
175 : :
176 : : /*
177 : : * If the subscription relation cache has been invalidated since we
178 : : * entered this routine, we still use and return the relations we just
179 : : * finished constructing, to avoid infinite loops, but we leave the
180 : : * table states marked as stale so that we'll rebuild it again on next
181 : : * access. Otherwise, we mark the table states as valid.
182 : : */
183 [ + - ]: 891 : if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
184 : 891 : relation_states_validity = SYNC_RELATIONS_STATE_VALID;
185 : : }
186 : :
187 : 3297 : return has_subtables;
188 : : }
|