Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,9 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
func (pool *ConnPool[C]) SetCapacity(ctx context.Context, newcap int64) error {
pool.capacityMu.Lock()
defer pool.capacityMu.Unlock()
if pool.close.Load() == nil {
return ErrConnPoolClosed
}
return pool.setCapacity(ctx, newcap)
}

Expand Down
103 changes: 103 additions & 0 deletions go/pools/smartconnpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,82 @@ func TestUserClosing(t *testing.T) {
}
}

func TestCloseWithContextAfterIdleCleanupPopDoesNotLeak(t *testing.T) {
var state TestState

p := NewPool(&Config[*TestConn]{
Capacity: 2,
LogWait: state.LogWait,
}).Open(newConnector(&state), nil)
t.Cleanup(p.Close)

held, err := p.Get(t.Context(), nil)
require.NoError(t, err)

idle, err := p.Get(t.Context(), nil)
require.NoError(t, err)
idle.Recycle()

cleanupConn, ok := p.clean.Pop()
require.True(t, ok)
require.NotNil(t, cleanupConn)

var waiterGotConn atomic.Bool
waiterDone := make(chan struct{})
go func() {
defer close(waiterDone)

conn, err := p.Get(t.Context(), nil)
if err == nil {
waiterGotConn.Store(true)
conn.Recycle()
}
}()

require.Eventually(t, func() bool {
return p.wait.waiting() == 1
}, 30*time.Second, time.Millisecond)

closeCtx, cancelClose := context.WithTimeout(t.Context(), 30*time.Second)
closeDone := make(chan error, 1)
go func() {
closeDone <- p.CloseWithContext(closeCtx)
}()

require.Eventually(t, func() bool {
return p.Capacity() == 0
}, 30*time.Second, time.Millisecond)

p.tryReturnConn(cleanupConn, false)
held.Recycle()

var closeErr error
require.Eventually(t, func() bool {
select {
case closeErr = <-closeDone:
return true
default:
return false
}
}, 30*time.Second, time.Millisecond)
cancelClose()
require.NoError(t, closeErr)

require.Eventually(t, func() bool {
select {
case <-waiterDone:
return true
default:
return false
}
}, 30*time.Second, time.Millisecond)

require.False(t, waiterGotConn.Load(), "close must not hand an idle-cleanup conn to a waiter after capacity reaches 0")
require.EqualValues(t, 0, p.Active())
require.EqualValues(t, 0, p.InUse())
require.EqualValues(t, 0, state.open.Load())
}

func TestCloseWithContextAfterSetCapacityZeroClosesPool(t *testing.T) {
var state TestState

Expand Down Expand Up @@ -732,6 +808,33 @@ func TestCloseWithContextDrainsAfterTimedOutSetCapacityZero(t *testing.T) {
conn.Recycle()
}

// TestSetCapacityRejectedOnClosedPool pins the contract that SetCapacity must
// fail fast (rather than silently writing into pool.capacity) when the pool
// is not open. This covers both states: never-opened and previously-opened-
// then-closed. Without this guard, a SetCapacity call queued on capacityMu
// during CloseWithContext can race through after Close releases the mutex
// and bump capacity back up — leaving the pool closed with non-zero capacity.
func TestSetCapacityRejectedOnClosedPool(t *testing.T) {
var state TestState

// Never-opened pool: SetCapacity must reject.
p := NewPool(&Config[*TestConn]{Capacity: 4})
err := p.SetCapacity(t.Context(), 2)
require.ErrorIs(t, err, ErrConnPoolClosed)

// Opened pool: SetCapacity succeeds.
p.Open(newConnector(&state), nil)
require.NoError(t, p.SetCapacity(t.Context(), 2))
require.EqualValues(t, 2, p.Capacity())

// Closed pool: SetCapacity must reject and must not change capacity.
p.Close()
require.False(t, p.IsOpen())
err = p.SetCapacity(t.Context(), 8)
require.ErrorIs(t, err, ErrConnPoolClosed)
require.EqualValues(t, 0, p.Capacity())
}

func TestConnReopen(t *testing.T) {
var state TestState

Expand Down
Loading
Loading