Skip to content

Commit ed7cca4

Browse files
authored
Cherry pick stream upload (interfaces only) from #14 (#15)
1 parent 1a78687 commit ed7cca4

File tree

27 files changed

+550
-491
lines changed

27 files changed

+550
-491
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backends/curl/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,10 @@ nyquest-interface = { version = "0.1.0", path = "../../nyquest-interface", defau
3535
curl = { version = "0.4.47", default-features = false }
3636
curl-sys = { version = "0.4", default-features = false }
3737
iconv-native = { version = "0.1.0", optional = true, default-features = false }
38-
mio = { version = "1", optional = true, default-features = false }
3938
futures-channel = { version = "0.3", optional = true, default-features = false, features = [
4039
"alloc",
4140
] }
4241
futures-util = { version = "0.3", optional = true, default-features = false, features = [
4342
"std",
4443
] }
4544
slab = { version = "0.4", optional = true, default-features = false }
46-
memchr = "2"

backends/curl/src/async.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ use std::io;
22
use std::task::{ready, Poll};
33
use std::{pin::Pin, sync::Arc, task::Context};
44

5-
use curl::easy::Easy;
5+
use curl::easy::Easy2;
66
use nyquest_interface::r#async::{futures_io, AsyncResponse};
77
use nyquest_interface::Error as NyquestError;
88

9-
use crate::url::concat_url;
10-
9+
mod handler;
1110
mod r#loop;
11+
mod pause;
12+
13+
use crate::url::concat_url;
1214

1315
pub struct CurlMultiClientInner {
1416
options: nyquest_interface::client::ClientOptions,
@@ -114,17 +116,20 @@ impl nyquest_interface::r#async::AsyncClient for CurlMultiClient {
114116
&self,
115117
req: nyquest_interface::r#async::Request,
116118
) -> nyquest_interface::Result<Self::Response> {
117-
let req = loop {
118-
// TODO: CURLOPT_SHARE
119-
let mut easy = Easy::new();
119+
let req = {
120+
let mut easy = Easy2::new(handler::AsyncHandler::default());
121+
let raw_handle = easy.raw();
122+
easy.get_mut().pause = Some(pause::EasyPause::new(raw_handle));
120123
// FIXME: properly concat base_url and url
121124
let url = concat_url(self.inner.options.base_url.as_deref(), &req.relative_uri);
122-
crate::request::populate_request(&url, &req, &self.inner.options, &mut easy)?;
123-
let req = self.inner.loop_manager.start_request(easy).await?;
124-
match req {
125-
r#loop::MaybeStartedRequest::Gone => {}
126-
r#loop::MaybeStartedRequest::Started(req) => break req,
127-
}
125+
crate::request::populate_request(
126+
&url,
127+
req,
128+
&self.inner.options,
129+
&mut easy,
130+
|_, _| unimplemented!(),
131+
)?;
132+
self.inner.loop_manager.start_request(easy).await?
128133
};
129134
let mut res = req.wait_for_response().await?;
130135
res.max_response_buffer_size = self.inner.options.max_response_buffer_size;
@@ -141,8 +146,8 @@ impl nyquest_interface::r#async::AsyncBackend for crate::CurlBackend {
141146
) -> Result<Self::AsyncClient, NyquestError> {
142147
Ok(CurlMultiClient {
143148
inner: Arc::new(CurlMultiClientInner {
144-
options,
145149
loop_manager: r#loop::LoopManager::new(),
150+
options,
146151
}),
147152
})
148153
}

backends/curl/src/async/handler.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use std::sync::Arc;
2+
3+
use curl::easy::{Handler, WriteError};
4+
5+
use super::pause::EasyPause;
6+
use super::r#loop::SharedRequestContext;
7+
8+
#[derive(Default)]
9+
pub(super) struct AsyncHandler {
10+
// To be filled in the loop
11+
pub(super) ctx: Option<Arc<SharedRequestContext>>,
12+
// To be filled after Easy2 is constructed
13+
pub(super) pause: Option<EasyPause>,
14+
}
15+
16+
struct AsyncHandlerRef<'a> {
17+
ctx: &'a SharedRequestContext,
18+
pause: &'a mut EasyPause,
19+
}
20+
21+
impl AsyncHandler {
22+
fn get_ref(&mut self) -> Option<AsyncHandlerRef<'_>> {
23+
let ctx = self.ctx.as_ref()?;
24+
let pause = self.pause.as_mut()?;
25+
Some(AsyncHandlerRef { ctx, pause })
26+
}
27+
}
28+
29+
impl Handler for AsyncHandler {
30+
fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
31+
let Some(inner) = self.get_ref() else {
32+
// ... signals an error condition to the library and returns CURLE_WRITE_ERROR.
33+
return Ok(0);
34+
};
35+
{
36+
let mut state = inner.ctx.state.lock().unwrap();
37+
let state = &mut state.0;
38+
state.write_data(data);
39+
}
40+
unsafe {
41+
inner.pause.pause_recv();
42+
}
43+
inner.ctx.waker.wake();
44+
Ok(data.len())
45+
}
46+
47+
fn header(&mut self, data: &[u8]) -> bool {
48+
let Some(inner) = self.get_ref() else {
49+
// ... signals an error condition to the library and returns CURLE_WRITE_ERROR.
50+
return false;
51+
};
52+
{
53+
let mut state = inner.ctx.state.lock().unwrap();
54+
let state = &mut state.0;
55+
if state.push_header_data(data) {
56+
unsafe {
57+
inner.pause.pause_recv();
58+
}
59+
}
60+
}
61+
inner.ctx.waker.wake();
62+
true
63+
}
64+
}

0 commit comments

Comments
 (0)