@@ -20,12 +20,12 @@ type ConnectionKey struct {
2020
2121// ConnectionMetadata tracks information about a pooled connection
2222type ConnectionMetadata struct {
23- Connection * mcp.Connection
24- CreatedAt time.Time
25- LastUsedAt time.Time
23+ Connection * mcp.Connection
24+ CreatedAt time.Time
25+ LastUsedAt time.Time
2626 RequestCount int
27- ErrorCount int
28- State ConnectionState
27+ ErrorCount int
28+ State ConnectionState
2929}
3030
3131// ConnectionState represents the state of a pooled connection
@@ -39,21 +39,22 @@ const (
3939
4040// Default configuration values
4141const (
42- DefaultIdleTimeout = 30 * time .Minute
42+ DefaultIdleTimeout = 30 * time .Minute
4343 DefaultCleanupInterval = 5 * time .Minute
4444 DefaultMaxErrorCount = 10
4545)
4646
4747// SessionConnectionPool manages connections keyed by (backend, session)
4848type SessionConnectionPool struct {
49- connections map [ConnectionKey ]* ConnectionMetadata
50- mu sync.RWMutex
51- ctx context.Context
52- idleTimeout time.Duration
53- cleanupInterval time.Duration
54- maxErrorCount int
55- cleanupTicker * time.Ticker
56- cleanupDone chan bool
49+ connections map [ConnectionKey ]* ConnectionMetadata
50+ mu sync.RWMutex
51+ ctx context.Context
52+ cancel context.CancelFunc // cancel function to stop cleanup goroutine
53+ idleTimeout time.Duration
54+ cleanupInterval time.Duration
55+ maxErrorCount int
56+ cleanupTicker * time.Ticker
57+ cleanupDone chan bool
5758}
5859
5960// PoolConfig configures the connection pool
@@ -76,26 +77,31 @@ func NewSessionConnectionPool(ctx context.Context) *SessionConnectionPool {
7677func NewSessionConnectionPoolWithConfig (ctx context.Context , config PoolConfig ) * SessionConnectionPool {
7778 logPool .Printf ("Creating new session connection pool: idleTimeout=%v, cleanupInterval=%v, maxErrors=%d" ,
7879 config .IdleTimeout , config .CleanupInterval , config .MaxErrorCount )
79-
80+
81+ // Create a cancellable context derived from the parent context
82+ // This allows Stop() to signal the cleanup goroutine to exit
83+ poolCtx , cancel := context .WithCancel (ctx )
84+
8085 pool := & SessionConnectionPool {
8186 connections : make (map [ConnectionKey ]* ConnectionMetadata ),
82- ctx : ctx ,
87+ ctx : poolCtx ,
88+ cancel : cancel ,
8389 idleTimeout : config .IdleTimeout ,
8490 cleanupInterval : config .CleanupInterval ,
8591 maxErrorCount : config .MaxErrorCount ,
8692 cleanupDone : make (chan bool ),
8793 }
88-
94+
8995 // Start cleanup goroutine
9096 pool .startCleanup ()
91-
97+
9298 return pool
9399}
94100
95101// startCleanup starts the periodic cleanup goroutine
96102func (p * SessionConnectionPool ) startCleanup () {
97103 p .cleanupTicker = time .NewTicker (p .cleanupInterval )
98-
104+
99105 go func () {
100106 logPool .Print ("Cleanup goroutine started" )
101107 for {
@@ -116,48 +122,48 @@ func (p *SessionConnectionPool) startCleanup() {
116122func (p * SessionConnectionPool ) cleanupIdleConnections () {
117123 p .mu .Lock ()
118124 defer p .mu .Unlock ()
119-
125+
120126 now := time .Now ()
121127 removed := 0
122-
128+
123129 for key , metadata := range p .connections {
124130 shouldRemove := false
125131 reason := ""
126-
132+
127133 // Check if idle for too long
128134 if now .Sub (metadata .LastUsedAt ) > p .idleTimeout {
129135 shouldRemove = true
130136 reason = "idle timeout"
131137 }
132-
138+
133139 // Check if too many errors
134140 if metadata .ErrorCount >= p .maxErrorCount {
135141 shouldRemove = true
136142 reason = "too many errors"
137143 }
138-
144+
139145 // Check if already closed
140146 if metadata .State == ConnectionStateClosed {
141147 shouldRemove = true
142148 reason = "already closed"
143149 }
144-
150+
145151 if shouldRemove {
146152 logPool .Printf ("Cleaning up connection: backend=%s, session=%s, reason=%s, idle=%v, errors=%d" ,
147153 key .BackendID , key .SessionID , reason , now .Sub (metadata .LastUsedAt ), metadata .ErrorCount )
148-
154+
149155 // Close the connection if still active
150156 if metadata .Connection != nil && metadata .State != ConnectionStateClosed {
151157 // Note: mcp.Connection doesn't have a Close method in current implementation
152158 // but we mark it as closed
153159 metadata .State = ConnectionStateClosed
154160 }
155-
161+
156162 delete (p .connections , key )
157163 removed ++
158164 }
159165 }
160-
166+
161167 if removed > 0 {
162168 logPool .Printf ("Cleanup complete: removed %d idle/failed connections, active=%d" , removed , len (p .connections ))
163169 }
@@ -166,30 +172,35 @@ func (p *SessionConnectionPool) cleanupIdleConnections() {
166172// Stop gracefully shuts down the connection pool
167173func (p * SessionConnectionPool ) Stop () {
168174 logPool .Print ("Stopping connection pool" )
169-
170- // Stop cleanup goroutine
175+
176+ // Stop cleanup goroutine by cancelling the context
177+ if p .cancel != nil {
178+ p .cancel ()
179+ }
180+
181+ // Stop cleanup ticker
171182 if p .cleanupTicker != nil {
172183 p .cleanupTicker .Stop ()
173184 }
174-
175- // Wait for cleanup to finish
185+
186+ // Wait for cleanup goroutine to finish (should be immediate now that context is cancelled)
176187 select {
177188 case <- p .cleanupDone :
178189 logPool .Print ("Cleanup goroutine stopped" )
179- case <- time .After (5 * time .Second ):
190+ case <- time .After (1 * time .Second ):
180191 logPool .Print ("Cleanup goroutine stop timeout" )
181192 }
182-
193+
183194 // Close all connections
184195 p .mu .Lock ()
185196 defer p .mu .Unlock ()
186-
197+
187198 for key , metadata := range p .connections {
188199 logPool .Printf ("Closing connection: backend=%s, session=%s" , key .BackendID , key .SessionID )
189200 metadata .State = ConnectionStateClosed
190201 delete (p .connections , key )
191202 }
192-
203+
193204 logPool .Print ("Connection pool stopped" )
194205}
195206
@@ -200,7 +211,7 @@ func (p *SessionConnectionPool) Get(backendID, sessionID string) (*mcp.Connectio
200211
201212 key := ConnectionKey {BackendID : backendID , SessionID : sessionID }
202213 metadata , exists := p .connections [key ]
203-
214+
204215 if ! exists {
205216 logPool .Printf ("Connection not found: backend=%s, session=%s" , backendID , sessionID )
206217 return nil , false
@@ -211,9 +222,9 @@ func (p *SessionConnectionPool) Get(backendID, sessionID string) (*mcp.Connectio
211222 return nil , false
212223 }
213224
214- logPool .Printf ("Reusing connection: backend=%s, session=%s, requests=%d" ,
225+ logPool .Printf ("Reusing connection: backend=%s, session=%s, requests=%d" ,
215226 backendID , sessionID , metadata .RequestCount )
216-
227+
217228 // Update last used time and state (need write lock for this)
218229 p .mu .RUnlock ()
219230 p .mu .Lock ()
@@ -232,7 +243,7 @@ func (p *SessionConnectionPool) Set(backendID, sessionID string, conn *mcp.Conne
232243 defer p .mu .Unlock ()
233244
234245 key := ConnectionKey {BackendID : backendID , SessionID : sessionID }
235-
246+
236247 // Check if connection already exists
237248 if existing , exists := p .connections [key ]; exists {
238249 logPool .Printf ("Updating existing connection: backend=%s, session=%s" , backendID , sessionID )
@@ -262,7 +273,7 @@ func (p *SessionConnectionPool) Delete(backendID, sessionID string) {
262273 defer p .mu .Unlock ()
263274
264275 key := ConnectionKey {BackendID : backendID , SessionID : sessionID }
265-
276+
266277 if metadata , exists := p .connections [key ]; exists {
267278 metadata .State = ConnectionStateClosed
268279 delete (p .connections , key )
@@ -295,7 +306,7 @@ func (p *SessionConnectionPool) RecordError(backendID, sessionID string) {
295306 key := ConnectionKey {BackendID : backendID , SessionID : sessionID }
296307 if metadata , exists := p .connections [key ]; exists {
297308 metadata .ErrorCount ++
298- logPool .Printf ("Recorded error for connection: backend=%s, session=%s, errors=%d" ,
309+ logPool .Printf ("Recorded error for connection: backend=%s, session=%s, errors=%d" ,
299310 backendID , sessionID , metadata .ErrorCount )
300311 }
301312}
0 commit comments