Skip to content
This repository was archived by the owner on Sep 4, 2024. It is now read-only.

Commit 1826d5e

Browse files
committed
Reuse HTTP connection
1 parent 7c94adf commit 1826d5e

File tree

1 file changed

+75
-41
lines changed

1 file changed

+75
-41
lines changed

src/simple_http.rs

Lines changed: 75 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
55
#[cfg(feature = "proxy")]
66
use socks::Socks5Stream;
7-
use std::io::{BufRead, BufReader, Write};
8-
#[cfg(not(feature = "proxy"))]
7+
use std::io::{BufRead, BufReader, Read, Write};
98
use std::net::TcpStream;
109
use std::net::{SocketAddr, ToSocketAddrs};
10+
use std::sync::{Arc, Mutex};
1111
use std::time::{Duration, Instant};
1212
use std::{error, fmt, io, net, thread};
1313

@@ -38,6 +38,7 @@ pub struct SimpleHttpTransport {
3838
proxy_addr: net::SocketAddr,
3939
#[cfg(feature = "proxy")]
4040
proxy_auth: Option<(String, String)>,
41+
sock: Arc<Mutex<Option<TcpStream>>>,
4142
}
4243

4344
impl Default for SimpleHttpTransport {
@@ -57,6 +58,7 @@ impl Default for SimpleHttpTransport {
5758
),
5859
#[cfg(feature = "proxy")]
5960
proxy_auth: None,
61+
sock: Arc::new(Mutex::new(None)),
6062
}
6163
}
6264
}
@@ -73,29 +75,58 @@ impl SimpleHttpTransport {
7375
}
7476

7577
fn request<R>(&self, req: impl serde::Serialize) -> Result<R, Error>
78+
where
79+
R: for<'a> serde::de::Deserialize<'a>,
80+
{
81+
// `try_request` should not panic, so the mutex shouldn't be poisoned
82+
// and unwrapping should be safe
83+
let mut sock = self.sock.lock().expect("poisoned mutex");
84+
match self.try_request(req, &mut sock) {
85+
Ok(response) => Ok(response),
86+
Err(err) => {
87+
*sock = None;
88+
Err(err)
89+
}
90+
}
91+
}
92+
93+
fn try_request<R>(
94+
&self,
95+
req: impl serde::Serialize,
96+
sock: &mut Option<TcpStream>,
97+
) -> Result<R, Error>
7698
where
7799
R: for<'a> serde::de::Deserialize<'a>,
78100
{
79101
// Open connection
80102
let request_deadline = Instant::now() + self.timeout;
81-
#[cfg(feature = "proxy")]
82-
let mut sock = if let Some((username, password)) = &self.proxy_auth {
83-
Socks5Stream::connect_with_password(
84-
self.proxy_addr,
85-
self.addr,
86-
username.as_str(),
87-
password.as_str(),
88-
)?
89-
.into_inner()
90-
} else {
91-
Socks5Stream::connect(self.proxy_addr, self.addr)?.into_inner()
92-
};
93-
94-
#[cfg(not(feature = "proxy"))]
95-
let mut sock = TcpStream::connect_timeout(&self.addr, self.timeout)?;
103+
if sock.is_none() {
104+
*sock = Some({
105+
#[cfg(feature = "proxy")]
106+
{
107+
if let Some((username, password)) = &self.proxy_auth {
108+
Socks5Stream::connect_with_password(
109+
self.proxy_addr,
110+
self.addr,
111+
username.as_str(),
112+
password.as_str(),
113+
)?
114+
.into_inner()
115+
} else {
116+
Socks5Stream::connect(self.proxy_addr, self.addr)?.into_inner()
117+
}
118+
}
96119

97-
sock.set_read_timeout(Some(self.timeout))?;
98-
sock.set_write_timeout(Some(self.timeout))?;
120+
#[cfg(not(feature = "proxy"))]
121+
{
122+
let stream = TcpStream::connect_timeout(&self.addr, self.timeout)?;
123+
stream.set_read_timeout(Some(self.timeout))?;
124+
stream.set_write_timeout(Some(self.timeout))?;
125+
stream
126+
}
127+
})
128+
};
129+
let sock = sock.as_mut().unwrap();
99130

100131
// Serialize the body first so we can set the Content-Length header.
101132
let body = serde_json::to_vec(&req)?;
@@ -105,7 +136,6 @@ impl SimpleHttpTransport {
105136
sock.write_all(self.path.as_bytes())?;
106137
sock.write_all(b" HTTP/1.1\r\n")?;
107138
// Write headers
108-
sock.write_all(b"Connection: Close\r\n")?;
109139
sock.write_all(b"Content-Type: application/json\r\n")?;
110140
sock.write_all(b"Content-Length: ")?;
111141
sock.write_all(body.len().to_string().as_bytes())?;
@@ -133,18 +163,39 @@ impl SimpleHttpTransport {
133163
Err(_) => return Err(Error::HttpParseError),
134164
};
135165

136-
// Skip response header fields
137-
while get_line(&mut reader, request_deadline)? != "\r\n" {}
166+
// Parse response header fields
167+
let mut content_length = None;
168+
loop {
169+
let line = get_line(&mut reader, request_deadline)?;
170+
171+
if line == "\r\n" {
172+
break;
173+
}
174+
175+
const CONTENT_LENGTH: &str = "content-length: ";
176+
if line.to_lowercase().starts_with(CONTENT_LENGTH) {
177+
content_length = Some(
178+
line[CONTENT_LENGTH.len()..]
179+
.trim()
180+
.parse::<usize>()
181+
.map_err(|_| Error::HttpParseError)?,
182+
);
183+
}
184+
}
138185

139186
if response_code == 401 {
140187
// There is no body in a 401 response, so don't try to read it
141188
return Err(Error::HttpErrorCode(response_code));
142189
}
143190

191+
let content_length = content_length.ok_or(Error::HttpParseError)?;
192+
193+
let mut buffer = vec![0; content_length];
194+
144195
// Even if it's != 200, we parse the response as we may get a JSONRPC error instead
145196
// of the less meaningful HTTP error code.
146-
let resp_body = get_lines(&mut reader)?;
147-
match serde_json::from_str(&resp_body) {
197+
reader.read_exact(&mut buffer)?;
198+
match serde_json::from_slice(&buffer) {
148199
Ok(s) => Ok(s),
149200
Err(e) => {
150201
if response_code != 200 {
@@ -261,23 +312,6 @@ fn get_line<R: BufRead>(reader: &mut R, deadline: Instant) -> Result<String, Err
261312
Err(Error::Timeout)
262313
}
263314

264-
/// Read all lines from a buffered reader.
265-
fn get_lines<R: BufRead>(reader: &mut R) -> Result<String, Error> {
266-
let mut body: String = String::new();
267-
268-
for line in reader.lines() {
269-
match line {
270-
Ok(l) => body.push_str(&l),
271-
// io error occurred, abort
272-
Err(e) => return Err(Error::SocketError(e)),
273-
}
274-
}
275-
// remove whitespace
276-
body.retain(|c| !c.is_whitespace());
277-
278-
Ok(body)
279-
}
280-
281315
/// Do some very basic manual URL parsing because the uri/url crates
282316
/// all have unicode-normalization as a dependency and that's broken.
283317
fn check_url(url: &str) -> Result<(SocketAddr, String), Error> {

0 commit comments

Comments
 (0)