Skip to content

Commit 94413f5

Browse files
committed
fix: wip
1 parent 9edb842 commit 94413f5

File tree

3 files changed

+39
-22
lines changed

3 files changed

+39
-22
lines changed

src/api/mod.rs

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
use anyhow::{Result, Error};
2-
use futures_util::future::{OptionFuture, Future};
1+
use anyhow::Result;
2+
use futures_util::future::{Future, OptionFuture};
33
use futures_util::stream::Stream;
4-
use futures_util::{FutureExt, StreamExt, TryStreamExt};
4+
use futures_util::{FutureExt, StreamExt};
55
use hyper::server::conn::Http;
66
use metrics::{metrics, metrics_wrapper};
77
use sqlx::PgPool;
88
use std::sync::Arc;
9-
use tokio::io::{AsyncRead, AsyncWrite, Result as IoResult};
9+
use tokio::io::{AsyncRead, AsyncWrite};
1010
use tokio::net::TcpListener;
11-
use tracing::{info, error, info_span, Instrument};
11+
use tracing::{error, info, info_span, Instrument};
1212
use warp::{Filter, Rejection, Reply};
13-
use std::error::Error as StdError;
1413

1514
use crate::config::Listener;
15+
use std::fmt::Display;
1616

1717
mod metrics;
1818
mod proxy;
@@ -24,31 +24,40 @@ where
2424
I: Stream<Item = Result<S, E>> + Unpin + Send,
2525
S: Future<Output = Result<T, E>> + Send + 'static,
2626
T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
27-
E: Into<Error>,
27+
E: Display,
2828
R: Filter<Error = Rejection> + Clone + Send + 'static,
2929
R::Extract: Reply,
3030
{
3131
let service = warp::service(routes);
3232
let http = Arc::new(Http::new());
3333

3434
loop {
35-
let span = info_span!("test");
35+
let span = info_span!("TCP");
3636
let conn = match io.next().instrument(span.clone()).await {
3737
Some(Ok(conn)) => conn,
38-
Some(Err(e)) => {
39-
span.in_scope(|| error!("{}", e));
38+
Some(Err(err)) => {
39+
span.in_scope(|| error!("{}", err));
4040
continue;
41-
},
41+
}
4242
None => break,
4343
};
4444

4545
let http = http.clone();
4646
let service = service.clone();
4747

48-
tokio::spawn(async move {
49-
let conn = conn.await.unwrap();
50-
http.serve_connection(conn, service).await
51-
}.instrument(span));
48+
tokio::spawn(
49+
async move {
50+
let conn = match conn.await {
51+
Ok(conn) => conn,
52+
Err(err) => {
53+
error!("{}", err);
54+
return;
55+
}
56+
};
57+
http.serve_connection(conn, service).await;
58+
}
59+
.instrument(span),
60+
);
5261
}
5362
}
5463

@@ -68,18 +77,18 @@ pub async fn new(
6877

6978
let http = http
7079
.map(move |http| proxy::wrap(http, http_proxy))
71-
.map(|http| serve(http, routes.clone()))
80+
.map(|http| serve(http, routes.clone()).instrument(info_span!("HTTP")))
7281
.map(tokio::spawn);
7382

7483
let prom = prom
7584
.map(move |prom| proxy::wrap(prom, prom_proxy))
76-
.map(|prom| serve(prom, metrics()))
85+
.map(|prom| serve(prom, metrics()).instrument(info_span!("PROM")))
7786
.map(tokio::spawn);
7887

7988
let https = https
8089
.map(move |https| proxy::wrap(https, https_proxy))
8190
.map(|https| tls::wrap(https, pool))
82-
.map(|https| serve(https, routes))
91+
.map(|https| serve(https, routes).instrument(info_span!("HTTPS")))
8392
.map(tokio::spawn);
8493

8594
info!("Starting API");

src/api/proxy.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use tokio::net::{TcpListener, TcpStream};
1212
use tokio_stream::wrappers::TcpListenerStream;
1313
use tokio_util::io::poll_read_buf;
1414
use tracing::field::{display, Empty};
15-
use tracing::{debug_span, error, info, Instrument};
15+
use tracing::{debug_span, error, info, Instrument, Span};
1616

1717
use crate::config::ProxyProtocol;
1818

@@ -21,7 +21,11 @@ pub(super) fn wrap(
2121
proxy: ProxyProtocol,
2222
) -> impl Stream<Item = IoResult<impl Future<Output = IoResult<ProxyStream>>>> + Send {
2323
TcpListenerStream::new(listener)
24-
.map_ok(move |stream| stream.source(proxy))
24+
.map_ok(move |stream| {
25+
let span = Span::current();
26+
span.record_all()
27+
stream.source(proxy)
28+
})
2529
.map_ok(|mut conn| {
2630
let span = debug_span!("ADDR", remote.addr = Empty);
2731
let span_clone = span.clone();

src/api/tls.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::{anyhow, Error, Result};
1+
use anyhow::{anyhow, Result};
22
use futures_util::stream::{repeat, Stream};
33
use futures_util::{StreamExt, TryFutureExt, TryStreamExt};
44
use parking_lot::RwLock;
@@ -18,7 +18,11 @@ use crate::util::to_u64;
1818
pub(super) fn wrap<L, I>(
1919
listener: L,
2020
pool: PgPool,
21-
) -> impl Stream<Item = Result<impl Future<Output = Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static>>>> + Send
21+
) -> impl Stream<
22+
Item = Result<
23+
impl Future<Output = Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static>>,
24+
>,
25+
> + Send
2226
where
2327
L: Stream<Item = IoResult<I>> + Send,
2428
I: Future<Output = IoResult<ProxyStream>> + Send,

0 commit comments

Comments
 (0)