Skip to content

Commit 1a96717

Browse files
committed
feat: allow specifying the tokio runtime to use for the rpc tasks
1 parent d87f671 commit 1a96717

File tree

3 files changed

+19
-2
lines changed

3 files changed

+19
-2
lines changed

src/net_protocol.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ impl Default for GcState {
5555
#[derive(Debug)]
5656
pub struct Blobs<S> {
5757
rt: LocalPoolHandle,
58+
pub(crate) tokio_rt: tokio::runtime::Handle,
5859
store: S,
5960
events: EventSender,
6061
downloader: Downloader,
@@ -125,6 +126,7 @@ pub struct Builder<S> {
125126
store: S,
126127
events: Option<EventSender>,
127128
gc_config: Option<crate::store::GcConfig>,
129+
tokio_rt: Option<tokio::runtime::Handle>,
128130
}
129131

130132
impl<S: crate::store::Store> Builder<S> {
@@ -139,6 +141,12 @@ impl<S: crate::store::Store> Builder<S> {
139141
self
140142
}
141143

144+
/// Set the tokio runtime handle to use for the rpc handler
145+
pub fn tokio_rt(mut self, value: tokio::runtime::Handle) -> Self {
146+
self.tokio_rt = Some(value);
147+
self
148+
}
149+
142150
/// Build the Blobs protocol handler.
143151
/// You need to provide a local pool handle and an endpoint.
144152
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Arc<Blobs<S>> {
@@ -149,6 +157,8 @@ impl<S: crate::store::Store> Builder<S> {
149157
self.events.unwrap_or_default(),
150158
downloader,
151159
endpoint.clone(),
160+
self.tokio_rt
161+
.unwrap_or_else(|| tokio::runtime::Handle::current()),
152162
))
153163
}
154164
}
@@ -160,6 +170,7 @@ impl<S> Blobs<S> {
160170
store,
161171
events: None,
162172
gc_config: None,
173+
tokio_rt: None,
163174
}
164175
}
165176
}
@@ -187,9 +198,11 @@ impl<S: crate::store::Store> Blobs<S> {
187198
events: EventSender,
188199
downloader: Downloader,
189200
endpoint: Endpoint,
201+
tokio_rt: tokio::runtime::Handle,
190202
) -> Self {
191203
Self {
192204
rt,
205+
tokio_rt,
193206
store,
194207
events,
195208
downloader,

src/rpc.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -901,8 +901,11 @@ impl RpcHandler {
901901
let (listener, connector) = quic_rpc::transport::flume::channel(1);
902902
let listener = RpcServer::new(listener);
903903
let client = RpcClient::new(connector);
904-
let _handler = listener
905-
.spawn_accept_loop(move |req, chan| blobs.clone().handle_rpc_request(req, chan));
904+
let tokio_rt = blobs.tokio_rt.clone();
905+
let handler = tokio_rt.spawn(
906+
listener.accept_loop(move |req, chan| blobs.clone().handle_rpc_request(req, chan)),
907+
);
908+
let _handler = AbortOnDropHandle::new(handler);
906909
Self { client, _handler }
907910
}
908911
}

src/rpc/client/blobs.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,7 @@ mod tests {
10741074
events,
10751075
downloader,
10761076
endpoint.clone(),
1077+
tokio::runtime::Handle::current(),
10771078
));
10781079
router = router.accept(crate::ALPN, blobs.clone());
10791080

0 commit comments

Comments
 (0)