1
1
use anyhow:: { anyhow, Error , Result } ;
2
2
use futures_util:: stream:: { repeat, Stream } ;
3
- use futures_util:: { StreamExt , TryStreamExt } ;
3
+ use futures_util:: { StreamExt , TryStreamExt , TryFutureExt } ;
4
4
use parking_lot:: RwLock ;
5
5
use rustls:: internal:: pemfile:: { certs, pkcs8_private_keys} ;
6
6
use rustls:: { NoClientAuth , ServerConfig } ;
7
7
use sqlx:: PgPool ;
8
+ use std:: future:: Future ;
8
9
use std:: sync:: Arc ;
9
10
use tokio:: io:: { AsyncRead , AsyncWrite , Result as IoResult } ;
10
11
use tokio_rustls:: TlsAcceptor ;
@@ -14,10 +15,13 @@ use super::proxy::ProxyStream;
14
15
use crate :: cert:: { Cert , CertFacade } ;
15
16
use crate :: util:: to_u64;
16
17
17
- pub ( super ) fn wrap (
18
- listener : impl Stream < Item = IoResult < ProxyStream > > + Send ,
18
+ pub ( super ) fn wrap < L , I > (
19
+ listener : L ,
19
20
pool : PgPool ,
20
21
) -> impl Stream < Item = Result < impl AsyncRead + AsyncWrite + Send + Unpin + ' static , Error > > + Send
22
+ where
23
+ L : Stream < Item = IoResult < I > > + Send ,
24
+ I : Future < Output = IoResult < ProxyStream > > + Send ,
21
25
{
22
26
let acceptor = Acceptor :: new ( pool) ;
23
27
@@ -26,13 +30,12 @@ pub(super) fn wrap(
26
30
. zip ( repeat ( acceptor) )
27
31
. map ( |( conn, acceptor) | conn. map ( |c| ( c, acceptor) ) )
28
32
. map_ok ( |( conn, acceptor) | async move {
29
- let tls = acceptor. load_cert ( ) . await ?;
33
+ let ( conn , tls) = tokio :: try_join! ( conn . err_into ( ) , acceptor. load_cert( ) ) ?;
30
34
Ok ( tls. accept ( conn) . await ?)
31
35
} )
32
36
. try_buffer_unordered ( 100 )
33
37
. inspect_err ( |err| error ! ( "Stream error: {:?}" , err) )
34
38
. filter ( |stream| futures_util:: future:: ready ( stream. is_ok ( ) ) )
35
- . into_stream ( )
36
39
}
37
40
38
41
struct Acceptor {
0 commit comments