Skip to content

Commit 1d06ac9

Browse files
committed
fix: wip
1 parent f5ea489 commit 1d06ac9

File tree

1 file changed

+89
-23
lines changed

1 file changed

+89
-23
lines changed

src/api/proxy.rs

Lines changed: 89 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,55 @@
1-
use futures_util::stream::{Stream, TryStream};
2-
use futures_util::TryStreamExt;
1+
use futures_util::stream::{MapOk, Stream, TryBufferUnordered, TryStream};
2+
use futures_util::{FutureExt, StreamExt, TryStreamExt};
33
use ppp::error::ParseError;
44
use ppp::model::{Addresses, Header};
55
use std::future::Future;
66
use std::io::{Cursor, IoSlice, Write};
7+
use std::marker::PhantomData;
78
use std::mem::MaybeUninit;
89
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
910
use std::pin::Pin;
1011
use std::task::{Context, Poll};
1112
use tokio::io::{AsyncRead, AsyncWrite, Error as IoError, ErrorKind, ReadBuf, Result as IoResult};
12-
use tokio::net::TcpStream;
13+
use tokio::net::{TcpListener, TcpStream};
14+
use tokio_stream::wrappers::TcpListenerStream;
1315
use tracing::field::{display, Empty};
1416
use tracing::{debug_span, error, info, Instrument, Span};
1517

1618
use crate::config::ProxyProtocol;
19+
use tracing::instrument::Instrumented;
20+
21+
struct ProxyListener {
22+
listener: TcpListenerStream,
23+
proxy: ProxyProtocol,
24+
}
25+
26+
impl Stream for ProxyListener {
27+
type Item = IoResult<ProxyStream>;
28+
29+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
30+
let stream = match self.listener.poll_next_unpin(cx) {
31+
Poll::Pending => return Poll::Pending,
32+
Poll::Ready(None) => return Poll::Ready(None),
33+
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
34+
Poll::Ready(Some(Ok(stream))) => stream,
35+
};
36+
37+
let data = match self.proxy {
38+
ProxyProtocol::Enabled => Some(Default::default()),
39+
ProxyProtocol::Disabled => None,
40+
};
41+
42+
Poll::Ready(Some(Ok(ProxyStream {
43+
start_of_data: 0,
44+
stream,
45+
data,
46+
})))
47+
}
48+
49+
fn size_hint(&self) -> (usize, Option<usize>) {
50+
self.listener.size_hint()
51+
}
52+
}
1753

1854
// wrap tcplistener instead of tcpstream
1955

@@ -35,30 +71,60 @@ impl ToProxyStream for TcpStream {
3571
}
3672
}
3773

38-
pub(super) fn wrap<S, E>(
39-
stream: S,
40-
) -> impl Stream<Item = Result<impl Future<Output = Result<impl AsyncRead + AsyncWrite, E>>, E>>
41-
where
42-
S: TryStream<Ok = ProxyStream, Error = E>,
43-
{
44-
stream.map_ok(|mut conn| {
45-
let span = debug_span!("ADDR", remote.addr = Empty);
46-
async move {
47-
let span = Span::current();
48-
match conn.proxy_peer().await {
49-
Ok(addr) => {
50-
span.record("remote.addr", &display(addr));
51-
info!("Got addr {}", addr)
52-
}
53-
Err(e) => {
54-
span.record("remote.addr", &"Unknown");
55-
error!("Could net get remote.addr: {}", e);
56-
}
74+
struct ProxyStreamFuture<E> {
75+
stream: Option<ProxyStream>,
76+
phantom: PhantomData<E>,
77+
}
78+
79+
type Wrap<E> = fn(conn: ProxyStream) -> Instrumented<ProxyStreamFuture<E>>;
80+
81+
impl<E: Unpin> Future for ProxyStreamFuture<E> {
82+
type Output = Result<ProxyStream, E>;
83+
84+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
85+
let this = self.get_mut();
86+
let span = Span::current();
87+
let stream = this
88+
.stream
89+
.as_mut()
90+
.map(ProxyStream::proxy_peer)
91+
.map(move |mut fut| fut.poll_unpin(cx));
92+
match stream {
93+
None => unreachable!("Future cannot be pulled anymore"),
94+
Some(Poll::Pending) => return Poll::Pending,
95+
Some(Poll::Ready(Ok(addr))) => {
96+
span.record("remote.addr", &display(addr));
97+
info!("Got addr {}", addr)
5798
}
58-
Ok(conn)
99+
Some(Poll::Ready(Err(e))) => {
100+
span.record("remote.addr", &"Unknown");
101+
error!("Could net get remote.addr: {}", e);
102+
}
103+
}
104+
105+
Poll::Ready(Ok(this
106+
.stream
107+
.take()
108+
.expect("Future cannot be pulled anymore")))
109+
}
110+
}
111+
112+
pub(super) fn wrap(
113+
listener: TcpListener,
114+
) -> TryBufferUnordered<MapOk<ProxyListener, Wrap<IoError>>> {
115+
ProxyListener {
116+
listener: TcpListenerStream::new(listener),
117+
proxy: ProxyProtocol::Enabled,
118+
}
119+
.map_ok(|mut stream| {
120+
let span = debug_span!("test");
121+
ProxyStreamFuture {
122+
stream: Some(stream),
123+
phantom: PhantomData,
59124
}
60125
.instrument(span)
61126
})
127+
.try_buffer_unordered(100)
62128
}
63129

64130
struct PeerAddrFuture<'a> {

0 commit comments

Comments
 (0)