Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * sinvaladt.c
4 : : * POSTGRES shared cache invalidation data manager.
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/storage/ipc/sinvaladt.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : : #include "postgres.h"
16 : :
17 : : #include <signal.h>
18 : : #include <unistd.h>
19 : :
20 : : #include "miscadmin.h"
21 : : #include "storage/ipc.h"
22 : : #include "storage/proc.h"
23 : : #include "storage/procnumber.h"
24 : : #include "storage/procsignal.h"
25 : : #include "storage/shmem.h"
26 : : #include "storage/sinvaladt.h"
27 : : #include "storage/spin.h"
28 : : #include "storage/subsystems.h"
29 : :
30 : : /*
31 : : * Conceptually, the shared cache invalidation messages are stored in an
32 : : * infinite array, where maxMsgNum is the next array subscript to store a
33 : : * submitted message in, minMsgNum is the smallest array subscript containing
34 : : * a message not yet read by all backends, and we always have maxMsgNum >=
35 : : * minMsgNum. (They are equal when there are no messages pending.) For each
36 : : * active backend, there is a nextMsgNum pointer indicating the next message it
37 : : * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
38 : : * backend.
39 : : *
40 : : * (In the current implementation, minMsgNum is a lower bound for the
41 : : * per-process nextMsgNum values, but it isn't rigorously kept equal to the
42 : : * smallest nextMsgNum --- it may lag behind. We only update it when
43 : : * SICleanupQueue is called, and we try not to do that often.)
44 : : *
45 : : * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
46 : : * entries. We translate MsgNum values into circular-buffer indexes by
47 : : * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
48 : : * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
49 : : * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
50 : : * in the buffer. If the buffer does overflow, we recover by setting the
51 : : * "reset" flag for each backend that has fallen too far behind. A backend
52 : : * that is in "reset" state is ignored while determining minMsgNum. When
53 : : * it does finally attempt to receive inval messages, it must discard all
54 : : * its invalidatable state, since it won't know what it missed.
55 : : *
56 : : * To reduce the probability of needing resets, we send a "catchup" interrupt
57 : : * to any backend that seems to be falling unreasonably far behind. The
58 : : * normal behavior is that at most one such interrupt is in flight at a time;
59 : : * when a backend completes processing a catchup interrupt, it executes
60 : : * SICleanupQueue, which will signal the next-furthest-behind backend if
61 : : * needed. This avoids undue contention from multiple backends all trying
62 : : * to catch up at once. However, the furthest-back backend might be stuck
63 : : * in a state where it can't catch up. Eventually it will get reset, so it
64 : : * won't cause any more problems for anyone but itself. But we don't want
65 : : * to find that a bunch of other backends are now too close to the reset
66 : : * threshold to be saved. So SICleanupQueue is designed to occasionally
67 : : * send extra catchup interrupts as the queue gets fuller, to backends that
68 : : * are far behind and haven't gotten one yet. As long as there aren't a lot
69 : : * of "stuck" backends, we won't need a lot of extra interrupts, since ones
70 : : * that aren't stuck will propagate their interrupts to the next guy.
71 : : *
72 : : * We would have problems if the MsgNum values overflow an integer, so
73 : : * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
74 : : * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
75 : : * large so that we don't need to do this often. It must be a multiple of
76 : : * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
77 : : * to be moved when we do it.
78 : : *
79 : : * Access to the shared sinval array is protected by two locks, SInvalReadLock
80 : : * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
81 : : * authorizes them to modify their own ProcState but not to modify or even
82 : : * look at anyone else's. When we need to perform array-wide updates,
83 : : * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
84 : : * lock out all readers. Writers take SInvalWriteLock (always in exclusive
85 : : * mode) to serialize adding messages to the queue. Note that a writer
86 : : * can operate in parallel with one or more readers, because the writer
87 : : * has no need to touch anyone's ProcState, except in the infrequent cases
88 : : * when SICleanupQueue is needed. The only point of overlap is that
89 : : * the writer wants to change maxMsgNum while readers need to read it.
90 : : * We deal with that by having a spinlock that readers must take for just
91 : : * long enough to read maxMsgNum, while writers take it for just long enough
92 : : * to write maxMsgNum. (The exact rule is that you need the spinlock to
93 : : * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
94 : : * spinlock to write maxMsgNum unless you are holding both locks.)
95 : : *
96 : : * Note: since maxMsgNum is an int and hence presumably atomically readable/
97 : : * writable, the spinlock might seem unnecessary. The reason it is needed
98 : : * is to provide a memory barrier: we need to be sure that messages written
99 : : * to the array are actually there before maxMsgNum is increased, and that
100 : : * readers will see that data after fetching maxMsgNum. Multiprocessors
101 : : * that have weak memory-ordering guarantees can fail without the memory
102 : : * barrier instructions that are included in the spinlock sequences.
103 : : */
104 : :
105 : :
/*
 * Configurable parameters.
 *
 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
 * Must be a power of 2 for speed.
 *
 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
 * Must be a multiple of MAXNUMMESSAGES. Should be large.
 *
 * CLEANUP_MIN: the minimum number of messages that must be in the buffer
 * before we bother to call SICleanupQueue.
 *
 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
 * we exceed CLEANUP_MIN. Should be a power of 2 for speed.
 *
 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
 * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
 *
 * WRITE_QUANTUM: the max number of messages to push into the buffer per
 * iteration of SIInsertDataEntries. Noncritical but should be less than
 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
 * per iteration.
 *
 * With the values below: MSGNUMWRAPAROUND = 4096 * 262144 = 2^30, which is
 * computed in int arithmetic and leaves headroom below INT_MAX for
 * maxMsgNum to run ahead of minMsgNum; CLEANUP_MIN = SIG_THRESHOLD = 2048;
 * CLEANUP_QUANTUM = 256, comfortably larger than WRITE_QUANTUM (64).
 */

