Skip to content
This repository was archived by the owner on Sep 4, 2024. It is now read-only.

Commit 697fcb4

Browse files
committed
Reuse socket
1 parent 7c94adf commit 697fcb4

File tree

1 file changed

+73
-41
lines changed

1 file changed

+73
-41
lines changed

src/simple_http.rs

Lines changed: 73 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
55
#[cfg(feature = "proxy")]
66
use socks::Socks5Stream;
7-
use std::io::{BufRead, BufReader, Write};
8-
#[cfg(not(feature = "proxy"))]
7+
use std::io::{BufRead, BufReader, Read, Write};
98
use std::net::TcpStream;
109
use std::net::{SocketAddr, ToSocketAddrs};
10+
use std::sync::{Arc, Mutex};
1111
use std::time::{Duration, Instant};
1212
use std::{error, fmt, io, net, thread};
1313

@@ -38,6 +38,7 @@ pub struct SimpleHttpTransport {
3838
proxy_addr: net::SocketAddr,
3939
#[cfg(feature = "proxy")]
4040
proxy_auth: Option<(String, String)>,
41+
sock: Arc<Mutex<Option<TcpStream>>>,
4142
}
4243

4344
impl Default for SimpleHttpTransport {
@@ -57,6 +58,7 @@ impl Default for SimpleHttpTransport {
5758
),
5859
#[cfg(feature = "proxy")]
5960
proxy_auth: None,
61+
sock: Arc::new(Mutex::new(None)),
6062
}
6163
}
6264
}
@@ -73,29 +75,56 @@ impl SimpleHttpTransport {
7375
}
7476

7577
fn request<R>(&self, req: impl serde::Serialize) -> Result<R, Error>
78+
where
79+
R: for<'a> serde::de::Deserialize<'a>,
80+
{
81+
let mut sock = self.sock.lock().unwrap();
82+
match self.try_request(req, &mut sock) {
83+
Ok(response) => Ok(response),
84+
Err(err) => {
85+
*sock = None;
86+
Err(err)
87+
}
88+
}
89+
}
90+
91+
fn try_request<R>(
92+
&self,
93+
req: impl serde::Serialize,
94+
sock: &mut Option<TcpStream>,
95+
) -> Result<R, Error>
7696
where
7797
R: for<'a> serde::de::Deserialize<'a>,
7898
{
7999
// Open connection
80100
let request_deadline = Instant::now() + self.timeout;
81-
#[cfg(feature = "proxy")]
82-
let mut sock = if let Some((username, password)) = &self.proxy_auth {
83-
Socks5Stream::connect_with_password(
84-
self.proxy_addr,
85-
self.addr,
86-
username.as_str(),
87-
password.as_str(),
88-
)?
89-
.into_inner()
90-
} else {
91-
Socks5Stream::connect(self.proxy_addr, self.addr)?.into_inner()
92-
};
93-
94-
#[cfg(not(feature = "proxy"))]
95-
let mut sock = TcpStream::connect_timeout(&self.addr, self.timeout)?;
101+
if sock.is_none() {
102+
*sock = Some({
103+
#[cfg(feature = "proxy")]
104+
{
105+
if let Some((username, password)) = &self.proxy_auth {
106+
Socks5Stream::connect_with_password(
107+
self.proxy_addr,
108+
self.addr,
109+
username.as_str(),
110+
password.as_str(),
111+
)?
112+
.into_inner()
113+
} else {
114+
Socks5Stream::connect(self.proxy_addr, self.addr)?.into_inner()
115+
}
116+
}
96117

97-
sock.set_read_timeout(Some(self.timeout))?;
98-
sock.set_write_timeout(Some(self.timeout))?;
118+
#[cfg(not(feature = "proxy"))]
119+
{
120+
let stream = TcpStream::connect_timeout(&self.addr, self.timeout)?;
121+
stream.set_read_timeout(Some(self.timeout))?;
122+
stream.set_write_timeout(Some(self.timeout))?;
123+
stream
124+
}
125+
})
126+
};
127+
let sock = sock.as_mut().unwrap();
99128

100129
// Serialize the body first so we can set the Content-Length header.
101130
let body = serde_json::to_vec(&req)?;
@@ -105,7 +134,6 @@ impl SimpleHttpTransport {
105134
sock.write_all(self.path.as_bytes())?;
106135
sock.write_all(b" HTTP/1.1\r\n")?;
107136
// Write headers
108-
sock.write_all(b"Connection: Close\r\n")?;
109137
sock.write_all(b"Content-Type: application/json\r\n")?;
110138
sock.write_all(b"Content-Length: ")?;
111139
sock.write_all(body.len().to_string().as_bytes())?;
@@ -133,18 +161,39 @@ impl SimpleHttpTransport {
133161
Err(_) => return Err(Error::HttpParseError),
134162
};
135163

136-
// Skip response header fields
137-
while get_line(&mut reader, request_deadline)? != "\r\n" {}
164+
// Parse response header fields
165+
let mut content_length = None;
166+
loop {
167+
let line = get_line(&mut reader, request_deadline)?;
168+
169+
if line == "\r\n" {
170+
break;
171+
}
172+
173+
const CONTENT_LENGTH: &str = "content-length: ";
174+
if line.to_lowercase().starts_with(CONTENT_LENGTH) {
175+
content_length = Some(
176+
line[CONTENT_LENGTH.len()..]
177+
.trim()
178+
.parse::<usize>()
179+
.map_err(|_| Error::HttpParseError)?,
180+
);
181+
}
182+
}
138183

139184
if response_code == 401 {
140185
// There is no body in a 401 response, so don't try to read it
141186
return Err(Error::HttpErrorCode(response_code));
142187
}
143188

189+
let content_length = content_length.ok_or(Error::HttpParseError)?;
190+
191+
let mut buffer = vec![0; content_length];
192+
144193
// Even if it's != 200, we parse the response as we may get a JSONRPC error instead
145194
// of the less meaningful HTTP error code.
146-
let resp_body = get_lines(&mut reader)?;
147-
match serde_json::from_str(&resp_body) {
195+
reader.read_exact(&mut buffer)?;
196+
match serde_json::from_slice(&buffer) {
148197
Ok(s) => Ok(s),
149198
Err(e) => {
150199
if response_code != 200 {
@@ -261,23 +310,6 @@ fn get_line<R: BufRead>(reader: &mut R, deadline: Instant) -> Result<String, Err
261310
Err(Error::Timeout)
262311
}
263312

264-
/// Read all lines from a buffered reader.
265-
fn get_lines<R: BufRead>(reader: &mut R) -> Result<String, Error> {
266-
let mut body: String = String::new();
267-
268-
for line in reader.lines() {
269-
match line {
270-
Ok(l) => body.push_str(&l),
271-
// io error occurred, abort
272-
Err(e) => return Err(Error::SocketError(e)),
273-
}
274-
}
275-
// remove whitespace
276-
body.retain(|c| !c.is_whitespace());
277-
278-
Ok(body)
279-
}
280-
281313
/// Do some very basic manual URL parsing because the uri/url crates
282314
/// all have unicode-normalization as a dependency and that's broken.
283315
fn check_url(url: &str) -> Result<(SocketAddr, String), Error> {

0 commit comments

Comments
 (0)