Skip to content

Commit d686842

Browse files
committed
Refactor ConnectionPoolImpl for enhanced lifecycle and pooling operations. Introduce PooledConnection for better connection management and thread safety. Expand test coverage with advanced pooling scenarios.
1 parent c0062e6 commit d686842

File tree

5 files changed

+451
-168
lines changed

5 files changed

+451
-168
lines changed

sqlx4k/src/commonMain/kotlin/io/github/smyrgeorge/sqlx4k/impl/pool/ConnectionPoolImpl.kt

Lines changed: 72 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,20 @@ import io.github.smyrgeorge.sqlx4k.ConnectionPool
77
import io.github.smyrgeorge.sqlx4k.SQLError
88
import kotlinx.coroutines.*
99
import kotlinx.coroutines.channels.Channel
10-
import kotlinx.coroutines.sync.Mutex
1110
import kotlinx.coroutines.sync.Semaphore
12-
import kotlinx.coroutines.sync.withLock
1311
import kotlin.concurrent.atomics.*
1412
import kotlin.time.Duration.Companion.seconds
15-
import kotlin.time.TimeMark
16-
import kotlin.time.TimeSource
1713

1814
class ConnectionPoolImpl(
19-
private val options: ConnectionPool.Options,
15+
val options: ConnectionPool.Options,
2016
private val connectionFactory: suspend () -> Connection,
2117
) : ConnectionPool {
22-
private var closed = AtomicBoolean(false)
23-
private val idleCount = AtomicInt(0)
24-
private val totalConnections = AtomicInt(0)
18+
internal var closed = AtomicBoolean(false)
19+
internal val idleCount = AtomicInt(0)
20+
internal val totalConnections = AtomicInt(0)
21+
internal val semaphore = Semaphore(options.maxConnections)
2522

2623
private val cleanupJob: Job
27-
private val semaphore = Semaphore(options.maxConnections)
2824
private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
2925
private val idleConnections = Channel<PooledConnection>(options.maxConnections)
3026

@@ -45,8 +41,8 @@ class ConnectionPoolImpl(
4541
when (error) {
4642
is TimeoutCancellationException -> {
4743
SQLError(
48-
SQLError.Code.PoolTimedOut,
49-
"Timed out waiting for connection after ${options.acquireTimeout}"
44+
code = SQLError.Code.PoolTimedOut,
45+
message = "Timed out waiting for connection after ${options.acquireTimeout}"
5046
).ex()
5147
}
5248

@@ -81,12 +77,6 @@ class ConnectionPoolImpl(
8177
val newConnection = connectionFactory()
8278
val pooled = PooledConnection(newConnection, this)
8379
totalConnections.incrementAndFetch()
84-
85-
// Double-check pool wasn't closed during connection creation
86-
if (closed.load()) {
87-
pooled.close()
88-
SQLError(SQLError.Code.PoolClosed, "Connection pool is closed").ex()
89-
}
9080
return pooled.acquire()
9181
} catch (e: Exception) {
9282
// Release semaphore if connection creation failed
@@ -122,30 +112,23 @@ class ConnectionPoolImpl(
122112
}
123113

124114
override suspend fun close(): Result<Unit> = runCatching {
125-
if (closed.load()) return@runCatching
126-
closed.store(true)
115+
if (!closed.compareAndSet(expectedValue = false, newValue = true)) {
116+
return@runCatching
117+
}
127118

128-
// Cancel cleanup job
129119
cleanupJob.cancel()
130-
131-
// Close the idle channel to wake any waiters and prevent further sends
132120
idleConnections.close()
133-
134-
// Close all idle connections
135121
while (true) {
136122
val pooled = idleConnections.tryReceive().getOrNull() ?: break
137-
// Adjust idle counter for each drained connection
138123
idleCount.decrementAndFetch()
139124
pooled.close()
140125
}
141126
// Ensure idleCount cannot go negative due to races; clamp to 0
142127
idleCount.update { if (it < 0) 0 else it }
143-
144-
// Cancel scope
145128
scope.cancel()
146129
}
147130

148-
private suspend fun sendToIdle(connection: PooledConnection): Boolean {
131+
internal suspend fun sendToIdle(connection: PooledConnection): Boolean {
149132
return try {
150133
idleConnections.send(connection)
151134
idleCount.incrementAndFetch()
@@ -162,34 +145,43 @@ class ConnectionPoolImpl(
162145
}
163146

164147
private suspend fun warmup(minConnections: Int) {
148+
if (closed.load()) return
149+
165150
repeat(minConnections) {
166-
var semaphoreAcquired = false
167-
var connectionCreated = false
168-
var pooledConnection: PooledConnection? = null
151+
if (closed.load()) return
152+
153+
var acquired = false
154+
var pooled: PooledConnection? = null
169155

170156
try {
171157
semaphore.acquire()
172-
semaphoreAcquired = true
173-
pooledConnection = PooledConnection(connectionFactory(), this)
174-
connectionCreated = true
158+
acquired = true
175159

160+
// Check again after potentially blocking on semaphore
161+
if (closed.load()) {
162+
semaphore.release()
163+
return
164+
}
165+
166+
pooled = PooledConnection(connectionFactory(), this)
176167
totalConnections.incrementAndFetch()
177168

178-
// Use atomic send operation - if it fails, rollback
179-
if (!sendToIdle(pooledConnection)) {
169+
if (!sendToIdle(pooled)) {
180170
// Send failed (pool closing), close the connection
181-
pooledConnection.close()
171+
pooled.close()
172+
// No point continuing warmup if pool is closing
173+
return
182174
}
183175
} catch (_: Exception) {
184176
// Clean up based on what succeeded
185-
if (connectionCreated && pooledConnection != null) {
177+
if (pooled != null) {
186178
// Connection was created but something failed, close it
187179
try {
188-
pooledConnection.close()
180+
pooled.close()
189181
} catch (_: Exception) {
190182
// Ignore errors during cleanup
191183
}
192-
} else if (semaphoreAcquired) {
184+
} else if (acquired) {
193185
// Only semaphore was acquired, release it
194186
semaphore.release()
195187
}
@@ -199,27 +191,51 @@ class ConnectionPoolImpl(
199191
}
200192

201193
private suspend fun cleanup() {
202-
val temp = mutableListOf<PooledConnection>()
194+
// Early exit if pool is closing/closed
195+
if (closed.load()) return
196+
203197
val minConnections = options.minConnections ?: 0
198+
var processedCount = 0
204199

205-
// Poll all idle connections using atomic wrapper
206-
while (true) {
200+
// Process connections incrementally without draining all at once
201+
// This allows other coroutines to acquire connections during cleanup
202+
val maxBatchSize: Int = options.maxConnections / 2 // Limit how many we process in one cleanup cycle
203+
204+
while (processedCount < maxBatchSize) {
207205
val pooled = tryReceiveFromIdle() ?: break
208-
temp.add(pooled)
209-
}
206+
processedCount++
210207

211-
// Filter and return valid connections, close expired ones
212-
for (pooled in temp) {
213-
if (pooled.isExpired()) {
214-
// Try to close if above minimum (atomically checked)
215-
val wasClosed = pooled.closeIfAboveMinimum(minConnections)
216-
if (!wasClosed) {
217-
// Couldn't close because we're at or below minimum, keep the connection
218-
sendToIdle(pooled)
208+
try {
209+
if (pooled.isExpired()) {
210+
// Try to close if above minimum (atomically checked)
211+
val wasClosed = pooled.closeIfAboveMinimum(minConnections)
212+
if (!wasClosed) {
213+
// At or below minimum - keep the expired connection
214+
// It will be used until a new connection can replace it
215+
if (!sendToIdle(pooled)) {
216+
// Pool is closing, cleanup the connection
217+
pooled.close()
218+
}
219+
}
220+
} else {
221+
// Connection still valid, return it
222+
if (!sendToIdle(pooled)) {
223+
// Pool is closing, cleanup the connection
224+
pooled.close()
225+
}
219226
}
220-
} else {
221-
// Connection still valid, keep it
222-
sendToIdle(pooled)
227+
} catch (_: Exception) {
228+
// If anything fails, try to close the connection to prevent leaks
229+
try {
230+
pooled.close()
231+
} catch (_: Exception) {
232+
// Ignore errors during emergency cleanup
233+
}
234+
}
235+
236+
// Yield periodically to prevent CPU spinning
237+
if (processedCount % 10 == 0) {
238+
yield()
223239
}
224240
}
225241
}
@@ -230,122 +246,14 @@ class ConnectionPoolImpl(
230246
delay(CLEANUP_INTERVAL)
231247
cleanup()
232248
} catch (_: CancellationException) {
233-
// Job was cancelled, exit loop
234249
break
235250
} catch (_: Exception) {
236251
// TODO: Log error but continue cleanup loop
237252
}
238253
}
239254
}
240255

241-
private class PooledConnection(
242-
private val connection: Connection,
243-
private val pool: ConnectionPoolImpl
244-
) : Connection by connection {
245-
private val mutex = Mutex()
246-
private val createdAt: TimeMark = TIME_SOURCE.markNow()
247-
private var lastUsedAt: TimeMark = createdAt
248-
override var status: Connection.Status = connection.status // TODO: Check if this is correct
249-
private val released get() = status == Connection.Status.Released
250-
251-
suspend fun acquire(): Connection {
252-
mutex.withLock {
253-
if (pool.closed.load()) {
254-
close()
255-
SQLError(SQLError.Code.PoolClosed, "Connection pool is closed").ex()
256-
}
257-
258-
status = Connection.Status.Acquired
259-
lastUsedAt = TIME_SOURCE.markNow()
260-
}
261-
return this
262-
}
263-
264-
override suspend fun release(): Result<Unit> = runCatching {
265-
mutex.withLock {
266-
if (released) return@runCatching
267-
status = Connection.Status.Released
268-
}
269-
270-
// Read closed flag under lock, but perform actions outside to avoid deadlocks
271-
if (pool.closed.load()) {
272-
// Pool is closed, close the connection outside of the mutex
273-
close()
274-
return@runCatching
275-
}
276-
277-
lastUsedAt = TIME_SOURCE.markNow()
278-
279-
// Check if connection is expired
280-
if (isExpired()) {
281-
// Connection is expired, try to close it only if we're above minimum
282-
val minConnections = pool.options.minConnections ?: 0
283-
val wasClosed = closeIfAboveMinimum(minConnections)
284-
if (!wasClosed) {
285-
// At or below minimum, keep the expired connection (cleanup will replace later)
286-
// Enqueue back to idle using suspending send to properly wake waiters
287-
if (!pool.sendToIdle(this)) {
288-
// Pool closing/closed; ensure the connection is closed
289-
close()
290-
}
291-
}
292-
} else {
293-
// Connection is still valid, return to idle pool using suspending send
294-
if (!pool.sendToIdle(this)) {
295-
// Pool closing/closed; ensure the connection is closed
296-
close()
297-
}
298-
}
299-
}
300-
301-
fun isExpired(): Boolean {
302-
pool.options.maxLifetime?.let { maxLifetime -> if (createdAt.elapsedNow() >= maxLifetime) return true }
303-
pool.options.idleTimeout?.let { idleTimeout -> if (lastUsedAt.elapsedNow() >= idleTimeout) return true }
304-
return false
305-
}
306-
307-
suspend fun close() {
308-
try {
309-
connection.release().getOrThrow()
310-
} catch (_: Exception) {
311-
// Ignore errors on close
312-
} finally {
313-
pool.totalConnections.decrementAndFetch()
314-
pool.semaphore.release()
315-
}
316-
}
317-
318-
suspend fun closeIfAboveMinimum(minConnections: Int): Boolean {
319-
// Atomically check and decide whether to close
320-
var shouldClose = false
321-
pool.totalConnections.update {
322-
if (it > minConnections) {
323-
// We're above minimum, mark for closing by decrementing now
324-
shouldClose = true
325-
it - 1
326-
} else {
327-
shouldClose = false
328-
it
329-
}
330-
}
331-
332-
if (shouldClose) {
333-
// Close the actual connection outside the lock
334-
try {
335-
connection.release().getOrThrow()
336-
} catch (_: Exception) {
337-
// Ignore errors on close
338-
} finally {
339-
pool.semaphore.release()
340-
}
341-
}
342-
343-
return shouldClose
344-
}
345-
}
346-
347256
companion object {
348257
private val CLEANUP_INTERVAL = 2.seconds
349-
private val TIME_SOURCE = TimeSource.Monotonic
350258
}
351259
}

0 commit comments

Comments
 (0)