Skip to content

Commit e9967e7

Browse files
smartconnpool: bulk sweep idle stacks (#20136)
Signed-off-by: Arthur Schreiber <arthur@planetscale.com>
1 parent 08e6756 commit e9967e7

4 files changed

Lines changed: 371 additions & 63 deletions

File tree

go/pools/smartconnpool/pool.go

Lines changed: 51 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -435,32 +435,34 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
435435
return
436436
}
437437
} else {
438-
conn.timeUsed.update()
439-
438+
now := monotonicNow()
440439
lifetime := pool.extendedMaxLifetime()
441-
if lifetime > 0 && conn.timeCreated.elapsed() > lifetime {
440+
if lifetime > 0 && now-conn.timeCreated.get() > lifetime {
442441
pool.Metrics.maxLifetimeClosed.Add(1)
443442
conn.Close()
444443
// Using context.Background() is fine since MySQL connection already enforces
445444
// a connect timeout via the `db-connect-timeout-ms` config param.
446-
if err := pool.connReopen(context.Background(), conn, conn.timeUsed.get()); err != nil {
445+
if err := pool.connReopen(context.Background(), conn, now); err != nil {
447446
pool.closedConn()
448447
return
449448
}
450449
}
451450
}
452451

453-
pool.tryReturnConn(conn)
452+
pool.tryReturnConn(conn, true)
454453
}
455454

456-
func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool {
455+
func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C], updateIdleTime bool) bool {
457456
if pool.wait.tryReturnConn(conn) {
458457
return true
459458
}
460459

461460
if pool.closeOnIdleLimitReached(conn) {
462461
return false
463462
}
463+
if updateIdleTime {
464+
conn.timeUsed.update()
465+
}
464466
connSetting := conn.Conn.Setting()
465467
if connSetting == nil {
466468
pool.clean.Push(conn)
@@ -494,13 +496,11 @@ func (pool *ConnPool[C]) pop(stack *connStack[C]) *Pooled[C] {
494496

495497
func (pool *ConnPool[C]) tryReturnAnyConn() bool {
496498
if conn := pool.pop(&pool.clean); conn != nil {
497-
conn.timeUsed.update()
498-
return pool.tryReturnConn(conn)
499+
return pool.tryReturnConn(conn, true)
499500
}
500501
for u := 0; u <= stackMask; u++ {
501502
if conn := pool.pop(&pool.settings[u]); conn != nil {
502-
conn.timeUsed.update()
503-
return pool.tryReturnConn(conn)
503+
return pool.tryReturnConn(conn, true)
504504
}
505505
}
506506
return false
@@ -808,89 +808,77 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
808808
mono := monotonicFromTime(now)
809809

810810
closeInStack := func(s *connStack[C]) {
811-
conn, ok := s.Pop()
811+
conn, ok := s.PopAll()
812812
if !ok {
813-
// Early return to skip allocating slices when the stack is empty
814813
return
815814
}
816815

817-
activeConnections := pool.Active()
818-
819-
// Only expire up to ~half of the active connections at a time. This should
820-
// prevent us from closing too many connections in one go which could lead to
821-
// a lot of `.Get` calls being added to the waitlist if there's a sudden spike
822-
// coming in _after_ connections were popped off the stack but _before_ being
823-
// returned back to the pool. This is unlikely to happen, but better safe than sorry.
824-
//
825-
// We always expire at least one connection per stack per iteration to ensure
826-
// that idle connections are eventually closed even in small pools.
827-
//
828-
// We will expire any additional connections in the next iteration of the idle closer.
829-
expiredConnections := make([]*Pooled[C], 0, max(activeConnections/2, 1))
830-
validConnections := make([]*Pooled[C], 0, activeConnections)
831-
832-
// Pop out connections from the stack until we get a `nil` connection
833-
for ok {
834-
if conn.timeUsed.expired(mono, timeout) {
835-
expiredConnections = append(expiredConnections, conn)
836-
837-
if len(expiredConnections) == cap(expiredConnections) {
838-
// We have collected enough connections for this iteration to expire
839-
break
840-
}
841-
} else {
842-
validConnections = append(validConnections, conn)
843-
}
844-
845-
conn, ok = s.Pop()
816+
// The stack is LIFO. Reverse the detached list so we inspect and return
817+
// connections from oldest to newest.
818+
var stackConnections int64
819+
var oldestFirst *Pooled[C]
820+
for conn != nil {
821+
next := conn.next.Load()
822+
conn.next.Store(oldestFirst)
823+
oldestFirst = conn
824+
stackConnections++
825+
conn = next
846826
}
847827

848-
// Return all the valid connections back to waiters or the stack
849-
//
850-
// The order here is not important - because we can't guarantee to
851-
// restore the order we got the connections out of the stack anyway.
852-
//
853-
// If we return the connections in the order popped off the stack:
854-
// * waiters will get the newest connection first
855-
// * stack will have the oldest connections at the top of the stack.
856-
//
857-
// If we return the connections in reverse order:
858-
// * waiters will get the oldest connection first
859-
// * stack will have the newest connections at the top of the stack.
860-
//
861-
// Neither of these is better or worse than the other.
862-
for _, conn := range validConnections {
863-
pool.tryReturnConn(conn)
828+
// Bound closes to the detached stack, not to the whole pool or just the
829+
// expired cohort. This lets ordinary cleanup drain a small expired cohort
830+
// while preventing the worker from closing and reopening an entire stack at
831+
// once when all idle connections aged out together.
832+
maxExpiredConnections := max(stackConnections/2, 1)
833+
var expiredCount int64
834+
var expiredConnections *Pooled[C]
835+
for conn := oldestFirst; conn != nil; {
836+
next := conn.next.Load()
837+
conn.next.Store(nil)
838+
839+
if expiredCount < maxExpiredConnections && conn.timeUsed.expired(mono, timeout) {
840+
expiredCount++
841+
conn.next.Store(expiredConnections)
842+
expiredConnections = conn
843+
conn = next
844+
continue
845+
}
846+
847+
pool.tryReturnConn(conn, false)
848+
conn = next
864849
}
865850

866-
// Close all the expired connections and open new ones to replace them
867-
for _, conn := range expiredConnections {
851+
for conn := expiredConnections; conn != nil; conn = conn.next.Load() {
868852
pool.Metrics.idleClosed.Add(1)
869853

870854
conn.Close()
871855
pool.closedConn()
872856
}
873857

874-
for _, conn := range expiredConnections {
875-
if pool.active.Load() >= pool.capacity.Load() {
858+
for conn := expiredConnections; conn != nil; {
859+
next := conn.next.Load()
860+
if pool.close.Load() == nil || pool.active.Load() >= pool.capacity.Load() {
876861
break
877862
}
878863

864+
conn.next.Store(nil)
879865
if err := pool.connReopen(context.Background(), conn, mono); err != nil {
866+
conn = next
880867
continue
881868
}
882869

883870
for {
884871
open := pool.active.Load()
885-
if open >= pool.capacity.Load() {
872+
if pool.close.Load() == nil || open >= pool.capacity.Load() {
886873
conn.Close()
887874
break
888875
}
889876
if pool.active.CompareAndSwap(open, open+1) {
890-
pool.tryReturnConn(conn)
877+
pool.tryReturnConn(conn, false)
891878
break
892879
}
893880
}
881+
conn = next
894882
}
895883
}
896884

0 commit comments

Comments
 (0)