diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index b4e086b2..7f7ed5dc 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -143,7 +143,7 @@ jobs:
       - uses: taiki-e/install-action@cross
       - name: test
-        run: cross test --all --target ${{ matrix.target }} -- --test-threads=12
+        run: cross test --all --target ${{ matrix.target }} -- --test-threads=4
         env:
           RUST_LOG: ${{ runner.debug && 'TRACE' || 'DEBUG' }}
diff --git a/src/api.rs b/src/api.rs
index 2296c3d7..56527994 100644
--- a/src/api.rs
+++ b/src/api.rs
@@ -4,7 +4,7 @@
 //! with a remote store via rpc calls.
 //!
 //! The entry point for the api is the [`Store`] struct. There are several ways
-//! to obtain a `Store` instance: it is available via [`Deref`](std::ops::Deref)
+//! to obtain a `Store` instance: it is available via [`Deref`]
 //! from the different store implementations
 //! (e.g. [`MemStore`](crate::store::mem::MemStore)
 //! and [`FsStore`](crate::store::fs::FsStore)) as well as on the
diff --git a/src/api/blobs.rs b/src/api/blobs.rs
index 0f79838f..d0b94859 100644
--- a/src/api/blobs.rs
+++ b/src/api/blobs.rs
@@ -29,8 +29,11 @@ use n0_future::{future, stream, Stream, StreamExt};
 use quinn::SendStream;
 use range_collections::{range_set::RangeSetRange, RangeSet2};
 use ref_cast::RefCast;
+use serde::{Deserialize, Serialize};
 use tokio::io::AsyncWriteExt;
 use tracing::trace;
+mod reader;
+pub use reader::BlobReader;
 
 // Public reexports from the proto module.
 //
@@ -102,6 +105,38 @@ impl Blobs {
         })
     }
 
+    /// Create a reader for the given hash. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
+    /// and can therefore be used to read the blob's content.
+    ///
+    /// Any access to parts of the blob that are not present will result in an error.
+    ///
+    /// Example:
+    /// ```rust
+    /// use iroh_blobs::{store::mem::MemStore, api::blobs::Blobs};
+    /// use tokio::io::AsyncReadExt;
+    ///
+    /// # async fn example() -> anyhow::Result<()> {
+    /// let store = MemStore::new();
+    /// let tag = store.add_slice(b"Hello, world!").await?;
+    /// let mut reader = store.reader(tag.hash);
+    /// let mut buf = String::new();
+    /// reader.read_to_string(&mut buf).await?;
+    /// assert_eq!(buf, "Hello, world!");
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
+        self.reader_with_opts(ReaderOptions { hash: hash.into() })
+    }
+
+    /// Create a reader for the given options. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
+    /// and can therefore be used to read the blob's content.
+    ///
+    /// Any access to parts of the blob that are not present will result in an error.
+    pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
+        BlobReader::new(self.clone(), options)
+    }
+
     /// Delete a blob.
     ///
     /// This function is not public, because it does not work as expected when called manually,
@@ -647,6 +682,12 @@ impl<'a> AddProgress<'a> {
     }
 }
 
+/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ReaderOptions {
+    pub hash: Hash,
+}
+
 /// An observe result. Awaiting this will return the current state.
 ///
 /// Calling [`ObserveProgress::stream`] will return a stream of updates, where
@@ -856,7 +897,7 @@ impl ExportRangesProgress {
     ///   range of 0..100, you will get the entire first chunk, 0..1024.
     ///
     /// It is up to the caller to clip the ranges to the requested ranges.
-    pub async fn stream(self) -> impl Stream<Item = ExportRangesItem> {
+    pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
        Gen::new(|co| async move {
            let mut rx = match self.inner.await {
                Ok(rx) => rx,
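Not part of the patch: a minimal end-to-end sketch of the new reader API, combining the doc example above with a seek. It assumes only what the diff itself shows (`MemStore::new`, `add_slice`, `reader`, and tokio's `AsyncReadExt`/`AsyncSeekExt` extension traits); the offsets and string literal are illustrative.

```rust
// Sketch: seek into a blob, then read the tail. SeekFrom::End would fail
// here, since the reader does not support seeking from the end yet.
use std::io::SeekFrom;

use iroh_blobs::store::mem::MemStore;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let store = MemStore::new();
    let tag = store.add_slice(b"Hello, world!").await?;

    let mut reader = store.reader(tag.hash);
    // Skip the "Hello, " prefix (7 bytes) before reading.
    reader.seek(SeekFrom::Start(7)).await?;
    let mut buf = String::new();
    reader.read_to_string(&mut buf).await?;
    assert_eq!(buf, "world!");
    // After reading to the end, the position equals the blob size.
    assert_eq!(reader.stream_position().await?, 13);
    Ok(())
}
```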
diff --git a/src/api/blobs/reader.rs b/src/api/blobs/reader.rs
new file mode 100644
index 00000000..e15e374d
--- /dev/null
+++ b/src/api/blobs/reader.rs
@@ -0,0 +1,334 @@
+use std::{
+    io::{self, ErrorKind, SeekFrom},
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use n0_future::StreamExt;
+
+use crate::{
+    api::{
+        blobs::{Blobs, ReaderOptions},
+        proto::ExportRangesItem,
+    },
+    Hash,
+};
+
+/// A reader for blobs that implements `AsyncRead` and `AsyncSeek`.
+#[derive(Debug)]
+pub struct BlobReader {
+    blobs: Blobs,
+    options: ReaderOptions,
+    state: ReaderState,
+}
+
+#[derive(Default, derive_more::Debug)]
+enum ReaderState {
+    Idle {
+        position: u64,
+    },
+    Seeking {
+        position: u64,
+    },
+    Reading {
+        position: u64,
+        #[debug(skip)]
+        op: n0_future::boxed::BoxStream<ExportRangesItem>,
+    },
+    #[default]
+    Poisoned,
+}
+
+impl BlobReader {
+    pub(super) fn new(blobs: Blobs, options: ReaderOptions) -> Self {
+        Self {
+            blobs,
+            options,
+            state: ReaderState::Idle { position: 0 },
+        }
+    }
+
+    pub fn hash(&self) -> &Hash {
+        &self.options.hash
+    }
+}
+
+impl tokio::io::AsyncRead for BlobReader {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> Poll<io::Result<()>> {
+        let this = self.get_mut();
+        let mut position1 = None;
+        loop {
+            let guard = &mut this.state;
+            match std::mem::take(guard) {
+                ReaderState::Idle { position } => {
+                    // todo: read until next page boundary instead of fixed size
+                    let len = buf.remaining() as u64;
+                    let end = position.checked_add(len).ok_or_else(|| {
+                        io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading")
+                    })?;
+                    // start the export op for the entire size of the buffer, and convert to a stream
+                    let stream = this
+                        .blobs
+                        .export_ranges(this.options.hash, position..end)
+                        .stream();
+                    position1 = Some(position);
+                    *guard = ReaderState::Reading {
+                        position,
+                        op: Box::pin(stream),
+                    };
+                }
+                ReaderState::Reading { position, mut op } => {
+                    let position1 = position1.get_or_insert(position);
+                    match op.poll_next(cx) {
+                        Poll::Ready(Some(ExportRangesItem::Size(_))) => {
+                            *guard = ReaderState::Reading { position, op };
+                        }
+                        Poll::Ready(Some(ExportRangesItem::Data(data))) => {
+                            if data.offset != *position1 {
+                                break Poll::Ready(Err(io::Error::other(
+                                    "Data offset does not match expected position",
+                                )));
+                            }
+                            buf.put_slice(&data.data);
+                            // update just the local position1, not the position in the state.
+                            *position1 =
+                                position1
+                                    .checked_add(data.data.len() as u64)
+                                    .ok_or_else(|| {
+                                        io::Error::new(ErrorKind::InvalidInput, "Position overflow")
+                                    })?;
+                            *guard = ReaderState::Reading { position, op };
+                        }
+                        Poll::Ready(Some(ExportRangesItem::Error(err))) => {
+                            *guard = ReaderState::Idle { position };
+                            break Poll::Ready(Err(io::Error::other(format!(
+                                "Error reading data: {err}"
+                            ))));
+                        }
+                        Poll::Ready(None) => {
+                            // done with the stream, go back to idle.
+                            *guard = ReaderState::Idle {
+                                position: *position1,
+                            };
+                            break Poll::Ready(Ok(()));
+                        }
+                        Poll::Pending => {
+                            break if position != *position1 {
+                                // we read some data, so we need to abort the op.
+                                //
+                                // we can't be sure we won't be called with the same buf size next time.
+                                *guard = ReaderState::Idle {
+                                    position: *position1,
+                                };
+                                Poll::Ready(Ok(()))
+                            } else {
+                                // nothing was read yet, so we remain in the reading state.
+                                //
+                                // we make the assumption here that the next call will use the same buf size.
+                                *guard = ReaderState::Reading {
+                                    position: *position1,
+                                    op,
+                                };
+                                Poll::Pending
+                            };
+                        }
+                    }
+                }
+                state @ ReaderState::Seeking { .. } => {
+                    // should I try to recover from this or just keep it poisoned?
+                    this.state = state;
+                    break Poll::Ready(Err(io::Error::other("Can't read while seeking")));
+                }
+                ReaderState::Poisoned => {
+                    break Poll::Ready(Err(io::Error::other("Reader is poisoned")));
+                }
+            };
+        }
+    }
+}
+
+impl tokio::io::AsyncSeek for BlobReader {
+    fn start_seek(
+        self: std::pin::Pin<&mut Self>,
+        seek_from: tokio::io::SeekFrom,
+    ) -> io::Result<()> {
+        let this = self.get_mut();
+        let guard = &mut this.state;
+        match std::mem::take(guard) {
+            ReaderState::Idle { position } => {
+                let position1 = match seek_from {
+                    SeekFrom::Start(pos) => pos,
+                    SeekFrom::Current(offset) => {
+                        position.checked_add_signed(offset).ok_or_else(|| {
+                            io::Error::new(
+                                ErrorKind::InvalidInput,
+                                "Position overflow when seeking",
+                            )
+                        })?
+                    }
+                    SeekFrom::End(_offset) => {
+                        // todo: support seeking from the end if we know the size
+                        return Err(io::Error::new(
+                            ErrorKind::InvalidInput,
+                            "Seeking from end is not supported yet",
+                        ));
+                    }
+                };
+                *guard = ReaderState::Seeking {
+                    position: position1,
+                };
+                Ok(())
+            }
+            ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
+            ReaderState::Seeking { .. } => Err(io::Error::other("Already seeking")),
+            ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
+        }
+    }
+
+    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+        let this = self.get_mut();
+        let guard = &mut this.state;
+        Poll::Ready(match std::mem::take(guard) {
+            ReaderState::Seeking { position } => {
+                *guard = ReaderState::Idle { position };
+                Ok(position)
+            }
+            ReaderState::Idle { position } => {
+                // seek calls poll_complete just in case, to finish a pending seek operation
+                // before the next one. So the call sequence is poll_complete/start_seek/poll_complete.
+                *guard = ReaderState::Idle { position };
+                Ok(position)
+            }
+            state @ ReaderState::Reading { .. } => {
+                // should I try to recover from this or just keep it poisoned?
+                *guard = state;
+                Err(io::Error::other("Can't seek while reading"))
+            }
+            ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use bao_tree::ChunkRanges;
+    use testresult::TestResult;
+    use tokio::io::{AsyncReadExt, AsyncSeekExt};
+
+    use super::*;
+    use crate::{
+        store::{
+            fs::{
+                tests::{create_n0_bao, test_data, INTERESTING_SIZES},
+                FsStore,
+            },
+            mem::MemStore,
+        },
+        util::ChunkRangesExt,
+    };
+
+    async fn reader_smoke(blobs: &Blobs) -> TestResult<()> {
+        for size in INTERESTING_SIZES {
+            let data = test_data(size);
+            let tag = blobs.add_bytes(data.clone()).await?;
+            // read all
+            {
+                let mut reader = blobs.reader(tag.hash);
+                let mut buf = Vec::new();
+                reader.read_to_end(&mut buf).await?;
+                assert_eq!(buf, data);
+                let pos = reader.stream_position().await?;
+                assert_eq!(pos, data.len() as u64);
+            }
+            // seek to the middle and read the rest
+            {
+                let mut reader = blobs.reader(tag.hash);
+                let mid = size / 2;
+                reader.seek(SeekFrom::Start(mid as u64)).await?;
+                let mut buf = Vec::new();
+                reader.read_to_end(&mut buf).await?;
+                assert_eq!(buf, data[mid..].to_vec());
+                let pos = reader.stream_position().await?;
+                assert_eq!(pos, data.len() as u64);
+            }
+        }
+        Ok(())
+    }
+
+    async fn reader_partial(blobs: &Blobs) -> TestResult<()> {
+        for size in INTERESTING_SIZES {
+            let data = test_data(size);
+            let ranges = ChunkRanges::chunk(0);
+            let (hash, bao) = create_n0_bao(&data, &ranges)?;
+            println!("importing {} bytes", bao.len());
+            blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;
+            // read the first chunk or the entire blob, whichever is smaller.
+            // this should work!
+            {
+                let mut reader = blobs.reader(hash);
+                let valid = size.min(1024);
+                let mut buf = vec![0u8; valid];
+                reader.read_exact(&mut buf).await?;
+                assert_eq!(buf, data[..valid]);
+                let pos = reader.stream_position().await?;
+                assert_eq!(pos, valid as u64);
+            }
+            if size > 1024 {
+                // read the part we don't have - should immediately return an error
+                {
+                    let mut reader = blobs.reader(hash);
+                    let mut rest = vec![0u8; size - 1024];
+                    reader.seek(SeekFrom::Start(1024)).await?;
+                    let res = reader.read_exact(&mut rest).await;
+                    assert!(res.is_err());
+                }
+                // read crossing the end of the valid data - should return an error despite
+                // the first bytes being valid.
+                // A read that fails should not update the stream position.
+                {
+                    let mut reader = blobs.reader(hash);
+                    let mut buf = vec![0u8; size];
+                    let res = reader.read(&mut buf).await;
+                    assert!(res.is_err());
+                    let pos = reader.stream_position().await?;
+                    assert_eq!(pos, 0);
+                }
+            }
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn reader_partial_fs() -> TestResult<()> {
+        let testdir = tempfile::tempdir()?;
+        let store = FsStore::load(testdir.path().to_owned()).await?;
+        reader_partial(store.blobs()).await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn reader_partial_memory() -> TestResult<()> {
+        let store = MemStore::new();
+        reader_partial(store.blobs()).await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn reader_smoke_fs() -> TestResult<()> {
+        let testdir = tempfile::tempdir()?;
+        let store = FsStore::load(testdir.path().to_owned()).await?;
+        reader_smoke(store.blobs()).await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn reader_smoke_memory() -> TestResult<()> {
+        let store = MemStore::new();
+        reader_smoke(store.blobs()).await?;
+        Ok(())
+    }
+}
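Not part of the patch: `ReaderState` above leans on the `std::mem::take` idiom, where the `#[default]` `Poisoned` variant is swapped in for the duration of a state transition, so an early return via `?` leaves the reader detectably poisoned instead of in a stale state. A condensed, self-contained sketch of that idiom follows; all names here are illustrative.

```rust
use std::io;

#[derive(Default)]
enum State {
    Idle { position: u64 },
    // mem::take swaps this in while a transition is in flight.
    #[default]
    Poisoned,
}

fn advance(state: &mut State, delta: u64) -> io::Result<u64> {
    // Take ownership of the state; *state is Poisoned until we write back.
    match std::mem::take(state) {
        State::Idle { position } => {
            let next = position
                .checked_add(delta)
                .ok_or_else(|| io::Error::other("position overflow"))?;
            // Restore a valid state before returning.
            *state = State::Idle { position: next };
            Ok(next)
        }
        // An early `?` above left the state Poisoned, just like BlobReader.
        State::Poisoned => Err(io::Error::other("state is poisoned")),
    }
}

fn main() -> io::Result<()> {
    let mut state = State::Idle { position: 0 };
    assert_eq!(advance(&mut state, 7)?, 7);
    // An overflowing step errors out and leaves the state poisoned.
    assert!(advance(&mut state, u64::MAX).is_err());
    assert!(matches!(state, State::Poisoned));
    Ok(())
}
```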
diff --git a/src/store/fs.rs b/src/store/fs.rs
index 024d9786..b0c1eb60 100644
--- a/src/store/fs.rs
+++ b/src/store/fs.rs
@@ -989,7 +989,7 @@ async fn export_ranges_impl(
 ) -> io::Result<()> {
     let ExportRangesRequest { ranges, hash } = cmd;
     trace!(
-        "exporting ranges: {hash} {ranges:?} size={}",
+        "export_ranges: exporting ranges: {hash} {ranges:?} size={}",
         handle.current_size()?
     );
     debug_assert!(handle.hash() == hash, "hash mismatch");
@@ -1012,11 +1012,9 @@
         loop {
            let end: u64 = (offset + bs).min(range.end);
            let size = (end - offset) as usize;
-            tx.send(ExportRangesItem::Data(Leaf {
-                offset,
-                data: data.read_bytes_at(offset, size)?,
-            }))
-            .await?;
+            let res = data.read_bytes_at(offset, size);
+            tx.send(ExportRangesItem::Data(Leaf { offset, data: res? }))
+                .await?;
            offset = end;
            if offset >= range.end {
                break;
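Also not part of the patch: `BlobReader::poll_read` is essentially a pull-based consumer of the `export_ranges` stream that `export_ranges_impl` feeds. A sketch of driving that stream directly, assuming `ExportRangesItem` is publicly reachable under `iroh_blobs::api::proto` (the crate-internal path the reader imports above) and using an arbitrary `0..1024` byte range:

```rust
use iroh_blobs::{
    api::{blobs::Blobs, proto::ExportRangesItem},
    Hash,
};
use n0_future::StreamExt;

// Collect the first 1024 bytes of a blob by driving the export stream
// directly, handling the same three item kinds that poll_read handles.
async fn collect_range(blobs: &Blobs, hash: Hash) -> std::io::Result<Vec<u8>> {
    let mut out = Vec::new();
    // Box::pin, as the reader does, so next() can be called on the stream.
    let mut stream = Box::pin(blobs.export_ranges(hash, 0..1024).stream());
    while let Some(item) = stream.next().await {
        match item {
            // Size announcement; the reader ignores this as well.
            ExportRangesItem::Size(_) => {}
            // Data leaves carry their offset; a robust consumer should
            // verify contiguity, as poll_read does.
            ExportRangesItem::Data(leaf) => out.extend_from_slice(&leaf.data),
            ExportRangesItem::Error(err) => {
                return Err(std::io::Error::other(format!("export failed: {err}")));
            }
        }
    }
    Ok(out)
}
```

Note how `Size`, `Data`, and `Error` map one-to-one onto the three `Poll::Ready(Some(...))` arms of `poll_read` in the new reader above.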