Skip to content

Commit d087247

Browse files
authored
Merge 5edec02 into 57112e6
2 parents 57112e6 + 5edec02 commit d087247

27 files changed

+379
-442
lines changed

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ futures-util = "0.3.30"
9696
testdir = "0.9.1"
9797

9898
[features]
99-
default = ["fs-store", "net_protocol"]
99+
default = ["fs-store", "net_protocol", "formats-collection"]
100100
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
101101
net_protocol = ["downloader", "dep:futures-util"]
102102
fs-store = ["dep:reflink-copy", "redb", "dep:tempfile"]
@@ -114,6 +114,8 @@ rpc = [
114114
"dep:ssh-key",
115115
"downloader",
116116
]
117+
formats = []
118+
formats-collection = ["formats"]
117119

118120
example-iroh = [
119121
"dep:clap",
@@ -129,6 +131,7 @@ rustdoc-args = ["--cfg", "iroh_docsrs"]
129131

130132
[[example]]
131133
name = "provide-bytes"
134+
required-features = ["formats-collection"]
132135

133136
[[example]]
134137
name = "fetch-fsm"

examples/fetch-fsm.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::net::SocketAddr;
77

88
use anyhow::{Context, Result};
99
use iroh_blobs::{
10-
get::fsm::{AtInitial, ConnectedNext, EndBlobNext},
10+
fetch::fsm::{AtInitial, ConnectedNext, EndBlobNext},
1111
hashseq::HashSeq,
1212
protocol::GetRequest,
1313
Hash,
@@ -63,14 +63,14 @@ async fn main() -> Result<()> {
6363
// create a request for a collection
6464
let request = GetRequest::all(hash);
6565
// create the initial state of the finite state machine
66-
let initial = iroh_blobs::get::fsm::start(connection, request);
66+
let initial = iroh_blobs::fetch::fsm::start(connection, request);
6767

6868
write_collection(initial).await
6969
} else {
7070
// create a request for a single blob
7171
let request = GetRequest::single(hash);
7272
// create the initial state of the finite state machine
73-
let initial = iroh_blobs::get::fsm::start(connection, request);
73+
let initial = iroh_blobs::fetch::fsm::start(connection, request);
7474

7575
write_blob(initial).await
7676
}
@@ -119,7 +119,7 @@ async fn write_collection(initial: AtInitial) -> Result<()> {
119119
}
120120

121121
// move to the header
122-
let header: iroh_blobs::get::fsm::AtBlobHeader = start_root.next();
122+
let header: iroh_blobs::fetch::fsm::AtBlobHeader = start_root.next();
123123
let (root_end, hashes_bytes) = header.concatenate_into_vec().await?;
124124
let next = root_end.next();
125125
let EndBlobNext::MoreChildren(at_meta) = next else {

examples/fetch-stream.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use bytes::Bytes;
1111
use futures_lite::{Stream, StreamExt};
1212
use genawaiter::sync::{Co, Gen};
1313
use iroh_blobs::{
14-
get::fsm::{AtInitial, BlobContentNext, ConnectedNext, EndBlobNext},
14+
fetch::fsm::{AtInitial, BlobContentNext, ConnectedNext, EndBlobNext},
1515
hashseq::HashSeq,
1616
protocol::GetRequest,
1717
Hash,
@@ -68,7 +68,7 @@ async fn main() -> Result<()> {
6868
let request = GetRequest::all(hash);
6969

7070
// create the initial state of the finite state machine
71-
let initial = iroh_blobs::get::fsm::start(connection, request);
71+
let initial = iroh_blobs::fetch::fsm::start(connection, request);
7272

7373
// create a stream that yields all the data of the blob
7474
stream_children(initial).boxed_local()
@@ -77,7 +77,7 @@ async fn main() -> Result<()> {
7777
let request = GetRequest::single(hash);
7878

7979
// create the initial state of the finite state machine
80-
let initial = iroh_blobs::get::fsm::start(connection, request);
80+
let initial = iroh_blobs::fetch::fsm::start(connection, request);
8181

8282
// create a stream that yields all the data of the blob
8383
stream_blob(initial).boxed_local()
@@ -166,7 +166,7 @@ fn stream_children(initial: AtInitial) -> impl Stream<Item = io::Result<Bytes>>
166166
));
167167
}
168168
// move to the header
169-
let header: iroh_blobs::get::fsm::AtBlobHeader = start_root.next();
169+
let header: iroh_blobs::fetch::fsm::AtBlobHeader = start_root.next();
170170
let (root_end, hashes_bytes) = header.concatenate_into_vec().await?;
171171

172172
// parse the hashes from the hash sequence bytes

examples/local-swarm-discovery.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,10 @@ mod progress {
140140
ProgressStyle,
141141
};
142142
use iroh_blobs::{
143-
get::{db::DownloadProgress, progress::BlobProgress, Stats},
143+
fetch::{
144+
progress::{BlobProgress, DownloadProgress},
145+
Stats,
146+
},
144147
Hash,
145148
};
146149

src/cli.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ use iroh::{NodeAddr, PublicKey, RelayUrl};
1919
use tokio::io::AsyncWriteExt;
2020

2121
use crate::{
22-
get::{db::DownloadProgress, progress::BlobProgress, Stats},
23-
net_protocol::DownloadMode,
22+
fetch::{
23+
progress::{BlobProgress, DownloadProgress},
24+
Stats,
25+
},
2426
provider::AddProgress,
2527
rpc::client::blobs::{
2628
self, BlobInfo, BlobStatus, CollectionInfo, DownloadOptions, IncompleteBlobInfo, WrapOption,

src/downloader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
5555
use tracing::{debug, error, error_span, trace, warn, Instrument};
5656

5757
use crate::{
58-
get::{db::DownloadProgress, Stats},
58+
fetch::{progress::DownloadProgress, Stats},
5959
metrics::Metrics,
6060
store::Store,
6161
util::{local_pool::LocalPoolHandle, progress::ProgressSender},

src/downloader/get.rs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,20 @@ use iroh::endpoint;
77

88
use super::{progress::BroadcastProgressSender, DownloadKind, FailureAction, GetStartFut, Getter};
99
use crate::{
10-
get::{db::get_to_db_in_steps, error::GetError},
11-
store::Store,
10+
fetch::Error,
11+
store::{fetch_to_db_in_steps, FetchState, FetchStateNeedsConn, Store},
1212
};
1313

14-
impl From<GetError> for FailureAction {
15-
fn from(e: GetError) -> Self {
14+
impl From<Error> for FailureAction {
15+
fn from(e: Error) -> Self {
1616
match e {
17-
e @ GetError::NotFound(_) => FailureAction::AbortRequest(e.into()),
18-
e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
19-
e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
20-
e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
21-
e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e.into()),
17+
e @ Error::NotFound(_) => FailureAction::AbortRequest(e.into()),
18+
e @ Error::RemoteReset(_) => FailureAction::RetryLater(e.into()),
19+
e @ Error::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
20+
e @ Error::Io(_) => FailureAction::RetryLater(e.into()),
21+
e @ Error::BadRequest(_) => FailureAction::AbortRequest(e.into()),
2222
// TODO: what do we want to do on local failures?
23-
e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
23+
e @ Error::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
2424
}
2525
}
2626
}
@@ -34,7 +34,7 @@ pub(crate) struct IoGetter<S: Store> {
3434

3535
impl<S: Store> Getter for IoGetter<S> {
3636
type Connection = endpoint::Connection;
37-
type NeedsConn = crate::get::db::GetStateNeedsConn;
37+
type NeedsConn = FetchStateNeedsConn;
3838

3939
fn get(
4040
&mut self,
@@ -43,12 +43,10 @@ impl<S: Store> Getter for IoGetter<S> {
4343
) -> GetStartFut<Self::NeedsConn> {
4444
let store = self.store.clone();
4545
async move {
46-
match get_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
46+
match fetch_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
4747
Err(err) => Err(err.into()),
48-
Ok(crate::get::db::GetState::Complete(stats)) => {
49-
Ok(super::GetOutput::Complete(stats))
50-
}
51-
Ok(crate::get::db::GetState::NeedsConn(needs_conn)) => {
48+
Ok(FetchState::Complete(stats)) => Ok(super::GetOutput::Complete(stats)),
49+
Ok(FetchState::NeedsConn(needs_conn)) => {
5250
Ok(super::GetOutput::NeedsConn(needs_conn))
5351
}
5452
}
@@ -57,7 +55,7 @@ impl<S: Store> Getter for IoGetter<S> {
5755
}
5856
}
5957

60-
impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
58+
impl super::NeedsConn<endpoint::Connection> for FetchStateNeedsConn {
6159
fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
6260
async move {
6361
let res = self.proceed(conn).await;
@@ -73,13 +71,13 @@ impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsCon
7371
}
7472

7573
#[cfg(feature = "metrics")]
76-
fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
74+
fn track_metrics(res: &Result<crate::fetch::Stats, Error>) {
7775
use iroh_metrics::{inc, inc_by};
7876

7977
use crate::metrics::Metrics;
8078
match res {
8179
Ok(stats) => {
82-
let crate::get::Stats {
80+
let crate::fetch::Stats {
8381
bytes_written,
8482
bytes_read: _,
8583
elapsed,
@@ -90,7 +88,7 @@ fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
9088
inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64);
9189
}
9290
Err(e) => match &e {
93-
GetError::NotFound(_) => inc!(Metrics, downloads_notfound),
91+
Error::NotFound(_) => inc!(Metrics, downloads_notfound),
9492
_ => inc!(Metrics, downloads_error),
9593
},
9694
}

src/downloader/progress.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use parking_lot::Mutex;
1111

1212
use super::DownloadKind;
1313
use crate::{
14-
get::{db::DownloadProgress, progress::TransferState},
14+
fetch::progress::{DownloadProgress, TransferState},
1515
util::progress::{AsyncChannelProgressSender, IdGenerator, ProgressSendError, ProgressSender},
1616
};
1717

@@ -21,7 +21,7 @@ pub type ProgressSubscriber = AsyncChannelProgressSender<DownloadProgress>;
2121
/// Track the progress of downloads.
2222
///
2323
/// This struct allows to create [`ProgressSender`] structs to be passed to
24-
/// [`crate::get::db::get_to_db`]. Each progress sender can be subscribed to by any number of
24+
/// [`crate::store::fetch_to_db`]. Each progress sender can be subscribed to by any number of
2525
/// [`ProgressSubscriber`] channel senders, which will receive each progress update (if they have
2626
/// capacity). Additionally, the [`ProgressTracker`] maintains a [`TransferState`] for each
2727
/// transfer, applying each progress update to update this state. When subscribing to an already

src/downloader/test.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@ use iroh::SecretKey;
1010

1111
use super::*;
1212
use crate::{
13-
get::{
14-
db::BlobId,
15-
progress::{BlobProgress, TransferState},
16-
},
13+
fetch::progress::{BlobId, BlobProgress, TransferState},
1714
util::{
1815
local_pool::LocalPool,
1916
progress::{AsyncChannelProgressSender, IdGenerator},

src/downloader/test/getter.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@ impl Getter for TestingGetter {
3636
// since for testing we don't need a real connection, just keep track of what peer is the
3737
// request being sent to
3838
type Connection = NodeId;
39-
type NeedsConn = GetStateNeedsConn;
39+
type NeedsConn = FetchStateNeedsConn;
4040

4141
fn get(
4242
&mut self,
4343
kind: DownloadKind,
4444
progress_sender: BroadcastProgressSender,
4545
) -> GetStartFut<Self::NeedsConn> {
46-
std::future::ready(Ok(downloader::GetOutput::NeedsConn(GetStateNeedsConn(
46+
std::future::ready(Ok(downloader::GetOutput::NeedsConn(FetchStateNeedsConn(
4747
self.clone(),
4848
kind,
4949
progress_sender,
@@ -53,11 +53,11 @@ impl Getter for TestingGetter {
5353
}
5454

5555
#[derive(Debug)]
56-
pub(super) struct GetStateNeedsConn(TestingGetter, DownloadKind, BroadcastProgressSender);
56+
pub(super) struct FetchStateNeedsConn(TestingGetter, DownloadKind, BroadcastProgressSender);
5757

58-
impl downloader::NeedsConn<NodeId> for GetStateNeedsConn {
58+
impl downloader::NeedsConn<NodeId> for FetchStateNeedsConn {
5959
fn proceed(self, peer: NodeId) -> super::GetProceedFut {
60-
let GetStateNeedsConn(getter, kind, progress_sender) = self;
60+
let FetchStateNeedsConn(getter, kind, progress_sender) = self;
6161
let mut inner = getter.0.write();
6262
inner.request_history.push((kind, peer));
6363
let request_duration = inner.request_duration;

0 commit comments

Comments
 (0)