@@ -20,8 +20,11 @@ import (
2020 "context"
2121 "sync"
2222 "sync/atomic"
23+ "time"
2324
2425 sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
26+ "google.golang.org/protobuf/proto"
27+ "google.golang.org/protobuf/reflect/protoreflect"
2528)
2629
2730type channelFinder struct {
@@ -30,26 +33,86 @@ type channelFinder struct {
3033 databaseID atomic.Uint64
3134 recipeCache * keyRecipeCache
3235 rangeCache * keyRangeCache
36+
37+ coalescingMu sync.Mutex
38+ pendingUpdates []* sppb.CacheUpdate
39+ flushScheduled bool
40+ coalescingDelay time.Duration
41+ scheduleFlush func (time.Duration , func ())
42+ }
43+
44+ const cacheUpdateCoalescingWindow = 5 * time .Millisecond
45+
46+ func defaultChannelFinderFlushScheduler (delay time.Duration , fn func ()) {
47+ time .AfterFunc (delay , fn )
3348}
3449
3550func newChannelFinder (endpointCache channelEndpointCache ) * channelFinder {
3651 return & channelFinder {
37- recipeCache : newKeyRecipeCache (),
38- rangeCache : newKeyRangeCache (endpointCache ),
52+ recipeCache : newKeyRecipeCache (),
53+ rangeCache : newKeyRangeCache (endpointCache ),
54+ coalescingDelay : cacheUpdateCoalescingWindow ,
55+ scheduleFlush : defaultChannelFinderFlushScheduler ,
3956 }
4057}
4158
4259func (f * channelFinder ) useDeterministicRandom () {
4360 f .rangeCache .useDeterministicRandom ()
4461}
4562
63+ func (f * channelFinder ) setLifecycleManager (lifecycleManager * endpointLifecycleManager ) {
64+ if f == nil {
65+ return
66+ }
67+ f .rangeCache .setLifecycleManager (lifecycleManager )
68+ }
69+
70+ func (f * channelFinder ) setCoalescingDelayForTest (delay time.Duration ) {
71+ if f == nil {
72+ return
73+ }
74+ f .coalescingMu .Lock ()
75+ defer f .coalescingMu .Unlock ()
76+ f .coalescingDelay = delay
77+ }
78+
79+ func (f * channelFinder ) setFlushSchedulerForTest (schedule func (time.Duration , func ())) {
80+ if f == nil {
81+ return
82+ }
83+ if schedule == nil {
84+ schedule = defaultChannelFinderFlushScheduler
85+ }
86+ f .coalescingMu .Lock ()
87+ defer f .coalescingMu .Unlock ()
88+ f .scheduleFlush = schedule
89+ }
90+
4691func (f * channelFinder ) update (update * sppb.CacheUpdate ) {
4792 if update == nil {
4893 return
4994 }
5095 f .updateMu .Lock ()
5196 defer f .updateMu .Unlock ()
97+ f .applyUpdateLocked (update )
98+ }
99+
100+ func (f * channelFinder ) applyUpdates (updates []* sppb.CacheUpdate ) {
101+ if len (updates ) == 0 {
102+ return
103+ }
104+ f .updateMu .Lock ()
105+ defer f .updateMu .Unlock ()
52106
107+ for _ , update := range updates {
108+ f .applyUpdateLocked (update )
109+ }
110+ }
111+
112+ func (f * channelFinder ) applyUpdateLocked (update * sppb.CacheUpdate ) {
113+ if update == nil {
114+ return
115+ }
53116 currentID := f .databaseID .Load ()
54117 if currentID != update .GetDatabaseId () {
55118 if currentID != 0 {
@@ -64,13 +127,103 @@ func (f *channelFinder) update(update *sppb.CacheUpdate) {
64127 f .rangeCache .addRanges (update )
65128}
66129
130+ func (f * channelFinder ) updateAsync (update * sppb.CacheUpdate ) {
131+ if ! f .shouldProcessUpdate (update ) {
132+ return
133+ }
134+ f .enqueueCoalescedUpdate (update )
135+ }
136+
137+ func (f * channelFinder ) shouldProcessUpdate (update * sppb.CacheUpdate ) bool {
138+ if update == nil {
139+ return false
140+ }
141+ // Apply any material cache update and let applyUpdateLocked handle a database
142+ // switch by clearing stale state before storing the new database ID. For
143+ // database-ID-only messages, only process them when they indicate that this
144+ // finder has switched to a different database; database IDs are treated as
145+ // identity values, not an ordered sequence.
146+ if f .isMaterialUpdate (update ) {
147+ return true
148+ }
149+ updateDatabaseID := update .GetDatabaseId ()
150+ return updateDatabaseID != 0 && f .databaseID .Load () != updateDatabaseID
151+ }
152+
153+ func (* channelFinder ) isMaterialUpdate (update * sppb.CacheUpdate ) bool {
154+ if update == nil {
155+ return false
156+ }
157+ return len (update .GetGroup ()) > 0 ||
158+ len (update .GetRange ()) > 0 ||
159+ (update .GetKeyRecipes () != nil && len (update .GetKeyRecipes ().GetRecipe ()) > 0 )
160+ }
161+
162+ func (f * channelFinder ) enqueueCoalescedUpdate (update * sppb.CacheUpdate ) {
163+ if f == nil || update == nil {
164+ return
165+ }
166+
167+ f .coalescingMu .Lock ()
168+ f .pendingUpdates = append (f .pendingUpdates , cloneCacheUpdate (update ))
169+ if f .flushScheduled {
170+ f .coalescingMu .Unlock ()
171+ return
172+ }
173+ f .flushScheduled = true
174+ delay := f .coalescingDelay
175+ scheduleFlush := f .scheduleFlush
176+ f .coalescingMu .Unlock ()
177+
178+ scheduleFlush (delay , f .flushCoalescedUpdates )
179+ }
180+
181+ func (f * channelFinder ) flushCoalescedUpdates () {
182+ if f == nil {
183+ return
184+ }
185+ f .coalescingMu .Lock ()
186+ updates := f .pendingUpdates
187+ f .pendingUpdates = nil
188+ f .flushScheduled = false
189+ f .coalescingMu .Unlock ()
190+
191+ f .applyUpdates (updates )
192+ }
193+
194+ func cloneCacheUpdate (update * sppb.CacheUpdate ) * sppb.CacheUpdate {
195+ if update == nil {
196+ return nil
197+ }
198+ return cloneProto (update )
199+ }
200+
201+ func cloneProto [M interface { ProtoReflect () protoreflect.Message }](msg M ) M {
202+ if any (msg ) == nil {
203+ var zero M
204+ return zero
205+ }
206+ return proto .Clone (msg ).(M )
207+ }
208+
67209func (f * channelFinder ) findServerRead (ctx context.Context , req * sppb.ReadRequest , preferLeader bool ) channelEndpoint {
210+ return f .findServerReadWithExclusions (ctx , req , preferLeader , nil )
211+ }
212+
213+ func (f * channelFinder ) findServerReadWithExclusions (ctx context.Context , req * sppb.ReadRequest , preferLeader bool , excludedEndpoints endpointExcluder ) channelEndpoint {
214+ endpoint , _ := f .findServerReadWithExclusionsAndDetails (ctx , req , preferLeader , excludedEndpoints )
215+ return endpoint
216+ }
217+
218+ func (f * channelFinder ) findServerReadWithExclusionsAndDetails (ctx context.Context , req * sppb.ReadRequest , preferLeader bool , excludedEndpoints endpointExcluder ) (channelEndpoint , routeSelectionDetails ) {
219+ details := newRouteSelectionDetails ()
68220 if req == nil {
69- return nil
221+ details .defaultReasonCode = routeReasonRangeCacheMiss
222+ return nil , details
70223 }
71224 f .recipeCache .computeReadKeys (req )
72225 hint := ensureReadRoutingHint (req )
73- return f .fillRoutingHint (ctx , preferLeader , rangeModeCoveringSplit , req .GetDirectedReadOptions (), hint )
226+ return f .fillRoutingHintWithExclusionsAndDetails (ctx , preferLeader , rangeModeCoveringSplit , req .GetDirectedReadOptions (), hint , excludedEndpoints )
74227}
75228
76229func (f * channelFinder ) findServerReadWithTransaction (ctx context.Context , req * sppb.ReadRequest ) channelEndpoint {
@@ -81,12 +234,23 @@ func (f *channelFinder) findServerReadWithTransaction(ctx context.Context, req *
81234}
82235
83236func (f * channelFinder ) findServerExecuteSQL (ctx context.Context , req * sppb.ExecuteSqlRequest , preferLeader bool ) channelEndpoint {
237+ return f .findServerExecuteSQLWithExclusions (ctx , req , preferLeader , nil )
238+ }
239+
240+ func (f * channelFinder ) findServerExecuteSQLWithExclusions (ctx context.Context , req * sppb.ExecuteSqlRequest , preferLeader bool , excludedEndpoints endpointExcluder ) channelEndpoint {
241+ endpoint , _ := f .findServerExecuteSQLWithExclusionsAndDetails (ctx , req , preferLeader , excludedEndpoints )
242+ return endpoint
243+ }
244+
245+ func (f * channelFinder ) findServerExecuteSQLWithExclusionsAndDetails (ctx context.Context , req * sppb.ExecuteSqlRequest , preferLeader bool , excludedEndpoints endpointExcluder ) (channelEndpoint , routeSelectionDetails ) {
246+ details := newRouteSelectionDetails ()
84247 if req == nil {
85- return nil
248+ details .defaultReasonCode = routeReasonRangeCacheMiss
249+ return nil , details
86250 }
87251 f .recipeCache .computeQueryKeys (req )
88252 hint := ensureExecuteSQLRoutingHint (req )
89- return f .fillRoutingHint (ctx , preferLeader , rangeModePickRandom , req .GetDirectedReadOptions (), hint )
253+ return f .fillRoutingHintWithExclusionsAndDetails (ctx , preferLeader , rangeModePickRandom , req .GetDirectedReadOptions (), hint , excludedEndpoints )
90254}
91255
92256func (f * channelFinder ) findServerExecuteSQLWithTransaction (ctx context.Context , req * sppb.ExecuteSqlRequest ) channelEndpoint {
@@ -97,46 +261,93 @@ func (f *channelFinder) findServerExecuteSQLWithTransaction(ctx context.Context,
97261}
98262
99263func (f * channelFinder ) findServerBeginTransaction (ctx context.Context , req * sppb.BeginTransactionRequest ) channelEndpoint {
264+ return f .findServerBeginTransactionWithExclusions (ctx , req , nil )
265+ }
266+
267+ func (f * channelFinder ) findServerBeginTransactionWithExclusions (ctx context.Context , req * sppb.BeginTransactionRequest , excludedEndpoints endpointExcluder ) channelEndpoint {
268+ endpoint , _ := f .findServerBeginTransactionWithExclusionsAndDetails (ctx , req , excludedEndpoints )
269+ return endpoint
270+ }
271+
272+ func (f * channelFinder ) findServerBeginTransactionWithExclusionsAndDetails (ctx context.Context , req * sppb.BeginTransactionRequest , excludedEndpoints endpointExcluder ) (channelEndpoint , routeSelectionDetails ) {
273+ details := newRouteSelectionDetails ()
100274 if req == nil || req .GetMutationKey () == nil {
101- return nil
275+ details .defaultReasonCode = routeReasonRangeCacheMiss
276+ return nil , details
102277 }
103- return f .routeMutation (ctx , req .GetMutationKey (), preferLeaderFromTransactionOptions (req .GetOptions ()), ensureBeginTransactionRoutingHint (req ))
278+ return f .routeMutationWithExclusionsAndDetails (ctx , req .GetMutationKey (), preferLeaderFromTransactionOptions (req .GetOptions ()), ensureBeginTransactionRoutingHint (req ), excludedEndpoints )
104279}
105280
106281func (f * channelFinder ) fillCommitRoutingHint (ctx context.Context , req * sppb.CommitRequest ) channelEndpoint {
282+ return f .fillCommitRoutingHintWithExclusions (ctx , req , nil )
283+ }
284+
285+ func (f * channelFinder ) fillCommitRoutingHintWithExclusions (ctx context.Context , req * sppb.CommitRequest , excludedEndpoints endpointExcluder ) channelEndpoint {
286+ endpoint , _ := f .fillCommitRoutingHintWithExclusionsAndDetails (ctx , req , excludedEndpoints )
287+ return endpoint
288+ }
289+
290+ func (f * channelFinder ) fillCommitRoutingHintWithExclusionsAndDetails (ctx context.Context , req * sppb.CommitRequest , excludedEndpoints endpointExcluder ) (channelEndpoint , routeSelectionDetails ) {
291+ details := newRouteSelectionDetails ()
107292 if req == nil {
108- return nil
293+ details .defaultReasonCode = routeReasonRangeCacheMiss
294+ return nil , details
109295 }
110296 mutation := selectMutationProtoForRouting (req .GetMutations ())
111297 if mutation == nil {
112- return nil
298+ details .defaultReasonCode = routeReasonRangeCacheMiss
299+ return nil , details
113300 }
114- return f .routeMutation (ctx , mutation , true , ensureCommitRoutingHint (req ))
301+ return f .routeMutationWithExclusionsAndDetails (ctx , mutation , true , ensureCommitRoutingHint (req ), excludedEndpoints )
115302}
116303
117304func (f * channelFinder ) routeMutation (ctx context.Context , mutation * sppb.Mutation , preferLeader bool , hint * sppb.RoutingHint ) channelEndpoint {
305+ return f .routeMutationWithExclusions (ctx , mutation , preferLeader , hint , nil )
306+ }
307+
308+ func (f * channelFinder ) routeMutationWithExclusions (ctx context.Context , mutation * sppb.Mutation , preferLeader bool , hint * sppb.RoutingHint , excludedEndpoints endpointExcluder ) channelEndpoint {
309+ endpoint , _ := f .routeMutationWithExclusionsAndDetails (ctx , mutation , preferLeader , hint , excludedEndpoints )
310+ return endpoint
311+ }
312+
313+ func (f * channelFinder ) routeMutationWithExclusionsAndDetails (ctx context.Context , mutation * sppb.Mutation , preferLeader bool , hint * sppb.RoutingHint , excludedEndpoints endpointExcluder ) (channelEndpoint , routeSelectionDetails ) {
314+ details := newRouteSelectionDetails ()
118315 if mutation == nil || hint == nil {
119- return nil
316+ details .defaultReasonCode = routeReasonRangeCacheMiss
317+ return nil , details
120318 }
121319 f .recipeCache .applySchemaGeneration (hint )
122320 target := f .recipeCache .mutationToTargetRange (mutation )
123321 if target == nil {
124- return nil
322+ details .defaultReasonCode = routeReasonRangeCacheMiss
323+ return nil , details
125324 }
126325 f .recipeCache .applyTargetRange (hint , target )
127- return f .fillRoutingHint (ctx , preferLeader , rangeModeCoveringSplit , & sppb.DirectedReadOptions {}, hint )
326+ return f .fillRoutingHintWithExclusionsAndDetails (ctx , preferLeader , rangeModeCoveringSplit , & sppb.DirectedReadOptions {}, hint , excludedEndpoints )
128327}
129328
130329func (f * channelFinder ) fillRoutingHint (ctx context.Context , preferLeader bool , mode rangeMode , directedReadOptions * sppb.DirectedReadOptions , hint * sppb.RoutingHint ) channelEndpoint {
330+ return f .fillRoutingHintWithExclusions (ctx , preferLeader , mode , directedReadOptions , hint , nil )
331+ }
332+
333+ func (f * channelFinder ) fillRoutingHintWithExclusions (ctx context.Context , preferLeader bool , mode rangeMode , directedReadOptions * sppb.DirectedReadOptions , hint * sppb.RoutingHint , excludedEndpoints endpointExcluder ) channelEndpoint {
334+ endpoint , _ := f .fillRoutingHintWithExclusionsAndDetails (ctx , preferLeader , mode , directedReadOptions , hint , excludedEndpoints )
335+ return endpoint
336+ }
337+
338+ func (f * channelFinder ) fillRoutingHintWithExclusionsAndDetails (ctx context.Context , preferLeader bool , mode rangeMode , directedReadOptions * sppb.DirectedReadOptions , hint * sppb.RoutingHint , excludedEndpoints endpointExcluder ) (channelEndpoint , routeSelectionDetails ) {
339+ details := newRouteSelectionDetails ()
131340 if hint == nil {
132- return nil
341+ details .defaultReasonCode = routeReasonRangeCacheMiss
342+ return nil , details
133343 }
134344 databaseID := f .databaseID .Load ()
135345 if databaseID == 0 {
136- return nil
346+ details .defaultReasonCode = routeReasonRangeCacheMiss
347+ return nil , details
137348 }
138349 hint .DatabaseId = databaseID
139- return f .rangeCache .fillRoutingHint (ctx , preferLeader , mode , directedReadOptions , hint )
350+ return f .rangeCache .fillRoutingHintWithExclusionsAndDetails (ctx , preferLeader , mode , directedReadOptions , hint , excludedEndpoints )
140351}
141352
142353func preferLeaderFromSelector (selector * sppb.TransactionSelector ) bool {
0 commit comments