Skip to content

Commit 9ccdefd

Browse files
authored
feat(response): introduce trailers support (#467)
1 parent 75eb571 commit 9ccdefd

File tree

4 files changed

+60
-36
lines changed

4 files changed

+60
-36
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ repository = "https://github.com/0x676e67/rnet"
88
authors = ["0x676e67 <[email protected]>"]
99
edition = "2024"
1010
rust-version = "1.85.0"
11+
include = ["src/**/*", "LICENSE", "README.md"]
1112

1213
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1314
[lib]

python/rnet/__init__.pyi

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -280,41 +280,47 @@ class Message:
280280

281281
class Streamer:
282282
r"""
283-
A byte stream response.
284-
An asynchronous iterator yielding data chunks from the response stream.
285-
Used to stream response content.
283+
A stream response.
284+
An asynchronous iterator yielding data chunks (bytes) or HTTP trailers (HeaderMap) from the response stream.
285+
Used to stream response content and receive HTTP trailers if present.
286286
Implemented in the `stream` method of the `Response` class.
287287
Can be used in an asynchronous for loop in Python.
288288
289+
When streaming a response, each iteration yields either a bytes object (for body data) or a HeaderMap (for HTTP trailers, if the server sends them).
290+
This allows you to access HTTP/1.1 or HTTP/2 trailers in addition to the main body.
291+
289292
# Examples
290293
291294
```python
292295
import asyncio
293296
import rnet
294-
from rnet import Method, Emulation
297+
from rnet import Method, Emulation, HeaderMap
295298
296299
async def main():
297-
resp = await rnet.get("https://httpbin.io/stream/20")
300+
resp = await rnet.get("https://example.com/stream-with-trailers")
298301
async with resp.stream() as streamer:
299302
async for chunk in streamer:
300-
print("Chunk: ", chunk)
303+
if isinstance(chunk, bytes):
304+
print("Chunk: ", chunk)
305+
elif isinstance(chunk, HeaderMap):
306+
print("Trailers: ", chunk)
301307
await asyncio.sleep(0.1)
302308
303309
if __name__ == "__main__":
304310
asyncio.run(main())
305311
```
306312
"""
307313

314+
def __iter__(self) -> "Streamer": ...
315+
def __next__(self) -> bytes | HeaderMap: ...
316+
def __enter__(self) -> "Streamer": ...
317+
def __exit__(self, _exc_type: Any, _exc_value: Any, _traceback: Any) -> None: ...
308318
async def __aiter__(self) -> "Streamer": ...
309-
async def __anext__(self) -> bytes | None: ...
319+
async def __anext__(self) -> bytes | HeaderMap: ...
310320
async def __aenter__(self) -> Any: ...
311321
async def __aexit__(
312322
self, _exc_type: Any, _exc_value: Any, _traceback: Any
313-
) -> Any: ...
314-
def __iter__(self) -> "Streamer": ...
315-
def __next__(self) -> bytes: ...
316-
def __enter__(self) -> "Streamer": ...
317-
def __exit__(self, _exc_type: Any, _exc_value: Any, _traceback: Any) -> None: ...
323+
) -> None: ...
318324

