Skip to content

Commit e851bcc

Browse files
committed
Make blobs more cheaply cloneable by by giving it an Inner
1 parent dba7850 commit e851bcc

File tree

1 file changed

+42
-31
lines changed

1 file changed

+42
-31
lines changed

src/net_protocol.rs

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,21 @@ impl Default for GcState {
4747
}
4848
}
4949

50-
#[derive(Debug, Clone)]
51-
pub struct Blobs<S> {
50+
#[derive(Debug)]
51+
struct BlobsInner<S> {
5252
rt: LocalPoolHandle,
5353
pub(crate) store: S,
5454
events: EventSender,
5555
downloader: Downloader,
56-
#[cfg(feature = "rpc")]
57-
batches: Arc<tokio::sync::Mutex<BlobBatches>>,
5856
endpoint: Endpoint,
59-
gc_state: Arc<std::sync::Mutex<GcState>>,
57+
gc_state: std::sync::Mutex<GcState>,
58+
#[cfg(feature = "rpc")]
59+
batches: tokio::sync::Mutex<BlobBatches>,
60+
}
61+
62+
#[derive(Debug, Clone)]
63+
pub struct Blobs<S> {
64+
inner: Arc<BlobsInner<S>>,
6065
#[cfg(feature = "rpc")]
6166
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
6267
}
@@ -178,40 +183,46 @@ impl<S: crate::store::Store> Blobs<S> {
178183
endpoint: Endpoint,
179184
) -> Self {
180185
Self {
181-
rt,
182-
store,
183-
events,
184-
downloader,
185-
endpoint,
186-
#[cfg(feature = "rpc")]
187-
batches: Default::default(),
188-
gc_state: Default::default(),
186+
inner: Arc::new(BlobsInner {
187+
rt,
188+
store,
189+
events,
190+
downloader,
191+
endpoint,
192+
#[cfg(feature = "rpc")]
193+
batches: Default::default(),
194+
gc_state: Default::default(),
195+
}),
189196
#[cfg(feature = "rpc")]
190197
rpc_handler: Default::default(),
191198
}
192199
}
193200

194201
pub fn store(&self) -> &S {
195-
&self.store
202+
&self.inner.store
203+
}
204+
205+
pub fn events(&self) -> &EventSender {
206+
&self.inner.events
196207
}
197208

198209
pub fn rt(&self) -> &LocalPoolHandle {
199-
&self.rt
210+
&self.inner.rt
200211
}
201212

202213
pub fn downloader(&self) -> &Downloader {
203-
&self.downloader
214+
&self.inner.downloader
204215
}
205216

206217
pub fn endpoint(&self) -> &Endpoint {
207-
&self.endpoint
218+
&self.inner.endpoint
208219
}
209220

210221
/// Add a callback that will be called before the garbage collector runs.
211222
///
212223
/// This can only be called before the garbage collector has started, otherwise it will return an error.
213224
pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
214-
let mut state = self.gc_state.lock().unwrap();
225+
let mut state = self.inner.gc_state.lock().unwrap();
215226
match &mut *state {
216227
GcState::Initial(cbs) => {
217228
cbs.push(cb);
@@ -225,7 +236,7 @@ impl<S: crate::store::Store> Blobs<S> {
225236

226237
/// Start garbage collection with the given settings.
227238
pub fn start_gc(&self, config: GcConfig) -> Result<()> {
228-
let mut state = self.gc_state.lock().unwrap();
239+
let mut state = self.inner.gc_state.lock().unwrap();
229240
let protected = match state.deref_mut() {
230241
GcState::Initial(items) => std::mem::take(items),
231242
GcState::Started(_) => bail!("gc already started"),
@@ -241,17 +252,17 @@ impl<S: crate::store::Store> Blobs<S> {
241252
set
242253
}
243254
};
244-
let store = self.store.clone();
255+
let store = self.store().clone();
245256
let run = self
246-
.rt
257+
.rt()
247258
.spawn(move || async move { store.gc_run(config, protected_cb).await });
248259
*state = GcState::Started(Some(run));
249260
Ok(())
250261
}
251262

252263
#[cfg(feature = "rpc")]
253264
pub(crate) async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> {
254-
self.batches.lock().await
265+
self.inner.batches.lock().await
255266
}
256267

257268
pub(crate) async fn download(
@@ -268,7 +279,7 @@ impl<S: crate::store::Store> Blobs<S> {
268279
mode,
269280
} = req;
270281
let hash_and_format = HashAndFormat { hash, format };
271-
let temp_tag = self.store.temp_tag(hash_and_format);
282+
let temp_tag = self.store().temp_tag(hash_and_format);
272283
let stats = match mode {
273284
DownloadMode::Queued => {
274285
self.download_queued(endpoint, hash_and_format, nodes, progress.clone())
@@ -283,10 +294,10 @@ impl<S: crate::store::Store> Blobs<S> {
283294
progress.send(DownloadProgress::AllDone(stats)).await.ok();
284295
match tag {
285296
SetTagOption::Named(tag) => {
286-
self.store.set_tag(tag, Some(hash_and_format)).await?;
297+
self.store().set_tag(tag, Some(hash_and_format)).await?;
287298
}
288299
SetTagOption::Auto => {
289-
self.store.create_tag(hash_and_format).await?;
300+
self.store().create_tag(hash_and_format).await?;
290301
}
291302
}
292303
drop(temp_tag);
@@ -316,7 +327,7 @@ impl<S: crate::store::Store> Blobs<S> {
316327
let can_download = !node_ids.is_empty() && (any_added || endpoint.discovery().is_some());
317328
anyhow::ensure!(can_download, "no way to reach a node for download");
318329
let req = DownloadRequest::new(hash_and_format, node_ids).progress_sender(progress);
319-
let handle = self.downloader.queue(req).await;
330+
let handle = self.downloader().queue(req).await;
320331
let stats = handle.await?;
321332
Ok(stats)
322333
}
@@ -334,7 +345,7 @@ impl<S: crate::store::Store> Blobs<S> {
334345
let mut nodes_iter = nodes.into_iter();
335346
'outer: loop {
336347
match crate::get::db::get_to_db_in_steps(
337-
self.store.clone(),
348+
self.store().clone(),
338349
hash_and_format,
339350
progress.clone(),
340351
)
@@ -393,9 +404,9 @@ impl<S: crate::store::Store> Blobs<S> {
393404

394405
impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
395406
fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
396-
let db = self.store.clone();
397-
let events = self.events.clone();
398-
let rt = self.rt.clone();
407+
let db = self.store().clone();
408+
let events = self.events().clone();
409+
let rt = self.rt().clone();
399410

400411
Box::pin(async move {
401412
crate::provider::handle_connection(conn.await?, db, events, rt).await;
@@ -404,7 +415,7 @@ impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
404415
}
405416

406417
fn shutdown(&self) -> BoxedFuture<()> {
407-
let store = self.store.clone();
418+
let store = self.store().clone();
408419
Box::pin(async move {
409420
store.shutdown().await;
410421
})

0 commit comments

Comments
 (0)