Skip to content

Commit 9714e86

Browse files
committed
Add implementation
1 parent 6e257d9 commit 9714e86

File tree

9 files changed

+407
-113
lines changed

9 files changed

+407
-113
lines changed

.gitignore

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,4 @@ Cargo.lock
4141
# These are backup files generated by rustfmt
4242
**/*.rs.bk
4343

44-
# Others
45-
async-pipe-rs.iws
46-
4744
# End of https://www.gitignore.io/api/rust,macos

Cargo.toml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,25 @@
11
[package]
22
name = "stream-body"
33
version = "0.1.0"
4-
description = "An HttpBody implementation which supports streaming for the Rust HTTP library hyper"
4+
description = "An HttpBody implementation with efficient streaming support for the Rust HTTP library hyper"
55
homepage = "https://github.com/rousan/stream-body"
66
repository = "https://github.com/rousan/stream-body"
7-
keywords = ["rust", "hyper", "aschronous", "stream", "writer"]
7+
keywords = ["rust", "hyper", "asynchronous", "stream", "writer", "channel"]
88
categories = ["asynchronous"]
99
authors = ["Rousan Ali <[email protected]>"]
1010
readme = "README.md"
1111
license = "MIT"
1212
edition = "2018"
1313

1414
[dependencies]
15-
hyper = "0.13"
1615
log = "0.4"
1716
pin-project-lite = "0.1.4"
1817
tokio = { version = "0.2", features= [] }
18+
async-pipe = "0.1"
19+
http-body = "0.3"
20+
bytes = "0.5"
21+
http = "0.2"
1922

2023
[dev-dependencies]
24+
hyper = "0.13"
2125
tokio = { version = "0.2", features = ["full"] }

README.md

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,75 @@
44
[![Documentation](https://docs.rs/stream-body/badge.svg)](https://docs.rs/stream-body)
55
[![MIT](https://img.shields.io/crates/l/stream-body.svg)](./LICENSE)
66

7-
An [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) implementation which supports streaming for the Rust HTTP library [hyper](https://hyper.rs/).
7+
An [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) implementation with efficient streaming support for the Rust HTTP library [hyper](https://hyper.rs/).
88

99
[Docs](https://docs.rs/stream-body)
1010

11-
## Example
11+
## Motivation
1212

13+
The existing [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html) type in [hyper](https://hyper.rs/) uses [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html)
14+
as streaming chunk. Hence, a lot of buffer allocation and de-allocation happen during the real-time large data streaming because of the [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) type.
15+
Therefore, `StreamBody` comes to tackle this kind of situation. The `StreamBody` implements [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) and uses `&[u8]`
16+
slice as the streaming chunk, so it is possible to use the same buffer without allocating a new one; hence it overcomes any allocation/de-allocation overhead.
17+
18+
Also, the [channel()](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html#method.channel) method in hyper [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html) returns
19+
a pair of a [Sender](https://docs.rs/hyper/0.13.4/hyper/body/struct.Sender.html) and a [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html).
20+
Here, the [Sender](https://docs.rs/hyper/0.13.4/hyper/body/struct.Sender.html) accepts [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) as a data chunk which again
21+
creates allocation/de-allocation overhead.
22+
To solve this, `StreamBody` has a method named `StreamBody::channel()` which returns a pair of an [AsyncWrite](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html) and the `StreamBody`
23+
itself. As the [AsyncWrite](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html) accepts `&[u8]` instead of [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html), there will
24+
be no allocation/de-allocation overhead.
25+
26+
## Usage
27+
28+
First add this to your Cargo.toml:
29+
30+
```toml
31+
[dependencies]
32+
stream-body = "0.1"
33+
```
34+
35+
An example on handling a large file:
1336
```rust
14-
use stream_body;
37+
use hyper::service::{make_service_fn, service_fn};
38+
use hyper::{Body, Request, Response, Server};
39+
use std::{convert::Infallible, net::SocketAddr};
40+
use stream_body::StreamBody;
41+
use tokio::fs::File;
42+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
43+
44+
async fn handle(_: Request<Body>) -> Result<Response<StreamBody>, Infallible> {
45+
let (mut writer, body) = StreamBody::channel();
46+
47+
tokio::spawn(async move {
48+
let mut f = File::open("large-file").await.unwrap();
49+
50+
// Reuse this buffer
51+
let mut buf = [0_u8; 1024 * 16];
52+
loop {
53+
let read_count = f.read(&mut buf).await.unwrap();
54+
if read_count == 0 {
55+
break;
56+
}
57+
writer.write_all(&buf[..read_count]).await.unwrap();
58+
}
59+
});
60+
61+
Ok(Response::builder().body(body).unwrap())
62+
}
63+
64+
#[tokio::main]
65+
async fn main() {
66+
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
67+
68+
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });
69+
70+
let server = Server::bind(&addr).serve(make_svc);
1571

72+
if let Err(e) = server.await {
73+
eprintln!("server error: {}", e);
74+
}
75+
}
1676
```
1777

1878
## Contributing

examples/main.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
11
use hyper::service::{make_service_fn, service_fn};
2-
use hyper::{body::Buf, Body, Request, Response, Server};
2+
use hyper::{Body, Request, Response, Server};
33
use std::{convert::Infallible, net::SocketAddr};
4-
use stream_body::{StreamBody, StreamData};
4+
use stream_body::StreamBody;
55
use tokio::fs::File;
6+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
67

7-
async fn handle(_: Request<Body>) -> Result<Response<StreamBody<File>>, Infallible> {
8-
// Ok(Response::new("Hello, World!".into()))
8+
async fn handle(_: Request<Body>) -> Result<Response<StreamBody>, Infallible> {
9+
let (mut writer, body) = StreamBody::channel();
910

10-
let file = File::open("./Cargo.toml").await.unwrap();
11+
tokio::spawn(async move {
12+
let mut f = File::open("large-file").await.unwrap();
1113

12-
let sb = StreamBody::new(file);
14+
let mut buf = [0_u8; 1024 * 16];
15+
loop {
16+
let read_count = f.read(&mut buf).await.unwrap();
17+
if read_count == 0 {
18+
break;
19+
}
20+
writer.write_all(&buf[..read_count]).await.unwrap();
21+
}
22+
});
1323

14-
Ok(Response::new(sb))
24+
Ok(Response::builder().body(body).unwrap())
1525
}
1626

1727
#[tokio::main]

rustfmt.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
max_width = 120
2+
tab_spaces = 4

0 commit comments

Comments
 (0)