Skip to content

Commit be91af5

Browse files
committed
fix: refactor
1 parent 0e2a7be commit be91af5

File tree

2 files changed

+35
-19
lines changed

2 files changed

+35
-19
lines changed

src/api/mod.rs

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
use anyhow::Result;
1+
use anyhow::{Error, Result};
22
use futures_util::future::{Future, OptionFuture};
33
use futures_util::stream::Stream;
4-
use futures_util::{FutureExt, StreamExt};
4+
use futures_util::{FutureExt, StreamExt, TryFutureExt};
55
use hyper::server::conn::Http;
6+
use lazy_static::lazy_static;
67
use metrics::{metrics, metrics_wrapper};
8+
use prometheus::{register_int_counter_vec, register_int_gauge_vec, IntCounterVec, IntGaugeVec};
79
use sqlx::PgPool;
810
use std::fmt::Display;
911
use std::sync::Arc;
@@ -20,12 +22,27 @@ mod proxy;
2022
mod routes;
2123
mod tls;
2224

23-
async fn serve<I, S, T, E, R>(mut io: I, routes: R)
25+
lazy_static! {
26+
static ref TCP_TOTAL_CONNECTION_COUNTER: IntCounterVec = register_int_counter_vec!(
27+
"tcp_total_connection_counter",
28+
"Sum of TCP Connections",
29+
&["endpoint"]
30+
)
31+
.unwrap();
32+
static ref TCP_OPEN_CONNECTION_COUNTER: IntGaugeVec = register_int_gauge_vec!(
33+
"tcp_open_connection_counter",
34+
"Amount of currently open TCP Connections",
35+
&["endpoint"]
36+
)
37+
.unwrap();
38+
}
39+
40+
async fn serve<I, S, T, E, R>(mut io: I, routes: R, endpoint: &str)
2441
where
2542
I: Stream<Item = Result<S, E>> + Unpin + Send,
2643
S: Future<Output = Result<T, E>> + Send + 'static,
2744
T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
28-
E: Display,
45+
E: Into<Error> + Display + Send,
2946
R: Filter<Error = Rejection> + Clone + Send + 'static,
3047
R::Extract: Reply,
3148
{
@@ -35,28 +52,28 @@ where
3552
loop {
3653
let span = info_span!("conn", remote.addr = Empty, remote.real = Empty);
3754
let conn = match io.next().instrument(span.clone()).await {
38-
Some(Ok(conn)) => conn,
55+
Some(Ok(conn)) => conn.err_into(),
3956
Some(Err(err)) => {
4057
span.in_scope(|| error!("{}", err));
4158
continue;
4259
}
4360
None => break,
4461
};
4562

63+
TCP_TOTAL_CONNECTION_COUNTER.with_label_values(&[endpoint]).inc();
64+
let open_counter = TCP_OPEN_CONNECTION_COUNTER.with_label_values(&[endpoint]);
65+
open_counter.inc();
66+
4667
let http = http.clone();
4768
let service = service.clone();
4869

4970
tokio::spawn(
5071
async move {
51-
let conn = match conn.await {
52-
Ok(conn) => conn,
53-
Err(err) => {
54-
error!("{}", err);
55-
return;
56-
}
57-
};
58-
http.serve_connection(conn, service).await;
72+
let conn = conn.await?;
73+
Ok(http.serve_connection(conn, service).await?)
5974
}
75+
.inspect_err(|err: &Error| error!("{}", err))
76+
.inspect(move |_| open_counter.dec())
6077
.instrument(span),
6178
);
6279
}
@@ -78,18 +95,18 @@ pub async fn new(
7895

7996
let http = http
8097
.map(move |http| proxy::wrap(http, http_proxy))
81-
.map(|http| serve(http, routes.clone()).instrument(info_span!("HTTP")))
98+
.map(|http| serve(http, routes.clone(), "HTTP").instrument(info_span!("HTTP")))
8299
.map(tokio::spawn);
83100

84101
let prom = prom
85102
.map(move |prom| proxy::wrap(prom, prom_proxy))
86-
.map(|prom| serve(prom, metrics()).instrument(info_span!("PROM")))
103+
.map(|prom| serve(prom, metrics(), "PROM").instrument(info_span!("PROM")))
87104
.map(tokio::spawn);
88105

89106
let https = https
90107
.map(move |https| proxy::wrap(https, https_proxy))
91108
.map(|https| tls::wrap(https, pool))
92-
.map(|https| serve(https, routes).instrument(info_span!("HTTPS")))
109+
.map(|https| serve(https, routes, "HTTPS").instrument(info_span!("HTTPS")))
93110
.map(tokio::spawn);
94111

95112
info!("Starting API");

src/api/proxy.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,9 @@ pub(super) fn wrap(
2424
.map_ok(move |stream| {
2525
let span = Span::current();
2626
span.record("remote.addr", &debug(stream.peer_addr()));
27-
(stream.source(proxy), span)
28-
})
29-
.map_ok(|(mut conn, span)| {
3027
let span_clone = span.clone();
28+
29+
let mut conn = stream.source(proxy);
3130
async move {
3231
match conn.proxy_peer().await {
3332
Ok(Some(addr)) => {

0 commit comments

Comments
 (0)