Skip to content
19 changes: 16 additions & 3 deletions go/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func (l *List[T]) move(e, at *Element[T]) {
e.next.prev = e
}

// Remove removes e from l if e is an element of list l.
// It returns the element value e.Value.
// The element must not be nil.
// Remove removes e from l. The element must not be nil and must be an
// element of list l; Remove panics otherwise. Use RemoveIfPresent when the
// element may have been removed already.
func (l *List[T]) Remove(e *Element[T]) {
if e.list != l {
panic("removing from wrong List")
Expand All @@ -142,6 +142,19 @@ func (l *List[T]) Remove(e *Element[T]) {
l.remove(e)
}

// RemoveIfPresent removes e from l if e is currently an element of list l
// and reports whether it did so. Unlike Remove, it is a no-op rather than a
// panic when e is not in l, which makes membership checks O(1) for callers
// that would otherwise have to scan the list before removing.
// The element must not be nil.
func (l *List[T]) RemoveIfPresent(e *Element[T]) bool {
if e.list != l {
return false
}
l.remove(e)
return true
}

// PushFront inserts a new element e with value v at the front of list l and returns e.
func (l *List[T]) PushFront(v T) *Element[T] {
return l.insertValue(v, &l.root)
Expand Down
37 changes: 37 additions & 0 deletions go/list/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestInitEmptyList(t *testing.T) {
Expand Down Expand Up @@ -133,3 +134,39 @@ func TestPushFrontValue(t *testing.T) {
assert.Equal(t, a, l.Front())
assert.Equal(t, a, e.prev)
}

func TestRemoveIfPresent(t *testing.T) {
l := New[int]()
a := l.PushBack(1)
b := l.PushBack(2)

require.True(t, l.RemoveIfPresent(a))
require.Equal(t, 1, l.Len())
require.Equal(t, b, l.Front())

// already removed: no-op, no panic
require.False(t, l.RemoveIfPresent(a))
require.Equal(t, 1, l.Len())

// element of a different list: no-op
m := New[int]()
c := m.PushBack(3)
require.False(t, l.RemoveIfPresent(c))
require.Equal(t, 1, m.Len())
require.Equal(t, c, m.Front())

require.True(t, l.RemoveIfPresent(b))
require.Equal(t, 0, l.Len())
require.Nil(t, l.Front())
}

func TestRemoveIfPresentReinsert(t *testing.T) {
l := New[int]()
a := l.PushBack(1)

require.True(t, l.RemoveIfPresent(a))
l.PushBackValue(a)
require.Equal(t, 1, l.Len())
require.True(t, l.RemoveIfPresent(a))
require.Equal(t, 0, l.Len())
}
7 changes: 6 additions & 1 deletion go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ type ConnPool[C Connection] struct {
// was pushed, or -1 if no connection with a Setting has been opened in this pool
freshSettingsStack atomic.Int64
// wait is the list of clients waiting for a connection to be returned to the pool
wait waitlist[C]
// Held behind a pointer so that growing the waitlist struct cannot
// change ConnPool's allocation size: the 128-bit atomics in clean and
// settings must stay 16-byte aligned, and the Go allocator only
// provides that for certain object sizes (see the connStack docs).
wait *waitlist[C]

// borrowed is the number of connections that the pool has given out to clients
// and that haven't been returned yet
Expand Down Expand Up @@ -200,6 +204,7 @@ type ConnPool[C Connection] struct {
// The pool must be ConnPool.Open before it can start giving out connections
func NewPool[C Connection](config *Config[C]) *ConnPool[C] {
pool := &ConnPool[C]{}
pool.wait = &waitlist[C]{}
pool.config.maxCapacity = config.Capacity
pool.config.maxIdleCount = config.MaxIdleCount
pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds())
Expand Down
12 changes: 12 additions & 0 deletions go/pools/smartconnpool/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ import (

// connStack is a lock-free stack for Connection objects. It is safe to
// use from several goroutines.
//
// ALIGNMENT: the 128-bit atomic in this struct faults (SIGBUS) on amd64 and
// arm64 unless it is 16-byte aligned. Go has no supported way to demand
// 16-byte alignment (the maximum natural alignment is 8 bytes), so it
// depends entirely on where the allocator places the enclosing allocation,
// which varies with the allocation's exact size, pointer layout, and Go
// version — e.g. allocation headers (Go 1.22+) shift some pointer-bearing
// objects larger than 512 bytes to an odd 8-byte boundary. ConnPool
// currently lands in a bucket where allocations are 16-byte aligned; growing
// it can silently break that, which is why its waitlist is held behind a
// pointer. Be careful when adding fields to ConnPool or anything embedded
// in it.
type connStack[C Connection] struct {
// top is a pointer to the top node on the stack and to an increasing
// counter of pop operations, to prevent A-B-A races.
Expand Down
243 changes: 243 additions & 0 deletions go/pools/smartconnpool/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,249 @@ func runStressCloseDuringTrafficCycle(t *testing.T, cycle int) {
cycle, leaked, len(allConns), connectsAfterClose.Load())
}

// TestStressWaiterTimeoutStorm reproduces a production vttablet stall. The
// anatomy of the incident, taken from a goroutine dump of the stalled
// tablet:
//
// - the stream pool was at capacity and a large backlog of requests was
// queued on the waitlist; their deadlines expired while they waited.
// - an expired waiter cannot leave the waitlist instantly: it must
// reacquire the contended waitlist mutex to remove itself, so at any
// given moment the list held thousands of expired-but-still-listed
// waiters.
// - every returned connection was handed to the front-most (expired)
// waiter; the returner then sat blocked in the unbuffered channel send
// until its dead target crawled through the mutex queue to perform the
// fallback receive. The dead request then "completed" holding a
// connection it could not use, failed, and recycled it to the next
// expired waiter.
// - net effect: connections churned through dead requests while live
// requests starved; useful throughput was zero.
//
// The test recreates that state with real waiters and real returners. The
// waitlist mutex, held by the test, plays the role of the convoyed mutex:
// the returners are parked on it first, then the queued waiters' contexts
// are cancelled so they pile up behind. When the test releases the mutex,
// the returners scan a waitlist full of expired-but-listed waiters — the
// exact moment the production bug fired.
//
// The assertions are the customer-visible contract and do not depend on
// timing: no request whose context has already expired may be handed a
// connection (wastedHandoffs must stay zero), and every live request must
// be served. Each cycle is a fresh pool; the loop runs several cycles to
// surface scheduling-dependent races.
func TestStressWaiterTimeoutStorm(t *testing.T) {
const Cycles = 25

for cycle := range Cycles {
if !t.Run(fmt.Sprintf("cycle-%03d", cycle), func(t *testing.T) {
runStressWaiterTimeoutStormCycle(t, cycle)
}) {
return
}
}
}

func runStressWaiterTimeoutStormCycle(t *testing.T, cycle int) {
t.Helper()

const (
Capacity = 4
NumExpired = 64
NumLive = 8
GetsPerLive = 4
LiveTimeout = 30 * time.Second
Watchdog = 30 * time.Second
CloseTimeout = 30 * time.Second
)

var (
connsMu sync.Mutex
allConns []*StressConn
wastedHandoffs atomic.Int64
expiredTimedOut atomic.Int64
liveSuccesses atomic.Int64
)

connect := func(_ context.Context) (*StressConn, error) {
c := &StressConn{}
connsMu.Lock()
allConns = append(allConns, c)
connsMu.Unlock()
return c, nil
}
connCount := func() int {
connsMu.Lock()
defer connsMu.Unlock()
return len(allConns)
}

pool := NewPool[*StressConn](&Config[*StressConn]{
Capacity: Capacity,
}).Open(connect, nil)

// Hold every conn so all subsequent Gets queue on the waitlist.
var held []*Pooled[*StressConn]
for range Capacity {
conn, err := pool.Get(t.Context(), nil)
require.NoError(t, err)
held = append(held, conn)
}

var wg errgroup.Group

// The backlog: real requests whose deadline will expire while they are
// queued. Until the held conns are recycled nothing can be handed to
// them, so any successful Get here is a connection delivered to a
// request whose context had already expired — the production bug.
expiredCtx, cancelExpired := context.WithCancel(t.Context())
defer cancelExpired()
for range NumExpired {
wg.Go(func() error {
conn, err := pool.Get(expiredCtx, nil)
if err != nil {
expiredTimedOut.Add(1)
return nil
}
if expiredCtx.Err() != nil {
wastedHandoffs.Add(1)
}
// The production query path fails on the dead context and
// recycles; do the same so the connection churns onward.
conn.Recycle()
return nil
})
}

status := func() string {
return fmt.Sprintf("capacity=%d active=%d borrowed=%d open=%d isOpen=%v waiting=%d wastedHandoffs=%d expiredTimedOut=%d liveSuccesses=%d",
pool.Capacity(), pool.Active(), pool.InUse(), connCount(), pool.IsOpen(), pool.wait.waiting(), wastedHandoffs.Load(), expiredTimedOut.Load(), liveSuccesses.Load())
}

backlogQueued := assert.Eventuallyf(t, func() bool {
return pool.wait.waiting() == NumExpired
}, Watchdog, time.Millisecond, "cycle %d: backlog did not queue: %s", cycle, status())
if !backlogQueued {
cancelExpired()
for _, conn := range held {
conn.Recycle()
}
waitForStressTraffic(t, cycle, &wg, Watchdog, status)
require.FailNowf(t, "backlog did not queue", "cycle %d: %s", cycle, status())
}

// Live traffic queued behind the backlog, with deadlines generous
// enough to never expire during the test.
for i := range NumLive {
tid := int32(i + 1)
wg.Go(func() error {
for range GetsPerLive {
ctx, cancel := context.WithTimeout(t.Context(), LiveTimeout)
conn, err := pool.Get(ctx, nil)
cancel()
if err != nil {
return fmt.Errorf("cycle %d: live request starved by the expired backlog: %w", cycle, err)
}

previousOwner := conn.Conn.owner.Swap(tid)
if previousOwner != 0 {
return fmt.Errorf("cycle %d: conn handed out concurrently: %d still owned it when %d acquired", cycle, previousOwner, tid)
}
runtime.Gosched()
previousOwner = conn.Conn.owner.Swap(0)
if previousOwner != tid {
return fmt.Errorf("cycle %d: conn owner overwritten under us: expected %d, got %d", cycle, tid, previousOwner)
}
liveSuccesses.Add(1)
conn.Recycle()
}
return nil
})
}

liveQueued := assert.Eventuallyf(t, func() bool {
return pool.wait.waiting() == NumExpired+NumLive
}, Watchdog, time.Millisecond, "cycle %d: live traffic did not queue behind the backlog: %s", cycle, status())
if !liveQueued {
cancelExpired()
for _, conn := range held {
conn.Recycle()
}
waitForStressTraffic(t, cycle, &wg, Watchdog, status)
require.FailNowf(t, "live traffic did not queue behind the backlog", "cycle %d: %s", cycle, status())
}

// The convoy. With the waitlist mutex held by the test, park the
// returners on it, then expire the entire backlog so it piles up
// behind them. Releasing the mutex lets the returners scan first,
// while every backlog waiter is still expired-but-listed.
pool.wait.mu.Lock()

for _, conn := range held {
wg.Go(func() error {
conn.Recycle()
return nil
})
}

// Recycle decrements the borrowed count before reaching for the
// waitlist mutex, so once InUse hits zero every returner is parked on
// (or a few instructions away from) the mutex queue.
returnersParked := assert.Eventuallyf(t, func() bool {
return pool.InUse() == 0
}, Watchdog, time.Millisecond, "cycle %d: returners did not park on the waitlist mutex: %s", cycle, status())
if !returnersParked {
pool.wait.mu.Unlock()
cancelExpired()
waitForStressTraffic(t, cycle, &wg, Watchdog, status)
require.FailNowf(t, "returners did not park on the waitlist mutex", "cycle %d: %s", cycle, status())
}
for range 100 {
runtime.Gosched()
}

cancelExpired()
// Give the expired waiters a moment to wake and pile up on the mutex.
// This only biases the interleaving towards the production one; the
// assertions below hold for every interleaving.
for range 100 {
runtime.Gosched()
}

pool.wait.mu.Unlock()

// Everything settles on its own: the backlog drains (timing out or —
// on broken code — receiving wasted handoffs), then the live traffic
// is served.
waitForStressTraffic(t, cycle, &wg, Watchdog, status)

require.Zerof(t, wastedHandoffs.Load(),
"cycle %d: connections were handed to requests whose context had already expired: %s", cycle, status())
require.EqualValuesf(t, NumLive*GetsPerLive, liveSuccesses.Load(),
"cycle %d: live traffic was not fully served: %s", cycle, status())

closeCtx, cancelClose := context.WithTimeout(t.Context(), CloseTimeout)
closeErr := pool.CloseWithContext(closeCtx)
cancelClose()
require.NoErrorf(t, closeErr, "cycle %d: CloseWithContext failed: %s", cycle, status())

require.EqualValuesf(t, 0, pool.Active(), "cycle %d: active should be 0 after Close", cycle)
require.EqualValuesf(t, 0, pool.InUse(), "cycle %d: borrowed should be 0 after Close", cycle)

finalStatus := status()
connsMu.Lock()
defer connsMu.Unlock()

var leaked int
for _, c := range allConns {
if !c.IsClosed() {
leaked++
}
}
require.Equalf(t, 0, leaked, "cycle %d: leaked %d connections out of %d ever opened; %s",
cycle, leaked, len(allConns), finalStatus)
}

func waitForStressTraffic(t *testing.T, cycle int, wg *errgroup.Group, timeout time.Duration, status func() string) {
t.Helper()

Expand Down
Loading
Loading