Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ repository = "https://github.com/0x676e67/rnet"
authors = ["0x676e67 <[email protected]>"]
edition = "2024"
rust-version = "1.85.0"
include = ["src/**/*", "LICENSE", "README.md"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
Expand Down
30 changes: 18 additions & 12 deletions python/rnet/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -280,41 +280,47 @@ class Message:

class Streamer:
r"""
A byte stream response.
An asynchronous iterator yielding data chunks from the response stream.
Used to stream response content.
A stream response.
An asynchronous iterator yielding data chunks (bytes) or HTTP trailers (HeaderMap) from the response stream.
Used to stream response content and receive HTTP trailers if present.
Implemented in the `stream` method of the `Response` class.
Can be used in an asynchronous for loop in Python.

When streaming a response, each iteration yields either a bytes object (for body data) or a HeaderMap (for HTTP trailers, if the server sends them).
This allows you to access HTTP/1.1 or HTTP/2 trailers in addition to the main body.

# Examples

```python
import asyncio
import rnet
from rnet import Method, Emulation
from rnet import Method, Emulation, HeaderMap

async def main():
resp = await rnet.get("https://httpbin.io/stream/20")
resp = await rnet.get("https://example.com/stream-with-trailers")
async with resp.stream() as streamer:
async for chunk in streamer:
print("Chunk: ", chunk)
if isinstance(chunk, bytes):
print("Chunk: ", chunk)
elif isinstance(chunk, HeaderMap):
print("Trailers: ", chunk)
await asyncio.sleep(0.1)

if __name__ == "__main__":
asyncio.run(main())
```
"""

def __iter__(self) -> "Streamer": ...
def __next__(self) -> bytes | HeaderMap: ...
def __enter__(self) -> "Streamer": ...
def __exit__(self, _exc_type: Any, _exc_value: Any, _traceback: Any) -> None: ...
async def __aiter__(self) -> "Streamer": ...
async def __anext__(self) -> bytes | None: ...
async def __anext__(self) -> bytes | HeaderMap: ...
async def __aenter__(self) -> Any: ...
async def __aexit__(
self, _exc_type: Any, _exc_value: Any, _traceback: Any
) -> Any: ...
def __iter__(self) -> "Streamer": ...
def __next__(self) -> bytes: ...
def __enter__(self) -> "Streamer": ...
def __exit__(self, _exc_type: Any, _exc_value: Any, _traceback: Any) -> None: ...
) -> None: ...

class Response:
r"""
Expand Down
50 changes: 34 additions & 16 deletions src/client/body/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ use std::{
};

use bytes::Bytes;
use futures_util::{FutureExt, Stream, StreamExt, TryStreamExt, stream::BoxStream};
use futures_util::{FutureExt, Stream, StreamExt, stream::BoxStream};
use http_body_util::BodyExt;
use pyo3::{
IntoPyObjectExt, intern,
prelude::*,
pybacked::{PyBackedBytes, PyBackedStr},
};
use tokio::{sync::Mutex, task::JoinHandle};

use crate::{buffer::PyBuffer, error::Error};
use crate::{buffer::PyBuffer, error::Error, header::HeaderMap};

type Pending = Option<JoinHandle<Option<PyResult<PyBytesLike>>>>;

Expand All @@ -31,6 +32,12 @@ pub enum PyBytesLike {
String(PyBackedStr),
}

#[derive(IntoPyObject)]
pub enum Frame {
Bytes(PyBuffer),
Trailers(HeaderMap),
}

/// A Python stream wrapper.
pub struct PyStream {
inner: PyStreamSource,
Expand All @@ -40,7 +47,7 @@ pub struct PyStream {
/// A bytes stream response.
#[derive(Clone)]
#[pyclass(subclass)]
pub struct Streamer(Arc<Mutex<Option<BoxStream<'static, wreq::Result<Bytes>>>>>);
pub struct Streamer(Arc<Mutex<Option<wreq::Response>>>);

// ===== impl PyStream =====

Expand All @@ -59,24 +66,35 @@ impl From<PyStreamSource> for PyStream {
impl Streamer {
/// Create a new [`Streamer`] instance.
#[inline]
pub fn new(stream: impl Stream<Item = wreq::Result<Bytes>> + Send + 'static) -> Streamer {
Streamer(Arc::new(Mutex::new(Some(stream.boxed()))))
pub fn new(resp: wreq::Response) -> Streamer {
Streamer(Arc::new(Mutex::new(Some(resp))))
}

async fn next(self, error: fn() -> Error) -> PyResult<PyBuffer> {
let val = self
async fn next(self, error: fn() -> Error) -> PyResult<Frame> {
let frame = self
.0
.lock()
.await
.as_mut()
.ok_or_else(error)?
.try_next()
.await;

val.map_err(Error::Library)?
.map(PyBuffer::from)
.ok_or_else(error)
.map_err(Into::into)
.frame()
.await
.ok_or_else(error)?
.map_err(Error::Library)?
.into_data()
.map_err(|frame| frame.into_trailers());

match frame {
Ok(bytes) => Ok(Frame::Bytes(PyBuffer::from(bytes))),
Err(Ok(trailers)) => Ok(Frame::Trailers(HeaderMap(trailers))),
Err(Err(frame)) => {
// This branch should be unreachable, as `http_body::Frame` can only be `Data` or
// `Trailers`. The `debug_assert!` will help catch any future
// changes that violate this assumption.
debug_assert!(false, "Unexpected frame type: {:?}", frame);
Err(error().into())
}
}
}
}

Expand All @@ -93,7 +111,7 @@ impl Streamer {
}

#[inline]
fn __next__(&mut self, py: Python) -> PyResult<PyBuffer> {
fn __next__(&mut self, py: Python) -> PyResult<Frame> {
py.detach(|| {
pyo3_async_runtimes::tokio::get_runtime()
.block_on(self.clone().next(|| Error::StopIteration))
Expand Down Expand Up @@ -224,7 +242,7 @@ impl Stream for PyStream {
Poll::Ready(Ok(res)) => Poll::Ready(res),
Poll::Ready(Err(_)) => Poll::Ready(None),
Poll::Pending => {
this.pending = Some(pending);
this.pending.replace(pending);
Poll::Pending
}
}
Expand Down
15 changes: 7 additions & 8 deletions src/client/resp/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ impl Response {
wreq::Response::from(response)
}

/// Creates an empty response with the same metadata but no body content.
///
/// Useful for operations that only need response headers/metadata without consuming the body.
fn empty_response(self) -> wreq::Response {
self.build_response(wreq::Body::from(Bytes::new()))
}

/// Consumes the response body and caches it in memory for reuse.
///
/// If the body is streamable, it will be fully read into memory and cached.
Expand Down Expand Up @@ -147,13 +154,6 @@ impl Response {
}
Err(Error::Memory)
}

/// Creates an empty response with the same metadata but no body content.
///
/// Useful for operations that only need response headers/metadata without consuming the body.
fn empty_response(self) -> wreq::Response {
self.build_response(wreq::Body::from(Bytes::new()))
}
}

#[pymethods]
Expand Down Expand Up @@ -208,7 +208,6 @@ impl Response {
pub fn stream(&self) -> PyResult<Streamer> {
self.clone()
.stream_response()
.map(wreq::Response::bytes_stream)
.map(Streamer::new)
.map_err(Into::into)
}
Expand Down
Loading