1
1
use anyhow:: { anyhow, Error , Result } ;
2
2
use futures_util:: future:: OptionFuture ;
3
- use futures_util:: stream:: { repeat, TryStream } ;
3
+ use futures_util:: stream:: { repeat, Stream } ;
4
4
use futures_util:: FutureExt ;
5
5
use futures_util:: { StreamExt , TryStreamExt } ;
6
6
use parking_lot:: RwLock ;
7
+ use prometheus:: proto:: MetricFamily ;
8
+ use prometheus:: { Encoder , TextEncoder } ;
7
9
use rustls:: internal:: pemfile:: { certs, pkcs8_private_keys} ;
8
10
use rustls:: { NoClientAuth , ServerConfig } ;
9
11
use sqlx:: PgPool ;
10
12
use std:: sync:: Arc ;
11
13
use tokio:: io:: { AsyncRead , AsyncWrite } ;
12
14
use tokio:: net:: { TcpListener , ToSocketAddrs } ;
13
15
use tokio_rustls:: TlsAcceptor ;
14
- use tracing:: { error, info} ;
16
+ use tracing:: { debug_span , error, info} ;
15
17
use tracing_futures:: Instrument ;
16
- use warp:: { http:: Response , reply , serve, Filter , Rejection , Reply } ;
18
+ use warp:: { http:: Response , serve, Filter , Rejection , Reply } ;
17
19
18
20
use crate :: cert:: { Cert , CertFacade } ;
19
21
use crate :: domain:: { Domain , DomainFacade } ;
22
+ use crate :: util:: to_u64;
20
23
21
24
struct Acceptor {
22
25
pool : PgPool ,
@@ -56,12 +59,16 @@ impl Acceptor {
56
59
}
57
60
58
61
async fn load_cert ( & self ) -> Result < TlsAcceptor > {
59
- let new_cert = CertFacade :: first_cert ( & self . pool ) . await ;
62
+ let new_cert = CertFacade :: first_cert ( & self . pool ) . in_current_span ( ) . await ;
60
63
61
64
let db_cert = match ( new_cert, & * self . config . read ( ) ) {
62
65
( Ok ( Some ( new_cert) ) , ( cert, _) ) if Some ( & new_cert) != cert. as_ref ( ) => new_cert,
63
- ( _, ( _, server_config) ) => return Ok ( TlsAcceptor :: from ( Arc :: clone ( server_config) ) ) ,
66
+ ( _, ( _, server_config) ) => {
67
+ info ! ( "Using existing TLS Config" ) ;
68
+ return Ok ( TlsAcceptor :: from ( Arc :: clone ( server_config) ) ) ;
69
+ }
64
70
} ;
71
+ info ! ( timestamp = to_u64( & db_cert. update) , "Found new cert" ) ;
65
72
66
73
let server_config = match Acceptor :: create_server_config ( & db_cert) {
67
74
Ok ( server_config) => server_config,
@@ -73,48 +80,50 @@ impl Acceptor {
73
80
} ;
74
81
75
82
* self . config . write ( ) = ( Some ( db_cert) , Arc :: clone ( & server_config) ) ;
83
+ info ! ( "Created new TLS config" ) ;
76
84
Ok ( TlsAcceptor :: from ( server_config) )
77
85
}
78
86
}
79
87
80
88
fn stream (
81
89
listener : TcpListener ,
82
90
pool : PgPool ,
83
- ) -> impl TryStream < Ok = impl AsyncRead + AsyncWrite + Send + Unpin + ' static , Error = Error > + Send
91
+ ) -> impl Stream < Item = Result < impl AsyncRead + AsyncWrite + Send + Unpin + ' static , Error > > + Send
84
92
{
85
93
let acceptor = Acceptor :: new ( pool) ;
86
94
87
95
listener
88
96
. zip ( repeat ( acceptor) )
89
97
. map ( |( conn, acceptor) | conn. map ( |c| ( c, acceptor) ) )
90
98
. err_into ( )
91
- . map_ok ( |( conn, acceptor) | async move {
92
- let tls = acceptor. load_cert ( ) . await ?;
93
- Ok ( tls. accept ( conn) . await ?)
99
+ . map_ok ( |( conn, acceptor) | {
100
+ let addr = conn. peer_addr ( ) ;
101
+ async move {
102
+ let tls = acceptor. load_cert ( ) . in_current_span ( ) . await ?;
103
+ Ok ( tls. accept ( conn) . in_current_span ( ) . await ?)
104
+ }
105
+ . instrument ( debug_span ! ( "TLS" , remote. addr = ?addr) )
94
106
} )
95
107
. try_buffer_unordered ( 100 )
96
108
. inspect_err ( |err| error ! ( "Stream error: {:?}" , err) )
97
109
. filter ( |stream| futures_util:: future:: ready ( stream. is_ok ( ) ) )
110
+ . into_stream ( )
98
111
}
99
112
100
113
pub struct Api {
101
114
http : Option < TcpListener > ,
102
115
https : Option < TcpListener > ,
116
+ prom : Option < TcpListener > ,
103
117
pool : PgPool ,
104
118
}
105
119
106
- #[ tracing:: instrument( skip( pool) ) ]
107
- async fn register ( pool : PgPool , domain : Domain ) -> Result < reply:: Response , Rejection > {
108
- let _domain = match DomainFacade :: create_domain ( & pool, & domain)
109
- . in_current_span ( )
110
- . await
111
- {
120
+ async fn register ( pool : PgPool , domain : Domain ) -> Result < impl Reply , Rejection > {
121
+ let _domain = match DomainFacade :: create_domain ( & pool, & domain) . await {
112
122
Err ( e) => {
113
123
error ! ( "{}" , e) ;
114
124
return Ok ( Response :: builder ( )
115
125
. status ( 500 )
116
126
. body ( e. to_string ( ) )
117
- . unwrap ( )
118
127
. into_response ( ) ) ;
119
128
}
120
129
Ok ( domain) => domain,
@@ -124,36 +133,67 @@ async fn register(pool: PgPool, domain: Domain) -> Result<reply::Response, Rejec
124
133
Ok ( Response :: new ( "no error" ) . into_response ( ) )
125
134
}
126
135
136
+ fn prom ( ) -> impl Reply {
137
+ let encoder = TextEncoder :: new ( ) ;
138
+ let family = MetricFamily :: new ( ) ;
139
+ let mut res = vec ! [ ] ;
140
+ if let Err ( e) = encoder. encode ( & [ family] , & mut res) {
141
+ error ! ( "{}" , e) ;
142
+ return Response :: builder ( )
143
+ . status ( 500 )
144
+ . body ( e. to_string ( ) )
145
+ . into_response ( ) ;
146
+ }
147
+
148
+ Response :: builder ( )
149
+ . header ( "Content-Type" , "text/plain" )
150
+ . body ( res)
151
+ . into_response ( )
152
+ }
153
+
127
154
impl Api {
128
155
pub async fn new < A : ToSocketAddrs > (
129
156
http : Option < A > ,
130
157
https : Option < A > ,
158
+ prom : Option < A > ,
131
159
pool : PgPool ,
132
160
) -> Result < Self > {
133
161
let http = OptionFuture :: from ( http. map ( TcpListener :: bind) ) . map ( Option :: transpose) ;
134
162
let https = OptionFuture :: from ( https. map ( TcpListener :: bind) ) . map ( Option :: transpose) ;
163
+ let prom = OptionFuture :: from ( prom. map ( TcpListener :: bind) ) . map ( Option :: transpose) ;
135
164
136
- let ( http, https) = tokio:: try_join!( http, https) ?;
165
+ let ( http, https, prom ) = tokio:: try_join!( http, https, prom ) ?;
137
166
138
- Ok ( Api { http, https, pool } )
167
+ Ok ( Api {
168
+ http,
169
+ https,
170
+ prom,
171
+ pool,
172
+ } )
139
173
}
140
174
141
- #[ tracing:: instrument( skip( self ) ) ]
175
+ #[ tracing:: instrument( name = "Api::spawn" , skip( self ) ) ]
142
176
pub async fn spawn ( self ) -> Result < ( ) > {
143
177
info ! ( "Starting API spawn" ) ;
178
+ let metrics = warp:: path ( "metrics" )
179
+ . and ( warp:: get ( ) )
180
+ . map ( prom)
181
+ . with ( warp:: trace:: request ( ) ) ;
144
182
145
183
let pool = self . pool . clone ( ) ;
146
184
let routes = warp:: path ( "register" )
147
185
. and ( warp:: post ( ) )
148
186
. map ( move || pool. clone ( ) )
149
187
. and ( warp:: body:: json ( ) )
150
- . and_then ( register) ;
188
+ . and_then ( register)
189
+ . with ( warp:: trace:: request ( ) ) ;
151
190
152
191
let http = self
153
192
. http
154
193
. map ( |http| {
155
- info ! ( ?http, "Starting http" ) ;
156
- http. in_current_span ( )
194
+ //info!(?http, "Starting http");
195
+ let addr = http. local_addr ( ) ;
196
+ http. instrument ( debug_span ! ( "HTTP" , local. addr = ?addr) )
157
197
} )
158
198
. map ( |http| serve ( routes. clone ( ) ) . serve_incoming ( http) )
159
199
. map ( tokio:: spawn) ;
@@ -162,17 +202,36 @@ impl Api {
162
202
let https = self
163
203
. https
164
204
. map ( |https| {
165
- info ! ( ?https, "Starting https" ) ;
166
- stream ( https, pool) . into_stream ( ) . in_current_span ( )
205
+ //info!(?https, "Starting https");
206
+ let addr = https. local_addr ( ) ;
207
+ stream ( https, pool) . instrument ( debug_span ! ( "HTTPS" , local. addr = ?addr) )
167
208
} )
168
209
. map ( |https| serve ( routes) . serve_incoming ( https) )
169
210
. map ( tokio:: spawn) ;
170
211
171
- match ( https, http) {
172
- ( Some ( https) , Some ( http) ) => tokio:: try_join!( https, http) . map ( |_| ( ) ) ,
173
- ( Some ( https) , None ) => https. await ,
174
- ( None , Some ( http) ) => http. await ,
175
- _ => Ok ( ( ) ) ,
212
+ let prom = self
213
+ . prom
214
+ . map ( |prom| {
215
+ //info!(?http, "Starting http");
216
+ let addr = prom. local_addr ( ) ;
217
+ prom. instrument ( debug_span ! ( "PROM" , local. addr = ?addr) )
218
+ } )
219
+ . map ( |prom| serve ( metrics) . serve_incoming ( prom) )
220
+ . map ( tokio:: spawn) ;
221
+
222
+ match ( https, http, prom) {
223
+ ( Some ( https) , Some ( http) , Some ( prom) ) => {
224
+ tokio:: try_join!( https, http, prom) . map ( |_| ( ) )
225
+ }
226
+ ( None , None , None ) => Ok ( ( ) ) ,
227
+
228
+ ( Some ( https) , Some ( http) , None ) => tokio:: try_join!( https, http) . map ( |_| ( ) ) ,
229
+ ( Some ( https) , None , Some ( prom) ) => tokio:: try_join!( https, prom) . map ( |_| ( ) ) ,
230
+ ( None , Some ( http) , Some ( prom) ) => tokio:: try_join!( http, prom) . map ( |_| ( ) ) ,
231
+
232
+ ( Some ( https) , None , None ) => https. await ,
233
+ ( None , Some ( http) , None ) => http. await ,
234
+ ( None , None , Some ( prom) ) => prom. await ,
176
235
} ?;
177
236
178
237
Ok ( ( ) )
0 commit comments