319325
class Response:
320326
r"""

src/client/body/stream.rs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@ use std::{
66
};
77

88
use bytes::Bytes;
9-
use futures_util::{FutureExt, Stream, StreamExt, TryStreamExt, stream::BoxStream};
9+
use futures_util::{FutureExt, Stream, StreamExt, stream::BoxStream};
10+
use http_body_util::BodyExt;
1011
use pyo3::{
1112
IntoPyObjectExt, intern,
1213
prelude::*,
1314
pybacked::{PyBackedBytes, PyBackedStr},
1415
};
1516
use tokio::{sync::Mutex, task::JoinHandle};
1617

17-
use crate::{buffer::PyBuffer, error::Error};
18+
use crate::{buffer::PyBuffer, error::Error, header::HeaderMap};
1819

1920
type Pending = Option<JoinHandle<Option<PyResult<PyBytesLike>>>>;
2021

@@ -31,6 +32,12 @@ pub enum PyBytesLike {
3132
String(PyBackedStr),
3233
}
3334

35+
#[derive(IntoPyObject)]
36+
pub enum Frame {
37+
Bytes(PyBuffer),
38+
Trailers(HeaderMap),
39+
}
40+
3441
/// A Python stream wrapper.
3542
pub struct PyStream {
3643
inner: PyStreamSource,
@@ -40,7 +47,7 @@ pub struct PyStream {
4047
/// A bytes stream response.
4148
#[derive(Clone)]
4249
#[pyclass(subclass)]
43-
pub struct Streamer(Arc<Mutex<Option<BoxStream<'static, wreq::Result<Bytes>>>>>);
50+
pub struct Streamer(Arc<Mutex<Option<wreq::Response>>>);
4451

4552
// ===== impl PyStream =====
4653

@@ -59,24 +66,35 @@ impl From<PyStreamSource> for PyStream {
5966
impl Streamer {
6067
/// Create a new [`Streamer`] instance.
6168
#[inline]
62-
pub fn new(stream: impl Stream<Item = wreq::Result<Bytes>> + Send + 'static) -> Streamer {
63-
Streamer(Arc::new(Mutex::new(Some(stream.boxed()))))
69+
pub fn new(resp: wreq::Response) -> Streamer {
70+
Streamer(Arc::new(Mutex::new(Some(resp))))
6471
}
6572

66-
async fn next(self, error: fn() -> Error) -> PyResult<PyBuffer> {
67-
let val = self
73+
async fn next(self, error: fn() -> Error) -> PyResult<Frame> {
74+
let frame = self
6875
.0
6976
.lock()
7077
.await
7178
.as_mut()
7279
.ok_or_else(error)?
73-
.try_next()
74-
.await;
75-
76-
val.map_err(Error::Library)?
77-
.map(PyBuffer::from)
78-
.ok_or_else(error)
79-
.map_err(Into::into)
80+
.frame()
81+
.await
82+
.ok_or_else(error)?
83+
.map_err(Error::Library)?
84+
.into_data()
85+
.map_err(|frame| frame.into_trailers());
86+
87+
match frame {
88+
Ok(bytes) => Ok(Frame::Bytes(PyBuffer::from(bytes))),
89+
Err(Ok(trailers)) => Ok(Frame::Trailers(HeaderMap(trailers))),
90+
Err(Err(frame)) => {
91+
// This branch should be unreachable, as `http_body::Frame` can only be `Data` or
92+
// `Trailers`. The `debug_assert!` will help catch any future
93+
// changes that violate this assumption.
94+
debug_assert!(false, "Unexpected frame type: {:?}", frame);
95+
Err(error().into())
96+
}
97+
}
8098
}
8199
}
82100

@@ -93,7 +111,7 @@ impl Streamer {
93111
}
94112

95113
#[inline]
96-
fn __next__(&mut self, py: Python) -> PyResult<PyBuffer> {
114+
fn __next__(&mut self, py: Python) -> PyResult<Frame> {
97115
py.detach(|| {
98116
pyo3_async_runtimes::tokio::get_runtime()
99117
.block_on(self.clone().next(|| Error::StopIteration))
@@ -224,7 +242,7 @@ impl Stream for PyStream {
224242
Poll::Ready(Ok(res)) => Poll::Ready(res),
225243
Poll::Ready(Err(_)) => Poll::Ready(None),
226244
Poll::Pending => {
227-
this.pending = Some(pending);
245+
this.pending.replace(pending);
228246
Poll::Pending
229247
}
230248
}

src/client/resp/http.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ impl Response {
105105
wreq::Response::from(response)
106106
}
107107

108+
/// Creates an empty response with the same metadata but no body content.
109+
///
110+
/// Useful for operations that only need response headers/metadata without consuming the body.
111+
fn empty_response(self) -> wreq::Response {
112+
self.build_response(wreq::Body::from(Bytes::new()))
113+
}
114+
108115
/// Consumes the response body and caches it in memory for reuse.
109116
///
110117
/// If the body is streamable, it will be fully read into memory and cached.
@@ -147,13 +154,6 @@ impl Response {
147154
}
148155
Err(Error::Memory)
149156
}
150-
151-
/// Creates an empty response with the same metadata but no body content.
152-
///
153-
/// Useful for operations that only need response headers/metadata without consuming the body.
154-
fn empty_response(self) -> wreq::Response {
155-
self.build_response(wreq::Body::from(Bytes::new()))
156-
}
157157
}
158158

159159
#[pymethods]
@@ -208,7 +208,6 @@ impl Response {
208208
pub fn stream(&self) -> PyResult<Streamer> {
209209
self.clone()
210210
.stream_response()
211-
.map(wreq::Response::bytes_stream)
212211
.map(Streamer::new)
213212
.map_err(Into::into)
214213
}

0 commit comments

Comments
 (0)