Skip to content

Commit 7b78446

Browse files
authored
backends/feat: stream upload (#14)
* winrt blocking stream upload * winrt async stream upload * curl Easy2 (WIP) * curl Easy2 * curl request context shared * winrt fix multipart upload * curl refactor (WIP) * refactor unsized stream * nsurlsession add dummy input stream * nsurlsession pub crate input stream ivars * nsurlsession do not override cfstream stuff * nsurlsession retain runloop * nsurlsession async stream * nsurlsession fix body stream * nsurlsession do not poll stream on eof * curl stream upload (WIP) async unsized mime stream is not working? * [skip ci] curl revert debug eprint * reqwest stream upload (non-wasm) * curl fix multipart unsized upload * interface avoid changess for utils * nsurlsession multipart stream * curl fix missing futures-io * curl fix missing futures-util io * nsurlsession fix missing futures-util io * update README
1 parent 1ab3758 commit 7b78446

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2699
-419
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,19 @@ The following items are planned in MVP:
5959
- [x] Nyquest blocking API
6060
- [x] Nyquest async API
6161
- [x] Backend: WinRT HttpClient
62-
- [x] Blocking
63-
- [x] Async
6462
- [x] Backend: libcurl
65-
- [x] Blocking
66-
- [x] Async
6763
- [x] Backend: NSURLSession
6864
- [x] Backend: reqwest (with WASM support)
6965
- [x] Client Options
7066
- [x] Streaming download
67+
- [x] Streaming upload
7168
- [x] Test framework for backends
7269
- [x] Presets
7370
- [x] Documentation
7471

7572
Future work may include:
7673

77-
- [ ] Streaming upload
74+
- [ ] WebSocket
7875
- [ ] Backend: WASM fetch
7976
- [ ] Cookie management
8077
- [ ] Progress tracking

backends/curl/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ futures-channel = { version = "0.3", optional = true, default-features = false,
4141
] }
4242
futures-util = { version = "0.3", optional = true, default-features = false, features = [
4343
"std",
44+
"io",
4445
] }
4546
slab = { version = "0.4", optional = true, default-features = false }
4647
pin-project-lite = "0.2"

backends/curl/src/async.rs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,27 @@
1+
use std::cell::RefCell;
12
use std::io;
3+
use std::pin::pin;
24
use std::task::{ready, Poll};
35
use std::{pin::Pin, sync::Arc, task::Context};
46

7+
use futures_util::future::{select, Either};
58
use nyquest_interface::r#async::{futures_io, AsyncResponse};
69
use nyquest_interface::Error as NyquestError;
710

811
mod handler;
912
mod r#loop;
1013
mod pause;
14+
mod read_task;
1115
mod set;
16+
mod shared;
1217

1318
use crate::curl_ng::easy::{AsRawEasyMut as _, Share};
1419
use crate::r#async::handler::AsyncHandler;
1520
use crate::request::{create_easy, AsCallbackMut as _};
1621
use crate::url::concat_url;
1722

23+
type Easy = crate::request::BoxEasyHandle<handler::AsyncHandler>;
24+
1825
pub struct CurlMultiClientInner {
1926
options: nyquest_interface::client::ClientOptions,
2027
loop_manager: r#loop::LoopManager,
@@ -123,16 +130,40 @@ impl nyquest_interface::r#async::AsyncClient for CurlMultiClient {
123130
&self,
124131
req: nyquest_interface::r#async::Request,
125132
) -> nyquest_interface::Result<Self::Response> {
126-
let req = {
133+
let (req, read_task_collection) = {
127134
let mut easy = create_easy(AsyncHandler::default(), &self.inner.share)?;
128135
let raw = easy.as_mut().as_raw_easy_mut().raw();
129136
easy.as_callback_mut().pause = Some(pause::EasyPause::new(raw));
130137
// FIXME: properly concat base_url and url
131138
let url = concat_url(self.inner.options.base_url.as_deref(), &req.relative_uri);
132-
crate::request::populate_request(&url, req, &self.inner.options, easy.as_mut())?;
133-
self.inner.loop_manager.start_request(easy).await?
139+
let req_ctx = easy.as_callback_mut().ctx.clone();
140+
let read_task_collection = RefCell::new(read_task::ReadTaskCollection::new(req_ctx));
141+
crate::request::populate_request(
142+
&url,
143+
req,
144+
&self.inner.options,
145+
easy.as_mut(),
146+
|easy, stream| {
147+
read_task_collection
148+
.borrow_mut()
149+
.add_in_handler(easy, stream)
150+
},
151+
|stream| {
152+
read_task_collection
153+
.borrow_mut()
154+
.add_mime_part_reader(stream)
155+
},
156+
)?;
157+
let req = self.inner.loop_manager.start_request(easy).await?;
158+
(req, read_task_collection.into_inner())
159+
};
160+
let res_task = pin!(req.wait_for_response());
161+
let read_task_collection = pin!(read_task_collection.execute(&self.inner.loop_manager));
162+
let mut res = match select(res_task, read_task_collection).await {
163+
Either::Left((res, _)) => res?,
164+
Either::Right((Err(e), _)) => return Err(e),
165+
Either::Right((Ok(_), _)) => unreachable!(),
134166
};
135-
let mut res = req.wait_for_response().await?;
136167
res.max_response_buffer_size = self.inner.options.max_response_buffer_size;
137168
Ok(res)
138169
}

backends/curl/src/async/handler.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,18 @@ use std::sync::Arc;
33
use curl::easy::WriteError;
44

55
use super::pause::EasyPause;
6-
use super::r#loop::SharedRequestContext;
6+
use super::shared::SharedRequestStates;
77
use crate::curl_ng::easy::EasyCallback;
88

99
#[derive(Default)]
1010
pub(super) struct AsyncHandler {
11-
// To be filled in the loop
12-
pub(super) ctx: Arc<SharedRequestContext>,
11+
pub(super) ctx: Arc<SharedRequestStates>,
1312
// To be filled after Easy2 is constructed
1413
pub(super) pause: Option<EasyPause>,
1514
}
1615

1716
struct AsyncHandlerRef<'a> {
18-
ctx: &'a SharedRequestContext,
17+
ctx: &'a SharedRequestStates,
1918
pause: &'a mut EasyPause,
2019
}
2120

@@ -63,11 +62,20 @@ impl EasyCallback for AsyncHandler {
6362
true
6463
}
6564

66-
fn read(&mut self, _buf: &mut [u8]) -> Result<usize, curl::easy::ReadError> {
67-
unimplemented!()
65+
fn read(&mut self, buf: &mut [u8]) -> Result<usize, curl::easy::ReadError> {
66+
let mut state = self.ctx.state.lock().unwrap();
67+
let stream = state
68+
.req_streams
69+
.get_mut(0)
70+
.ok_or(curl::easy::ReadError::Abort)?;
71+
stream.read(buf, &self.ctx)
6872
}
6973

70-
fn seek(&mut self, _whence: std::io::SeekFrom) -> curl::easy::SeekResult {
71-
unimplemented!()
74+
fn seek(&mut self, whence: std::io::SeekFrom) -> curl::easy::SeekResult {
75+
let mut state = self.ctx.state.lock().unwrap();
76+
let Some(stream) = state.req_streams.get_mut(0) else {
77+
return curl::easy::SeekResult::Fail;
78+
};
79+
stream.seek(whence, &self.ctx)
7280
}
7381
}

0 commit comments

Comments
 (0)