Skip to content

Commit 6e257d9

Browse files
committed
Add initial code
1 parent 5e6341b commit 6e257d9

File tree

11 files changed

+259
-7
lines changed

11 files changed

+259
-7
lines changed

.idea/.gitignore

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/misc.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules.xml

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/vcs.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ license = "MIT"
1212
edition = "2018"
1313

1414
[dependencies]
15+
hyper = "0.13"
1516
log = "0.4"
17+
pin-project-lite = "0.1.4"
18+
tokio = { version = "0.2", features= [] }
1619

1720
[dev-dependencies]
1821
tokio = { version = "0.2", features = ["full"] }

examples/main.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use hyper::service::{make_service_fn, service_fn};
2+
use hyper::{body::Buf, Body, Request, Response, Server};
3+
use std::{convert::Infallible, net::SocketAddr};
4+
use stream_body::{StreamBody, StreamData};
5+
use tokio::fs::File;
6+
7+
async fn handle(_: Request<Body>) -> Result<Response<StreamBody<File>>, Infallible> {
8+
// Ok(Response::new("Hello, World!".into()))
9+
10+
let file = File::open("./Cargo.toml").await.unwrap();
11+
12+
let sb = StreamBody::new(file);
13+
14+
Ok(Response::new(sb))
15+
}
16+
17+
#[tokio::main]
18+
async fn main() {
19+
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
20+
21+
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });
22+
23+
let server = Server::bind(&addr).serve(make_svc);
24+
25+
if let Err(e) = server.await {
26+
eprintln!("server error: {}", e);
27+
}
28+
}

src/body.rs

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
use crate::data::StreamData;
2+
use crate::state::State;
3+
use hyper::body::Buf;
4+
use hyper::{body::HttpBody, header::HeaderValue, HeaderMap};
5+
use pin_project_lite::pin_project;
6+
use std::io::BufReader;
7+
use std::marker::PhantomData;
8+
use std::marker::Unpin;
9+
use std::mem::MaybeUninit;
10+
use std::pin::Pin;
11+
use std::sync::{Arc, Mutex};
12+
use std::task::{Context, Poll};
13+
use tokio::io::{self, AsyncRead};
14+
15+
pub const DEFAULT_BUF_SIZE: usize = 8 * 1024;
16+
17+
pin_project! {
18+
pub struct StreamBody<R> {
19+
#[pin]
20+
pub(crate) reader: R,
21+
pub(crate) buf: Box<[u8]>,
22+
pub(crate) len: usize,
23+
pub(crate) reached_eof: bool,
24+
pub(crate) state: Arc<Mutex<State>>,
25+
}
26+
}
27+
28+
impl<R> StreamBody<R>
29+
where
30+
R: AsyncRead + Send + Sync,
31+
{
32+
pub fn new(reader: R) -> StreamBody<R> {
33+
StreamBody::with_capacity(DEFAULT_BUF_SIZE, reader)
34+
}
35+
36+
pub fn with_capacity(capacity: usize, reader: R) -> StreamBody<R> {
37+
unsafe {
38+
let mut buffer = Vec::with_capacity(capacity);
39+
buffer.set_len(capacity);
40+
41+
{
42+
let b = &mut *(&mut buffer[..] as *mut [u8] as *mut [MaybeUninit<u8>]);
43+
reader.prepare_uninitialized_buffer(b);
44+
}
45+
46+
StreamBody {
47+
reader,
48+
buf: buffer.into_boxed_slice(),
49+
len: 0,
50+
reached_eof: false,
51+
state: Arc::new(Mutex::new(State {
52+
is_current_stream_data_consumed: true,
53+
})),
54+
}
55+
}
56+
}
57+
58+
fn check_if_prev_data_consumed(&self) -> io::Result<()> {
59+
let mut state;
60+
match self.state.lock() {
61+
Ok(s) => state = s,
62+
Err(err) => {
63+
return Err(io::Error::new(
64+
io::ErrorKind::Other,
65+
format!(
66+
"{}: StreamBody: Failed to lock the stream state: {}",
67+
env!("CARGO_PKG_NAME"),
68+
err
69+
),
70+
))
71+
}
72+
}
73+
74+
if !state.is_current_stream_data_consumed {
75+
return Err(io::Error::new(
76+
io::ErrorKind::Other,
77+
"The previous StreamData is not yet consumed",
78+
));
79+
}
80+
81+
Ok(())
82+
}
83+
}
84+
85+
impl<R> HttpBody for StreamBody<R>
86+
where
87+
R: AsyncRead + Send + Sync + Unpin,
88+
{
89+
type Data = StreamData;
90+
type Error = io::Error;
91+
92+
fn poll_data(
93+
mut self: Pin<&mut Self>,
94+
cx: &mut Context,
95+
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
96+
if let Err(err) = self.check_if_prev_data_consumed() {
97+
return Poll::Ready(Some(Err(err)));
98+
}
99+
100+
let mut me = self.as_mut().project();
101+
let poll_status = me.reader.poll_read(cx, &mut me.buf[..]);
102+
103+
match poll_status {
104+
Poll::Pending => Poll::Pending,
105+
Poll::Ready(result) => match result {
106+
Ok(read_count) if read_count > 0 => {
107+
let buf = &self.buf[..];
108+
let data = StreamData::new(&self.buf[..], Arc::clone(&self.state));
109+
Poll::Ready(Some(Ok(data)))
110+
}
111+
Ok(_) => {
112+
self.reached_eof = true;
113+
Poll::Ready(None)
114+
}
115+
Err(err) => Poll::Ready(Some(Err(err))),
116+
},
117+
}
118+
}
119+
120+
fn poll_trailers(
121+
self: Pin<&mut Self>,
122+
cx: &mut Context,
123+
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
124+
Poll::Ready(Ok(None))
125+
}
126+
127+
fn is_end_stream(&self) -> bool {
128+
self.reached_eof
129+
}
130+
}

