Skip to content

Commit 4061458

Browse files
smartconnpool: avoid close deadlock with refresh reopen (#20157)
Signed-off-by: Arthur Schreiber <arthur@planetscale.com>
1 parent 4caae44 commit 4061458

3 files changed

Lines changed: 351 additions & 3 deletions

File tree

go/pools/smartconnpool/pool.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,10 @@ func (pool *ConnPool[C]) Close() {
295295
// pool closing operation
296296
func (pool *ConnPool[C]) CloseWithContext(ctx context.Context) error {
297297
pool.capacityMu.Lock()
298-
defer pool.capacityMu.Unlock()
299298

300-
if pool.close.Load() == nil || pool.capacity.Load() == 0 {
299+
if pool.close.Load() == nil {
301300
// already closed
301+
pool.capacityMu.Unlock()
302302
return nil
303303
}
304304

@@ -310,6 +310,7 @@ func (pool *ConnPool[C]) CloseWithContext(ctx context.Context) error {
310310
closeChan := *pool.close.Swap(nil)
311311
close(closeChan)
312312

313+
pool.capacityMu.Unlock()
313314
pool.workers.Wait()
314315
return err
315316
}
@@ -318,6 +319,10 @@ func (pool *ConnPool[C]) reopen() {
318319
pool.capacityMu.Lock()
319320
defer pool.capacityMu.Unlock()
320321

322+
if pool.close.Load() == nil {
323+
return
324+
}
325+
321326
capacity := pool.capacity.Load()
322327
if capacity == 0 {
323328
return
@@ -411,6 +416,9 @@ func (pool *ConnPool[C]) Get(ctx context.Context, setting *Setting) (*Pooled[C],
411416
if ctx.Err() != nil {
412417
return nil, ErrCtxTimeout
413418
}
419+
if pool.close.Load() == nil {
420+
return nil, ErrConnPoolClosed
421+
}
414422
if pool.capacity.Load() == 0 {
415423
return nil, ErrConnPoolClosed
416424
}
@@ -427,6 +435,10 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
427435

428436
if conn == nil {
429437
var err error
438+
if pool.close.Load() == nil {
439+
pool.closedConn()
440+
return
441+
}
430442
// Using context.Background() is fine since MySQL connection already enforces
431443
// a connect timeout via the `db-connect-timeout-ms` config param.
432444
conn, err = pool.connNew(context.Background())
@@ -453,6 +465,12 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
453465
}
454466

455467
func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C], updateIdleTime bool) bool {
468+
if pool.close.Load() == nil {
469+
conn.Close()
470+
pool.closedConn()
471+
return false
472+
}
473+
456474
if pool.wait.tryReturnConn(conn) {
457475
return true
458476
}
@@ -588,6 +606,10 @@ func (pool *ConnPool[C]) closedConn() {
588606

589607
func (pool *ConnPool[C]) getNew(ctx context.Context) (*Pooled[C], error) {
590608
for {
609+
if pool.close.Load() == nil {
610+
return nil, ErrConnPoolClosed
611+
}
612+
591613
open := pool.active.Load()
592614
if open >= pool.capacity.Load() {
593615
return nil, nil
@@ -599,6 +621,11 @@ func (pool *ConnPool[C]) getNew(ctx context.Context) (*Pooled[C], error) {
599621
pool.closedConn()
600622
return nil, err
601623
}
624+
if pool.close.Load() == nil {
625+
conn.Close()
626+
pool.closedConn()
627+
return nil, ErrConnPoolClosed
628+
}
602629
return conn, nil
603630
}
604631
}
@@ -763,7 +790,12 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
763790
}
764791

765792
oldcap := pool.capacity.Swap(newcap)
766-
if oldcap == newcap {
793+
// Skip the drain only when capacity is unchanged AND we're already at or
794+
// below the target. Otherwise we may have been left with active > newcap
795+
// by a prior call that timed out (e.g. SetCapacity(0) racing with held
796+
// conns), and CloseWithContext relies on a follow-up call here to finish
797+
// draining.
798+
if oldcap == newcap && pool.active.Load() <= newcap {
767799
return nil
768800
}
769801
// update the idle count to match the new capacity if necessary

go/pools/smartconnpool/pool_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,78 @@ func TestUserClosing(t *testing.T) {
659659
}
660660
}
661661

662+
func TestCloseWithContextAfterSetCapacityZeroClosesPool(t *testing.T) {
663+
var state TestState
664+
665+
p := NewPool(&Config[*TestConn]{
666+
Capacity: 1,
667+
LogWait: state.LogWait,
668+
}).Open(newConnector(&state), nil)
669+
670+
t.Cleanup(func() {
671+
if closeChan := p.close.Swap(nil); closeChan != nil {
672+
close(*closeChan)
673+
p.workers.Wait()
674+
}
675+
})
676+
677+
ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second)
678+
defer cancel()
679+
680+
require.NoError(t, p.SetCapacity(ctx, 0))
681+
require.True(t, p.IsOpen())
682+
683+
require.NoError(t, p.CloseWithContext(ctx))
684+
require.False(t, p.IsOpen())
685+
}
686+
687+
// TestCloseWithContextDrainsAfterTimedOutSetCapacityZero guards against an
688+
// early return in setCapacity that previously caused CloseWithContext to
689+
// return nil despite active conns being out, when a prior SetCapacity(0)
690+
// had timed out with conns still borrowed. The path is:
691+
//
692+
// - SetCapacity(0) times out: capacity is swapped to 0 first, then the
693+
// drain loop hits the deadline and returns an error.
694+
// - The next CloseWithContext calls setCapacity(0) again. With the bug,
695+
// oldcap == newcap == 0 short-circuited the drain loop and Close
696+
// returned nil even though pool.active was still > 0.
697+
//
698+
// CloseWithContext must instead attempt to drain and surface a timeout
699+
// when borrowed conns aren't returned.
700+
func TestCloseWithContextDrainsAfterTimedOutSetCapacityZero(t *testing.T) {
701+
var state TestState
702+
703+
p := NewPool(&Config[*TestConn]{
704+
Capacity: 2,
705+
LogWait: state.LogWait,
706+
}).Open(newConnector(&state), nil)
707+
708+
t.Cleanup(func() {
709+
if closeChan := p.close.Swap(nil); closeChan != nil {
710+
close(*closeChan)
711+
p.workers.Wait()
712+
}
713+
})
714+
715+
conn, err := p.Get(t.Context(), nil)
716+
require.NoError(t, err)
717+
718+
// SetCapacity(0) must time out: a conn is still borrowed.
719+
cancelledCtx, cancel := context.WithCancel(t.Context())
720+
cancel()
721+
require.Error(t, p.SetCapacity(cancelledCtx, 0))
722+
require.EqualValues(t, 0, p.Capacity())
723+
require.GreaterOrEqual(t, p.Active(), int64(1))
724+
725+
// CloseWithContext must still attempt to drain — without the fix it
726+
// returns nil immediately because setCapacity short-circuits on
727+
// oldcap == newcap.
728+
require.Error(t, p.CloseWithContext(cancelledCtx))
729+
730+
// Release the held conn so it is properly closed during teardown.
731+
conn.Recycle()
732+
}
733+
662734
func TestConnReopen(t *testing.T) {
663735
var state TestState
664736

0 commit comments

Comments
 (0)