#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64
136 : :
/*
 * Per-backend state in shared invalidation structure.
 *
 * There is one entry per possible slot (NumProcStateSlots), indexed by
 * ProcNumber.  A backend holding SInvalReadLock in shared mode may modify
 * only its own entry; SICleanupQueue takes SInvalReadLock exclusively to
 * update entries array-wide.  The exception is hasMessages, which
 * SIInsertDataEntries sets on every active entry while holding only
 * SInvalWriteLock.
 */
typedef struct ProcState
{
	/* procPid is zero in an inactive ProcState array entry. */
	pid_t		procPid;		/* PID of backend, for signaling */
	/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
	int			nextMsgNum;		/* next message number to read */

	/*
	 * Set by SICleanupQueue when this backend fell too far behind.  It is
	 * cleared by SIGetDataEntries, which then reports a reset (-1).
	 */
	bool		resetState;		/* backend needs to reset its state */

	/*
	 * Set when SICleanupQueue sends a catchup interrupt.  It is cleared once
	 * the backend has read up to maxMsgNum or has processed a reset.
	 */
	bool		signaled;		/* backend has been sent catchup signal */

	/*
	 * Unlocked hint: set by writers after they publish new messages.  The
	 * owner clears it before reading, and sets it again if it stops short of
	 * the end of the queue.
	 */
	bool		hasMessages;	/* backend has unread messages */

	/*
	 * Backend only sends invalidations, never receives them. This only makes
	 * sense for Startup process during recovery because it doesn't maintain a
	 * relcache, yet it fires inval messages to allow query backends to see
	 * schema changes.
	 */
	bool		sendOnly;		/* backend only sends, never receives */

	/*
	 * Next LocalTransactionId to use for each idle backend slot. We keep
	 * this here because it is indexed by ProcNumber and it is convenient to
	 * copy the value to and from local memory when MyProcNumber is set. It's
	 * meaningless in an active ProcState entry.
	 */
	LocalTransactionId nextLXID;
} ProcState;
164 : :
/* Shared cache invalidation memory segment */
typedef struct SISeg
{
	/*
	 * General state information
	 *
	 * minMsgNum is recomputed only by SICleanupQueue and may lag behind the
	 * true minimum of the backends' nextMsgNum values.  Reading maxMsgNum
	 * requires msgnumLock unless SInvalWriteLock is held.  Writing it
	 * requires msgnumLock unless both SInvalWriteLock and SInvalReadLock
	 * are held.
	 */
	int			minMsgNum;		/* oldest message still needed */
	int			maxMsgNum;		/* next message number to be assigned */
	int			nextThreshold;	/* # of messages to call SICleanupQueue */

	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */

	/*
	 * Circular buffer holding shared-inval messages; message number N lives
	 * in buffer[N % MAXNUMMESSAGES].
	 */
	SharedInvalidationMessage buffer[MAXNUMMESSAGES];

	/*
	 * Per-backend invalidation state info.
	 *
	 * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
	 * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
	 * a dense array of their indexes, to speed up scanning all in-use slots.
	 * 'pgprocnos' points into this same shared segment, at the storage that
	 * directly follows the last procState[] entry (set up in
	 * SharedInvalShmemInit).
	 *
	 * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
	 * having our separate copy avoids contention on ProcArrayLock, and allows
	 * us to track only the processes that participate in shared cache
	 * invalidations.
	 */
	int			numProcs;
	int		   *pgprocnos;
	ProcState	procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;
198 : :
/*
 * We reserve a slot for each possible ProcNumber, plus one for each
 * possible auxiliary process type. (This scheme assumes there is not
 * more than one of any auxiliary process type at a time, except for
 * IO workers.)
 *
 * procState[] is indexed directly by MyProcNumber, so every ProcNumber that
 * attaches must be below this bound; SharedInvalBackendInit enforces that.
 */
#define NumProcStateSlots (MaxBackends + NUM_AUXILIARY_PROCS)
206 : :
/*
 * Pointer to the shared inval buffer.  SharedInvalShmemRequest registers
 * its address as the .ptr target of the shared-memory request.
 */
static SISeg *shmInvalBuffer;	/* pointer to the shared inval buffer */

static void SharedInvalShmemRequest(void *arg);
static void SharedInvalShmemInit(void *arg);

/* Hooks that size (request_fn) and initialize (init_fn) the SI segment */
const ShmemCallbacks SharedInvalShmemCallbacks = {
	.request_fn = SharedInvalShmemRequest,
	.init_fn = SharedInvalShmemInit,
};


/*
 * Backend-local LocalTransactionId counter.  It is loaded from
 * procState[MyProcNumber].nextLXID in SharedInvalBackendInit and saved back
 * there by CleanupInvalidationState.
 */
static LocalTransactionId nextLocalTransactionId;

static void CleanupInvalidationState(int status, Datum arg);
221 : :
222 : :
223 : : /*
224 : : * SharedInvalShmemRequest
225 : : * Register shared memory needs for the SI message buffer
226 : : */
227 : : static void
29 heikki.linnakangas@i 228 :GNC 1244 : SharedInvalShmemRequest(void *arg)
229 : : {
230 : : Size size;
231 : :
7563 tgl@sss.pgh.pa.us 232 :CBC 1244 : size = offsetof(SISeg, procState);
793 heikki.linnakangas@i 233 : 1244 : size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
234 : 1244 : size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */
235 : :
29 heikki.linnakangas@i 236 :GNC 1244 : ShmemRequestStruct(.name = "shmInvalBuffer",
237 : : .size = size,
238 : : .ptr = (void **) &shmInvalBuffer,
239 : : );
10892 scrappy@hub.org 240 :GIC 1244 : }
241 : :
242 : : static void
29 heikki.linnakangas@i 243 :GNC 1241 : SharedInvalShmemInit(void *arg)
244 : : {
245 : : int i;
246 : :
247 : : /* Clear message counters, init spinlock */
6624 alvherre@alvh.no-ip. 248 :CBC 1241 : shmInvalBuffer->minMsgNum = 0;
249 : 1241 : shmInvalBuffer->maxMsgNum = 0;
6529 tgl@sss.pgh.pa.us 250 : 1241 : shmInvalBuffer->nextThreshold = CLEANUP_MIN;
6528 251 : 1241 : SpinLockInit(&shmInvalBuffer->msgnumLock);
252 : :
253 : : /* The buffer[] array is initially all unused, so we need not fill it */
254 : :
255 : : /* Mark all backends inactive, and initialize nextLXID */
793 heikki.linnakangas@i 256 [ + + ]: 164024 : for (i = 0; i < NumProcStateSlots; i++)
257 : : {
3240 tgl@sss.pgh.pa.us 258 : 162783 : shmInvalBuffer->procState[i].procPid = 0; /* inactive */
6172 bruce@momjian.us 259 : 162783 : shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
6624 alvherre@alvh.no-ip. 260 : 162783 : shmInvalBuffer->procState[i].resetState = false;
6529 tgl@sss.pgh.pa.us 261 : 162783 : shmInvalBuffer->procState[i].signaled = false;
5394 rhaas@postgresql.org 262 : 162783 : shmInvalBuffer->procState[i].hasMessages = false;
6529 tgl@sss.pgh.pa.us 263 : 162783 : shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
264 : : }
793 heikki.linnakangas@i 265 : 1241 : shmInvalBuffer->numProcs = 0;
266 : 1241 : shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
10892 scrappy@hub.org 267 :GIC 1241 : }
268 : :
269 : : /*
270 : : * SharedInvalBackendInit
271 : : * Initialize a new backend to operate on the sinval buffer
272 : : */
273 : : void
5981 simon@2ndQuadrant.co 274 :CBC 18708 : SharedInvalBackendInit(bool sendOnly)
275 : : {
276 : : ProcState *stateP;
277 : : pid_t oldPid;
6624 alvherre@alvh.no-ip. 278 : 18708 : SISeg *segP = shmInvalBuffer;
279 : :
793 heikki.linnakangas@i 280 [ - + ]: 18708 : if (MyProcNumber < 0)
793 heikki.linnakangas@i 281 [ # # ]:UBC 0 : elog(ERROR, "MyProcNumber not set");
793 heikki.linnakangas@i 282 [ - + ]:CBC 18708 : if (MyProcNumber >= NumProcStateSlots)
793 heikki.linnakangas@i 283 [ # # ]:UBC 0 : elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
284 : : MyProcNumber, NumProcStateSlots);
793 heikki.linnakangas@i 285 :CBC 18708 : stateP = &segP->procState[MyProcNumber];
286 : :
287 : : /*
288 : : * This can run in parallel with read operations, but not with write
289 : : * operations, since SIInsertDataEntries relies on the pgprocnos array to
290 : : * set hasMessages appropriately.
291 : : */
6529 tgl@sss.pgh.pa.us 292 : 18708 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
293 : :
793 heikki.linnakangas@i 294 : 18708 : oldPid = stateP->procPid;
295 [ - + ]: 18708 : if (oldPid != 0)
296 : : {
793 heikki.linnakangas@i 297 :UBC 0 : LWLockRelease(SInvalWriteLock);
298 [ # # ]: 0 : elog(ERROR, "sinval slot for backend %d is already in use by process %d",
299 : : MyProcNumber, (int) oldPid);
300 : : }
301 : :
793 heikki.linnakangas@i 302 :CBC 18708 : shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;
303 : :
304 : : /* Fetch next local transaction ID into local memory */
6529 tgl@sss.pgh.pa.us 305 : 18708 : nextLocalTransactionId = stateP->nextLXID;
306 : :
307 : : /* mark myself active, with all extant messages already read */
6356 heikki.linnakangas@i 308 : 18708 : stateP->procPid = MyProcPid;
9738 tgl@sss.pgh.pa.us 309 : 18708 : stateP->nextMsgNum = segP->maxMsgNum;
9720 310 : 18708 : stateP->resetState = false;
6529 311 : 18708 : stateP->signaled = false;
5394 rhaas@postgresql.org 312 : 18708 : stateP->hasMessages = false;
5981 simon@2ndQuadrant.co 313 : 18708 : stateP->sendOnly = sendOnly;
314 : :
6529 tgl@sss.pgh.pa.us 315 : 18708 : LWLockRelease(SInvalWriteLock);
316 : :
317 : : /* register exit routine to mark my entry inactive at exit */
9346 peter_e@gmx.net 318 : 18708 : on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
10467 bruce@momjian.us 319 : 18708 : }
320 : :
/*
 * CleanupInvalidationState
 *		Mark the current backend as no longer active.
 *
 * This function is called via on_shmem_exit() during backend shutdown.
 *
 * arg is really of type "SISeg*".
 *
 * Runs under SInvalWriteLock, so neither SIInsertDataEntries nor
 * SICleanupQueue can be scanning pgprocnos[] concurrently.  It saves our
 * local transaction ID counter into the slot for the slot's next user and
 * marks the slot inactive.  It then removes MyProcNumber from the dense
 * pgprocnos[] array.  hasMessages and sendOnly are not touched here;
 * SharedInvalBackendInit reinitializes them when the slot is reused.
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
	SISeg	   *segP = (SISeg *) DatumGetPointer(arg);
	ProcState  *stateP;
	int			i;

	Assert(segP);

	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	stateP = &segP->procState[MyProcNumber];

	/* Update next local transaction ID for next holder of this proc number */
	stateP->nextLXID = nextLocalTransactionId;

	/* Mark myself inactive */
	stateP->procPid = 0;
	stateP->nextMsgNum = 0;
	stateP->resetState = false;
	stateP->signaled = false;

	/*
	 * Remove our entry from pgprocnos[].  The array's order is irrelevant to
	 * its users (they just iterate over all numProcs entries), so fill the
	 * hole by moving the last entry into it.
	 */
	for (i = segP->numProcs - 1; i >= 0; i--)
	{
		if (segP->pgprocnos[i] == MyProcNumber)
		{
			if (i != segP->numProcs - 1)
				segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
			break;
		}
	}
	/* Not finding ourselves means the shared bookkeeping is corrupt */
	if (i < 0)
		elog(PANIC, "could not find entry in sinval array");
	segP->numProcs--;

	LWLockRelease(SInvalWriteLock);
}
366 : :
/*
 * SIInsertDataEntries
 *		Add new invalidation message(s) to the buffer.
 *
 * data[0..n-1] are copied into the circular buffer in chunks of at most
 * WRITE_QUANTUM messages.  Each chunk is written under SInvalWriteLock and
 * then published by advancing maxMsgNum under msgnumLock.  After that,
 * hasMessages is set on every in-use slot so readers notice the new data.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * N can be arbitrarily large. We divide the work into groups of no more
	 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
	 * an unreasonably long time. (This is not so much because we care about
	 * letting in other writers, as that some just-caught-up backend might be
	 * trying to do SICleanupQueue to pass on its signal, and we don't want it
	 * to have to wait a long time.) Also, we need to consider calling
	 * SICleanupQueue every so often.
	 */
	while (n > 0)
	{
		int			nthistime = Min(n, WRITE_QUANTUM);
		int			numMsgs;
		int			max;
		int			i;

		n -= nthistime;

		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

		/*
		 * If the buffer is full, we *must* acquire some space. Clean the
		 * queue and reset anyone who is preventing space from being freed.
		 * Otherwise, clean the queue only when it's exceeded the next
		 * fullness threshold. We have to loop and recheck the buffer state
		 * after any call of SICleanupQueue.
		 *
		 * (SICleanupQueue may transiently release SInvalWriteLock to send a
		 * catchup signal, so another writer could have consumed the space it
		 * freed; hence the recheck.  It always returns with the write lock
		 * held again when passed callerHasWriteLock = true.)
		 */
		for (;;)
		{
			numMsgs = segP->maxMsgNum - segP->minMsgNum;
			if (numMsgs + nthistime > MAXNUMMESSAGES ||
				numMsgs >= segP->nextThreshold)
				SICleanupQueue(true, nthistime);
			else
				break;
		}

		/*
		 * Insert new message(s) into proper slot of circular buffer.  Reading
		 * maxMsgNum without the spinlock is fine since we hold
		 * SInvalWriteLock; readers won't look at these entries until the new
		 * maxMsgNum is published below.
		 */
		max = segP->maxMsgNum;
		while (nthistime-- > 0)
		{
			segP->buffer[max % MAXNUMMESSAGES] = *data++;
			max++;
		}

		/* Update current value of maxMsgNum using spinlock */
		SpinLockAcquire(&segP->msgnumLock);
		segP->maxMsgNum = max;
		SpinLockRelease(&segP->msgnumLock);

		/*
		 * Now that the maxMsgNum change is globally visible, we give everyone
		 * a swift kick to make sure they read the newly added messages.
		 * Releasing SInvalWriteLock will enforce a full memory barrier, so
		 * these (unlocked) changes will be committed to memory before we exit
		 * the function.
		 */
		for (i = 0; i < segP->numProcs; i++)
		{
			ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];

			stateP->hasMessages = true;
		}

		LWLockRelease(SInvalWriteLock);
	}
}
445 : :
/*
 * SIGetDataEntries
 *		get next SI message(s) for current backend, if there are any
 *
 * Possible return values:
 *	0:	 no SI message available
 *	n>0: next n SI messages have been extracted into data[]
 * -1:	 SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify only
 * fields of its own backend's ProcState, and no instance will look at fields
 * of other backends' ProcStates. We express this by grabbing SInvalReadLock
 * in shared mode. Note that this is not exactly the normal (read-only)
 * interpretation of a shared lock! Look closely at the interactions before
 * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries. It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be important
 * to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
	SISeg	   *segP;
	ProcState  *stateP;
	int			max;
	int			n;

	segP = shmInvalBuffer;
	stateP = &segP->procState[MyProcNumber];

	/*
	 * Before starting to take locks, do a quick, unlocked test to see whether
	 * there can possibly be anything to read. On a multiprocessor system,
	 * it's possible that this load could migrate backwards and occur before
	 * we actually enter this function, so we might miss a sinval message that
	 * was just added by some other processor. But they can't migrate
	 * backwards over a preceding lock acquisition, so it should be OK. If we
	 * haven't acquired a lock protecting against further relevant
	 * invalidations, any such occurrence is not much different than if the
	 * invalidation had arrived slightly later in the first place.
	 */
	if (!stateP->hasMessages)
		return 0;

	LWLockAcquire(SInvalReadLock, LW_SHARED);

	/*
	 * We must clear hasMessages before determining how many messages we're
	 * going to read. That way, if new messages arrive after we have
	 * determined how many we're reading, the writer will set the flag again
	 * and we'll notice those messages on a later call.
	 *
	 * Note that, if we don't end up reading all of the messages, we had
	 * better be certain to set this flag again before exiting!
	 */
	stateP->hasMessages = false;

	/* Fetch current value of maxMsgNum using spinlock */
	SpinLockAcquire(&segP->msgnumLock);
	max = segP->maxMsgNum;
	SpinLockRelease(&segP->msgnumLock);

	if (stateP->resetState)
	{
		/*
		 * Force reset. We can say we have dealt with any messages added
		 * since the reset, as well; and that means we should clear the
		 * signaled flag, too.
		 */
		stateP->nextMsgNum = max;
		stateP->resetState = false;
		stateP->signaled = false;
		LWLockRelease(SInvalReadLock);
		return -1;
	}

	/*
	 * Retrieve messages and advance backend's counter, until data array is
	 * full or there are no more messages.
	 *
	 * There may be other backends that haven't read the message(s), so we
	 * cannot delete them here. SICleanupQueue() will eventually remove them
	 * from the queue.
	 */
	n = 0;
	while (n < datasize && stateP->nextMsgNum < max)
	{
		data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
		stateP->nextMsgNum++;
	}

	/*
	 * If we have caught up completely, clear our "signaled" flag so that
	 * we'll get another signal if we fall behind again.
	 *
	 * If we haven't caught up completely, set the hasMessages flag back to
	 * true so that we see the remaining messages next time.
	 */
	if (stateP->nextMsgNum >= max)
		stateP->signaled = false;
	else
		stateP->hasMessages = true;

	LWLockRelease(SInvalReadLock);
	return n;
}
561 : :
/*
 * SICleanupQueue
 *		Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is true if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind.  We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 *
 * On return, SInvalWriteLock is held if and only if callerHasWriteLock was
 * true; SInvalReadLock is never held.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
	SISeg	   *segP = shmInvalBuffer;
	int			min,
				minsig,
				lowbound,
				numMsgs,
				i;
	ProcState  *needSig = NULL;

	/* Lock out all writers and readers */
	if (!callerHasWriteLock)
		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

	/*
	 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
	 * furthest-back backend that needs signaling (if any), and reset any
	 * backends that are too far back.  Note that because we ignore sendOnly
	 * backends here it is possible for them to keep sending messages without
	 * a problem even when they are the only active backend.
	 *
	 * minsig: only backends more than SIG_THRESHOLD messages behind are
	 * candidates for a catchup signal.  lowbound: a backend whose nextMsgNum
	 * is below this would keep fewer than minFree slots free, so it is reset.
	 */
	min = segP->maxMsgNum;
	minsig = min - SIG_THRESHOLD;
	lowbound = min - MAXNUMMESSAGES + minFree;

	for (i = 0; i < segP->numProcs; i++)
	{
		ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
		int			n = stateP->nextMsgNum;

		/* Ignore if already in reset state */
		Assert(stateP->procPid != 0);
		if (stateP->resetState || stateP->sendOnly)
			continue;

		/*
		 * If we must free some space and this backend is preventing it, force
		 * him into reset state and then ignore until he catches up.
		 */
		if (n < lowbound)
		{
			stateP->resetState = true;
			/* no point in signaling him ... */
			continue;
		}

		/* Track the global minimum nextMsgNum */
		if (n < min)
			min = n;

		/* Also see who's furthest back of the unsignaled backends */
		if (n < minsig && !stateP->signaled)
		{
			minsig = n;
			needSig = stateP;
		}
	}
	segP->minMsgNum = min;

	/*
	 * When minMsgNum gets really large, decrement all message counters so as
	 * to forestall overflow of the counters.  This happens seldom enough that
	 * folding it into the previous loop would be a loser.  Because
	 * MSGNUMWRAPAROUND is a multiple of MAXNUMMESSAGES, every message keeps
	 * its buffer[] position.
	 */
	if (min >= MSGNUMWRAPAROUND)
	{
		segP->minMsgNum -= MSGNUMWRAPAROUND;
		segP->maxMsgNum -= MSGNUMWRAPAROUND;
		for (i = 0; i < segP->numProcs; i++)
			segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
	}

	/*
	 * Determine how many messages are still in the queue, and set the
	 * threshold at which we should repeat SICleanupQueue().  Above
	 * CLEANUP_MIN, the threshold is the next multiple of CLEANUP_QUANTUM
	 * strictly greater than numMsgs.
	 */
	numMsgs = segP->maxMsgNum - segP->minMsgNum;
	if (numMsgs < CLEANUP_MIN)
		segP->nextThreshold = CLEANUP_MIN;
	else
		segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

	/*
	 * Lastly, signal anyone who needs a catchup interrupt.  Since
	 * SendProcSignal() might not be fast, we don't want to hold locks while
	 * executing it.
	 */
	if (needSig)
	{
		/* capture target identity while we still hold the locks */
		pid_t		his_pid = needSig->procPid;
		ProcNumber	his_procNumber = (needSig - &segP->procState[0]);

		needSig->signaled = true;
		LWLockRelease(SInvalReadLock);
		LWLockRelease(SInvalWriteLock);
		elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
		SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
		/* restore the caller's lock state */
		if (callerHasWriteLock)
			LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	}
	else
	{
		LWLockRelease(SInvalReadLock);
		if (!callerHasWriteLock)
			LWLockRelease(SInvalWriteLock);
	}
}
687 : :
688 : :
689 : : /*
690 : : * GetNextLocalTransactionId --- allocate a new LocalTransactionId
691 : : *
692 : : * We split VirtualTransactionIds into two parts so that it is possible
693 : : * to allocate a new one without any contention for shared memory, except
694 : : * for a bit of additional overhead during backend startup/shutdown.
695 : : * The high-order part of a VirtualTransactionId is a ProcNumber, and the
696 : : * low-order part is a LocalTransactionId, which we assign from a local
697 : : * counter. To avoid the risk of a VirtualTransactionId being reused
698 : : * within a short interval, successive procs occupying the same PGPROC slot
699 : : * should use a consecutive sequence of local IDs, which is implemented
700 : : * by copying nextLocalTransactionId as seen above.
701 : : */
702 : : LocalTransactionId
6817 tgl@sss.pgh.pa.us 703 : 423430 : GetNextLocalTransactionId(void)
704 : : {
705 : : LocalTransactionId result;
706 : :
707 : : /* loop to avoid returning InvalidLocalTransactionId at wraparound */
708 : : do
709 : : {
710 : 434890 : result = nextLocalTransactionId++;
711 [ + + ]: 434890 : } while (!LocalTransactionIdIsValid(result));
712 : :
713 : 423430 : return result;
714 : : }
|