src/data.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use crate::state::State;
2+
use hyper::body::Buf;
3+
use std::sync::{Arc, Mutex};
4+
5+
pub struct StreamData {
6+
ptr: *const u8,
7+
len: usize,
8+
pos: usize,
9+
state: Arc<Mutex<State>>,
10+
}
11+
12+
impl StreamData {
13+
pub(crate) fn new(s: &[u8], state: Arc<Mutex<State>>) -> StreamData {
14+
StreamData {
15+
ptr: s.as_ptr(),
16+
len: s.len(),
17+
pos: 0,
18+
state,
19+
}
20+
}
21+
}
22+
23+
unsafe impl std::marker::Send for StreamData {}
24+
25+
impl Drop for StreamData {
26+
fn drop(&mut self) {
27+
match self.state.lock() {
28+
Ok(mut state) => {
29+
state.is_current_stream_data_consumed = true;
30+
}
31+
Err(err) => log::error!(
32+
"{}: StreamData: Failed to update the drop state: {}",
33+
env!("CARGO_PKG_NAME"),
34+
err
35+
),
36+
}
37+
}
38+
}
39+
40+
impl Buf for StreamData {
41+
fn remaining(&self) -> usize {
42+
self.len - self.pos
43+
}
44+
45+
fn bytes(&self) -> &[u8] {
46+
unsafe { std::slice::from_raw_parts(self.ptr.add(self.pos), self.len - self.pos) }
47+
}
48+
49+
fn advance(&mut self, cnt: usize) {
50+
self.pos += cnt;
51+
}
52+
}

src/lib.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
#[cfg(test)]
2-
mod tests {
3-
#[test]
4-
fn it_works() {
5-
assert_eq!(2 + 2, 4);
6-
}
7-
}
1+
pub use body::StreamBody;
2+
pub use data::StreamData;
3+
4+
mod body;
5+
mod data;
6+
mod state;

src/state.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub(crate) struct State {
2+
pub(crate) is_current_stream_data_consumed: bool,
3+
}

0 commit comments

Comments
 (0)