Skip to content

Commit 393a57e

Browse files
xd009642LucioFranco
authored andcommitted
feat(transport): Add server graceful shutdown (#169)
1 parent cce550b commit 393a57e

File tree

1 file changed

+78
-1
lines changed

1 file changed

+78
-1
lines changed

tonic/src/transport/server.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,6 @@ impl Server {
261261
}
262262
},
263263
);
264-
265264
let svc = MakeSvc {
266265
inner: svc,
267266
interceptor,
@@ -280,6 +279,71 @@ impl Server {
280279

281280
Ok(())
282281
}
282+
283+
pub(crate) async fn serve_with_shutdown<S, F>(
284+
self,
285+
addr: SocketAddr,
286+
svc: S,
287+
signal: F,
288+
) -> Result<(), super::Error>
289+
where
290+
S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
291+
S::Future: Send + 'static,
292+
S::Error: Into<crate::Error> + Send,
293+
F: Future<Output = ()>,
294+
{
295+
let interceptor = self.interceptor.clone();
296+
let concurrency_limit = self.concurrency_limit;
297+
let init_connection_window_size = self.init_connection_window_size;
298+
let init_stream_window_size = self.init_stream_window_size;
299+
let max_concurrent_streams = self.max_concurrent_streams;
300+
// let timeout = self.timeout.clone();
301+
302+
let incoming = hyper::server::accept::from_stream::<_, _, crate::Error>(
303+
async_stream::try_stream! {
304+
let mut tcp = TcpIncoming::bind(addr)?
305+
.set_nodelay(self.tcp_nodelay)
306+
.set_keepalive(self.tcp_keepalive);
307+
308+
while let Some(stream) = tcp.try_next().await? {
309+
#[cfg(feature = "tls")]
310+
{
311+
if let Some(tls) = &self.tls {
312+
let io = match tls.connect(stream.into_inner()).await {
313+
Ok(io) => io,
314+
Err(error) => {
315+
error!(message = "Unable to accept incoming connection.", %error);
316+
continue
317+
},
318+
};
319+
yield BoxedIo::new(io);
320+
continue;
321+
}
322+
}
323+
324+
yield BoxedIo::new(stream);
325+
}
326+
},
327+
);
328+
329+
let svc = MakeSvc {
330+
inner: svc,
331+
interceptor,
332+
concurrency_limit,
333+
// timeout,
334+
};
335+
hyper::Server::builder(incoming)
336+
.http2_only(true)
337+
.http2_initial_connection_window_size(init_connection_window_size)
338+
.http2_initial_stream_window_size(init_stream_window_size)
339+
.http2_max_concurrent_streams(max_concurrent_streams)
340+
.serve(svc)
341+
.with_graceful_shutdown(signal)
342+
.await
343+
.map_err(map_err)?;
344+
345+
Ok(())
346+
}
283347
}
284348

285349
impl<S> Router<S, Unimplemented> {
@@ -348,6 +412,19 @@ where
348412
pub async fn serve(self, addr: SocketAddr) -> Result<(), super::Error> {
349413
self.server.serve(addr, self.routes).await
350414
}
415+
416+
/// Consume this [`Server`] creating a future that will execute the server
417+
/// on [`tokio`]'s default executor. And shutdown when the provided signal
418+
/// is received.
419+
///
420+
/// [`Server`]: struct.Server.html
421+
pub async fn serve_with_shutdown<F: Future<Output = ()>>(
422+
self,
423+
addr: SocketAddr,
424+
f: F,
425+
) -> Result<(), super::Error> {
426+
self.server.serve_with_shutdown(addr, self.routes, f).await
427+
}
351428
}
352429

353430
fn map_err(e: impl Into<crate::Error>) -> super::Error {

0 commit comments

Comments
 (0)