Skip to content

Commit e396da4

Browse files
smartconnpool: unblock Close and preserve Setting on reopen (#20122)
Signed-off-by: Arthur Schreiber <arthur@planetscale.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent cff3492 commit e396da4

3 files changed

Lines changed: 1053 additions & 34 deletions

File tree

go/pools/smartconnpool/pool.go

Lines changed: 83 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,24 @@ type (
101101
RefreshCheck func() (bool, error)
102102
)
103103

104+
// lifetime carries a context that is alive for as long as the pool is
105+
// open. cancel is invoked at the start of Close so that user-supplied
106+
// callbacks (e.g. the Connector) blocked anywhere in the pool unblock
107+
// promptly rather than waiting on a backend timeout.
108+
type lifetime struct {
109+
ctx context.Context
110+
cancel context.CancelFunc
111+
}
112+
113+
// alreadyCancelled is returned for worker connects that race with a pool
114+
// close — they should fail fast rather than open a connection that has
115+
// nowhere to go.
116+
var alreadyCancelled = func() context.Context {
117+
ctx, cancel := context.WithCancel(context.Background())
118+
cancel()
119+
return ctx
120+
}()
121+
104122
type Config[C Connection] struct {
105123
Capacity int64
106124
MaxIdleCount int64
@@ -144,6 +162,13 @@ type ConnPool[C Connection] struct {
144162
close atomic.Pointer[chan struct{}]
145163
capacityMu sync.Mutex
146164

165+
// lifetime holds a context that is cancelled when the pool starts
166+
// closing. Code paths that call into user-supplied callbacks (e.g. the
167+
// connect Connector) pass it through so that calls in flight at shutdown
168+
// unblock instead of blocking on the backend's connect timeout. Held
169+
// behind a pointer to keep ConnPool's heap footprint stable.
170+
lifetime atomic.Pointer[lifetime]
171+
147172
config struct {
148173
// connect is the callback to create a new connection for the pool
149174
connect Connector[C]
@@ -222,6 +247,9 @@ func (pool *ConnPool[C]) open() {
222247
return
223248
}
224249

250+
ctx, cancel := context.WithCancel(context.Background())
251+
pool.lifetime.Store(&lifetime{ctx: ctx, cancel: cancel})
252+
225253
pool.capacity.Store(pool.config.maxCapacity)
226254
pool.setIdleCount()
227255

@@ -302,6 +330,15 @@ func (pool *ConnPool[C]) CloseWithContext(ctx context.Context) error {
302330
return nil
303331
}
304332

333+
// Cancel the pool's lifetime context before we start draining: any user
334+
// connect callback currently blocked behind it (e.g. the idle worker
335+
// reopening an expired connection) needs to unblock now so that
336+
// setCapacity can observe active dropping to zero and so that
337+
// workers.Wait below isn't held up by a hung connect.
338+
if lt := pool.lifetime.Swap(nil); lt != nil {
339+
lt.cancel()
340+
}
341+
305342
// close all the connections in the pool; if we time out while waiting for
306343
// users to return our connections, we still want to finish the shutdown
307344
// for the pool
@@ -434,14 +471,12 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
434471
pool.borrowed.Add(-1)
435472

436473
if conn == nil {
474+
// Taint, or Recycle on a closed conn: open a replacement so a
475+
// queued waiter can be served. We use the pool's lifetime context
476+
// so a Close in flight unblocks this connect, instead of waiting
477+
// for the backend's connect timeout.
437478
var err error
438-
if pool.close.Load() == nil {
439-
pool.closedConn()
440-
return
441-
}
442-
// Using context.Background() is fine since MySQL connection already enforces
443-
// a connect timeout via the `db-connect-timeout-ms` config param.
444-
conn, err = pool.connNew(context.Background())
479+
conn, err = pool.connNew(pool.connectCtx())
445480
if err != nil {
446481
pool.closedConn()
447482
return
@@ -450,11 +485,12 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
450485
now := monotonicNow()
451486
lifetime := pool.extendedMaxLifetime()
452487
if lifetime > 0 && now-conn.timeCreated.get() > lifetime {
488+
// Reopen the maxLifetime-exceeded conn. Pass the original
489+
// Setting so the reopened conn lands back in the same settings
490+
// stack rather than silently migrating to clean.
453491
pool.Metrics.maxLifetimeClosed.Add(1)
454492
conn.Close()
455-
// Using context.Background() is fine since MySQL connection already enforces
456-
// a connect timeout via the `db-connect-timeout-ms` config param.
457-
if err := pool.connReopen(context.Background(), conn, now); err != nil {
493+
if err := pool.connReopen(pool.connectCtx(), conn, conn.Conn.Setting(), now); err != nil {
458494
pool.closedConn()
459495
return
460496
}
@@ -465,7 +501,12 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
465501
}
466502

467503
func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C], updateIdleTime bool) bool {
468-
if pool.close.Load() == nil {
504+
// If the pool has more conns out than its configured capacity, close
505+
// this one eagerly instead of handing it off to a waiter or pushing it
506+
// onto a stack. Otherwise a setCapacity reducing capacity keeps cycling
507+
// connections from Recycle to waiter and the drain loop never observes
508+
// a non-empty stack — including the capacity=0 case during Close.
509+
if pool.active.Load() > pool.capacity.Load() {
469510
conn.Close()
470511
pool.closedConn()
471512
return false
@@ -549,20 +590,24 @@ func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
549590
return time.Duration(maxLifetime) + time.Duration(rand.Int64N(maxLifetime))
550591
}
551592

552-
func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) (err error) {
593+
// connReopen replaces dbconn.Conn with a freshly-connected conn and
594+
// applies the given Setting on it (or leaves it settingless when setting
595+
// is nil). Callers refreshing a conn in place (idle worker expiry,
596+
// maxLifetime rotation) pass the conn's previous Setting so the
597+
// replacement lands back in the same settings stack; callers recovering
598+
// from a ResetSetting failure (the get()/getWithSetting error paths)
599+
// pass nil so the caller can re-apply or omit a setting as needed.
600+
func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], setting *Setting, now time.Duration) (err error) {
553601
dbconn.Conn, err = pool.config.connect(ctx)
554602
if err != nil {
555603
return err
556604
}
557-
558-
if setting := dbconn.Conn.Setting(); setting != nil {
559-
err = dbconn.Conn.ApplySetting(ctx, setting)
560-
if err != nil {
605+
if setting != nil {
606+
if err = dbconn.Conn.ApplySetting(ctx, setting); err != nil {
561607
dbconn.Close()
562608
return err
563609
}
564610
}
565-
566611
dbconn.timeCreated.set(now)
567612
dbconn.timeUsed.set(now)
568613
return nil
@@ -604,6 +649,17 @@ func (pool *ConnPool[C]) closedConn() {
604649
_ = pool.active.Add(-1)
605650
}
606651

652+
// connectCtx returns a context that is cancelled when the pool starts
653+
// closing. If the pool is already closing (or never opened), it returns
654+
// a pre-cancelled context so callers don't block on user-supplied connect
655+
// callbacks for a pool that's going away.
656+
func (pool *ConnPool[C]) connectCtx() context.Context {
657+
if lt := pool.lifetime.Load(); lt != nil {
658+
return lt.ctx
659+
}
660+
return alreadyCancelled
661+
}
662+
607663
func (pool *ConnPool[C]) getNew(ctx context.Context) (*Pooled[C], error) {
608664
for {
609665
if pool.close.Load() == nil {
@@ -681,7 +737,7 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
681737
err = conn.Conn.ResetSetting(ctx)
682738
if err != nil {
683739
conn.Close()
684-
err = pool.connReopen(ctx, conn, monotonicNow())
740+
err = pool.connReopen(ctx, conn, nil, monotonicNow())
685741
if err != nil {
686742
pool.closedConn()
687743
return nil, err
@@ -750,7 +806,7 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
750806
err = conn.Conn.ResetSetting(ctx)
751807
if err != nil {
752808
conn.Close()
753-
err = pool.connReopen(ctx, conn, monotonicNow())
809+
err = pool.connReopen(ctx, conn, nil, monotonicNow())
754810
if err != nil {
755811
pool.closedConn()
756812
return nil, err
@@ -882,19 +938,25 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
882938

883939
for conn := expiredConnections; conn != nil; conn = conn.next.Load() {
884940
pool.Metrics.idleClosed.Add(1)
885-
886941
conn.Close()
887942
pool.closedConn()
888943
}
889944

945+
// Reopen up to capacity. Each reopen re-acquires its active slot
946+
// via CAS; if capacity has been reduced in the meantime, the
947+
// freshly connected conn is closed instead of returned to the
948+
// pool. The worker context ensures a Close interrupts any
949+
// in-flight connect. We pass each conn's prior Setting so the
950+
// reopened conn lands back in the same settings stack instead of
951+
// silently migrating to clean.
890952
for conn := expiredConnections; conn != nil; {
891953
next := conn.next.Load()
892954
if pool.close.Load() == nil || pool.active.Load() >= pool.capacity.Load() {
893955
break
894956
}
895957

896958
conn.next.Store(nil)
897-
if err := pool.connReopen(context.Background(), conn, mono); err != nil {
959+
if err := pool.connReopen(pool.connectCtx(), conn, conn.Conn.Setting(), mono); err != nil {
898960
conn = next
899961
continue
900962
}

0 commit comments

Comments
 (0)