diff --git a/Cargo.toml b/Cargo.toml index 1503932..7403431 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ repository = "https://github.com/0x676e67/rnet" authors = ["0x676e67 "] edition = "2024" rust-version = "1.85.0" +include = ["src/**/*", "LICENSE", "README.md"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] diff --git a/python/rnet/__init__.pyi b/python/rnet/__init__.pyi index 142f564..21a7226 100644 --- a/python/rnet/__init__.pyi +++ b/python/rnet/__init__.pyi @@ -280,24 +280,30 @@ class Message: class Streamer: r""" - A byte stream response. - An asynchronous iterator yielding data chunks from the response stream. - Used to stream response content. + A stream response. + An asynchronous iterator yielding data chunks (bytes) or HTTP trailers (HeaderMap) from the response stream. + Used to stream response content and receive HTTP trailers if present. Implemented in the `stream` method of the `Response` class. Can be used in an asynchronous for loop in Python. + When streaming a response, each iteration yields either a bytes object (for body data) or a HeaderMap (for HTTP trailers, if the server sends them). + This allows you to access HTTP/1.1 or HTTP/2 trailers in addition to the main body. + # Examples ```python import asyncio import rnet - from rnet import Method, Emulation + from rnet import Method, Emulation, HeaderMap async def main(): - resp = await rnet.get("https://httpbin.io/stream/20") + resp = await rnet.get("https://example.com/stream-with-trailers") async with resp.stream() as streamer: async for chunk in streamer: - print("Chunk: ", chunk) + if isinstance(chunk, bytes): + print("Chunk: ", chunk) + elif isinstance(chunk, HeaderMap): + print("Trailers: ", chunk) await asyncio.sleep(0.1) if __name__ == "__main__": @@ -305,16 +311,16 @@ class Streamer: ``` """ + def __iter__(self) -> "Streamer": ... + def __next__(self) -> bytes | HeaderMap: ... 
+ def __enter__(self) -> "Streamer": ... + def __exit__(self, _exc_type: Any, _exc_value: Any, _traceback: Any) -> None: ... async def __aiter__(self) -> "Streamer": ... - async def __anext__(self) -> bytes | None: ... + async def __anext__(self) -> bytes | HeaderMap: ... async def __aenter__(self) -> Any: ... async def __aexit__( self, _exc_type: Any, _exc_value: Any, _traceback: Any - ) -> Any: ... - def __iter__(self) -> "Streamer": ... - def __next__(self) -> bytes: ... - def __enter__(self) -> "Streamer": ... - def __exit__(self, _exc_type: Any, _exc_value: Any, _traceback: Any) -> None: ... + ) -> None: ... class Response: r""" diff --git a/src/client/body/stream.rs b/src/client/body/stream.rs index c586b21..dda1f10 100644 --- a/src/client/body/stream.rs +++ b/src/client/body/stream.rs @@ -6,7 +6,8 @@ use std::{ }; use bytes::Bytes; -use futures_util::{FutureExt, Stream, StreamExt, TryStreamExt, stream::BoxStream}; +use futures_util::{FutureExt, Stream, StreamExt, stream::BoxStream}; +use http_body_util::BodyExt; use pyo3::{ IntoPyObjectExt, intern, prelude::*, @@ -14,7 +15,7 @@ use pyo3::{ }; use tokio::{sync::Mutex, task::JoinHandle}; -use crate::{buffer::PyBuffer, error::Error}; +use crate::{buffer::PyBuffer, error::Error, header::HeaderMap}; type Pending = Option>>>; @@ -31,6 +32,12 @@ pub enum PyBytesLike { String(PyBackedStr), } +#[derive(IntoPyObject)] +pub enum Frame { + Bytes(PyBuffer), + Trailers(HeaderMap), +} + /// A Python stream wrapper. pub struct PyStream { inner: PyStreamSource, @@ -40,7 +47,7 @@ pub struct PyStream { /// A bytes stream response. #[derive(Clone)] #[pyclass(subclass)] -pub struct Streamer(Arc<Mutex<Option<BoxStream<'static, wreq::Result<Bytes>>>>>); +pub struct Streamer(Arc<Mutex<Option<wreq::Response>>>); // ===== impl PyStream ===== @@ -59,24 +66,35 @@ impl From for PyStream { impl Streamer { /// Create a new [`Streamer`] instance.
#[inline] - pub fn new(stream: impl Stream<Item = wreq::Result<Bytes>> + Send + 'static) -> Streamer { - Streamer(Arc::new(Mutex::new(Some(stream.boxed())))) + pub fn new(resp: wreq::Response) -> Streamer { + Streamer(Arc::new(Mutex::new(Some(resp)))) } - async fn next(self, error: fn() -> Error) -> PyResult<PyBuffer> { - let val = self + async fn next(self, error: fn() -> Error) -> PyResult<Frame> { + let frame = self .0 .lock() .await .as_mut() .ok_or_else(error)? - .try_next() - .await; - - val.map_err(Error::Library)? - .map(PyBuffer::from) - .ok_or_else(error) - .map_err(Into::into) + .frame() + .await + .ok_or_else(error)? + .map_err(Error::Library)? + .into_data() + .map_err(|frame| frame.into_trailers()); + + match frame { + Ok(bytes) => Ok(Frame::Bytes(PyBuffer::from(bytes))), + Err(Ok(trailers)) => Ok(Frame::Trailers(HeaderMap(trailers))), + Err(Err(frame)) => { + // This branch should be unreachable, as `http_body::Frame` can only be `Data` or + // `Trailers`. The `debug_assert!` will help catch any future + // changes that violate this assumption. + debug_assert!(false, "Unexpected frame type: {:?}", frame); + Err(error().into()) + } + } } } @@ -93,7 +111,7 @@ impl Streamer { } #[inline] - fn __next__(&mut self, py: Python) -> PyResult<PyBuffer> { + fn __next__(&mut self, py: Python) -> PyResult<Frame> { py.detach(|| { pyo3_async_runtimes::tokio::get_runtime() .block_on(self.clone().next(|| Error::StopIteration)) @@ -224,7 +242,7 @@ impl Stream for PyStream { Poll::Ready(Ok(res)) => Poll::Ready(res), Poll::Ready(Err(_)) => Poll::Ready(None), Poll::Pending => { - this.pending = Some(pending); + this.pending.replace(pending); Poll::Pending } } diff --git a/src/client/resp/http.rs b/src/client/resp/http.rs index f8455b7..7fb951c 100644 --- a/src/client/resp/http.rs +++ b/src/client/resp/http.rs @@ -105,6 +105,13 @@ impl Response { wreq::Response::from(response) } + /// Creates an empty response with the same metadata but no body content.
+ /// + /// Useful for operations that only need response headers/metadata without consuming the body. + fn empty_response(self) -> wreq::Response { + self.build_response(wreq::Body::from(Bytes::new())) + } + /// Consumes the response body and caches it in memory for reuse. /// /// If the body is streamable, it will be fully read into memory and cached. @@ -147,13 +154,6 @@ impl Response { } Err(Error::Memory) } - - /// Creates an empty response with the same metadata but no body content. - /// - /// Useful for operations that only need response headers/metadata without consuming the body. - fn empty_response(self) -> wreq::Response { - self.build_response(wreq::Body::from(Bytes::new())) - } } #[pymethods] @@ -208,7 +208,6 @@ impl Response { pub fn stream(&self) -> PyResult<Streamer> { self.clone() .stream_response() - .map(wreq::Response::bytes_stream) .map(Streamer::new) .map_err(Into::into) }