Skip to content

Commit f02f40f

Browse files
tneelydjc
authored andcommitted
Fix leaky connections
It's possible to trigger more approvals than are necessary, in turn grabbing more connections than we need. This happens when we drop a connection. The drop produces a notify, which doesn't get used until the pool is empty. The first `Pool::get()` call on an empty pool will spawn an connect task, immediately complete `notify.notified().await`, then spawn a second connect task. Both will connect and we'll end up with 1 more connection than we need. Rather than address the notify issue directly, this fix introduces some bookkeeping that tracks the number of open `pool.get()` requests we have waiting on connections. If the number of pending connections >= the number of pending gets, we will not spawn any additional connect tasks.
1 parent 8210b0e commit f02f40f

File tree

3 files changed

+79
-12
lines changed

3 files changed

+79
-12
lines changed

bb8/src/inner.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,9 @@ where
9191
let mut wait_time_start = None;
9292

9393
let future = async {
94+
let getting = self.inner.start_get();
9495
loop {
95-
let (conn, approvals) = self.inner.pop();
96+
let (conn, approvals) = getting.get();
9697
self.spawn_replenishing_approvals(approvals);
9798

9899
// Cancellation safety: make sure to wrap the connection in a `PooledConnection`

bb8/src/internals.rs

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,6 @@ where
3636
}
3737
}
3838

39-
pub(crate) fn pop(&self) -> (Option<Conn<M::Connection>>, ApprovalIter) {
40-
let mut locked = self.internals.lock();
41-
let conn = locked.conns.pop_front().map(|idle| idle.conn);
42-
let approvals = match &conn {
43-
Some(_) => locked.wanted(&self.statics),
44-
None => locked.approvals(&self.statics, 1),
45-
};
46-
47-
(conn, approvals)
48-
}
49-
5039
pub(crate) fn try_put(self: &Arc<Self>, conn: M::Connection) -> Result<(), M::Connection> {
5140
let mut locked = self.internals.lock();
5241
let mut approvals = locked.approvals(&self.statics, 1);
@@ -67,6 +56,10 @@ where
6756
iter
6857
}
6958

59+
pub(crate) fn start_get(self: &Arc<Self>) -> Getting<M> {
60+
Getting::new(self.clone())
61+
}
62+
7063
pub(crate) fn forward_error(&self, err: M::Error) {
7164
self.statics.error_sink.sink(err);
7265
}
@@ -81,6 +74,7 @@ where
8174
conns: VecDeque<IdleConn<M::Connection>>,
8275
num_conns: u32,
8376
pending_conns: u32,
77+
in_flight: u32,
8478
}
8579

8680
impl<M> PoolInternals<M>
@@ -202,6 +196,7 @@ where
202196
conns: VecDeque::new(),
203197
num_conns: 0,
204198
pending_conns: 0,
199+
in_flight: 0,
205200
}
206201
}
207202
}
@@ -236,6 +231,43 @@ pub(crate) struct Approval {
236231
_priv: (),
237232
}
238233

234+
pub(crate) struct Getting<M: ManageConnection + Send> {
235+
inner: Arc<SharedPool<M>>,
236+
}
237+
238+
impl<M: ManageConnection + Send> Getting<M> {
239+
pub(crate) fn get(&self) -> (Option<Conn<M::Connection>>, ApprovalIter) {
240+
let mut locked = self.inner.internals.lock();
241+
if let Some(IdleConn { conn, .. }) = locked.conns.pop_front() {
242+
return (Some(conn), locked.wanted(&self.inner.statics));
243+
}
244+
245+
let approvals = match locked.in_flight > locked.pending_conns {
246+
true => 1,
247+
false => 0,
248+
};
249+
250+
(None, locked.approvals(&self.inner.statics, approvals))
251+
}
252+
}
253+
254+
impl<M: ManageConnection + Send> Getting<M> {
255+
fn new(inner: Arc<SharedPool<M>>) -> Self {
256+
{
257+
let mut locked = inner.internals.lock();
258+
locked.in_flight += 1;
259+
}
260+
Getting { inner }
261+
}
262+
}
263+
264+
impl<M: ManageConnection + Send> Drop for Getting<M> {
265+
fn drop(&mut self) {
266+
let mut locked = self.inner.internals.lock();
267+
locked.in_flight -= 1;
268+
}
269+
}
270+
239271
#[derive(Debug)]
240272
pub(crate) struct Conn<C>
241273
where

bb8/tests/test.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,3 +1103,37 @@ async fn test_add_checks_broken_connections() {
11031103
let res = pool.add(conn);
11041104
assert!(matches!(res, Err(AddError::Broken(_))));
11051105
}
1106+
1107+
#[tokio::test]
1108+
async fn test_reuse_on_drop() {
1109+
let pool = Pool::builder()
1110+
.min_idle(0)
1111+
.max_size(100)
1112+
.queue_strategy(QueueStrategy::Lifo)
1113+
.build(OkManager::<FakeConnection>::new())
1114+
.await
1115+
.unwrap();
1116+
1117+
// The first get should
1118+
// 1) see nothing in the pool,
1119+
// 2) spawn a single replenishing approval,
1120+
// 3) get notified of the new connection and grab it from the pool
1121+
let conn_0 = pool.get().await.expect("should connect");
1122+
// Dropping the connection queues up a notify
1123+
drop(conn_0);
1124+
1125+
// The second get should
1126+
// 1) see the first connection in the pool and grab it
1127+
let _conn_1 = pool.get().await.expect("should connect");
1128+
1129+
// The third get will
1130+
// 1) see nothing in the pool,
1131+
// 2) spawn a single replenishing approval,
1132+
// 3) get notified of the new connection,
1133+
// 4) see nothing in the pool,
1134+
// 5) _not_ spawn a single replenishing approval,
1135+
// 6) get notified of the new connection and grab it from the pool
1136+
let _conn_2 = pool.get().await.expect("should connect");
1137+
1138+
assert_eq!(pool.state().connections, 2);
1139+
}

0 commit comments

Comments
 (0)