Skip to content

Commit 56f8c6d

Browse files
authored
feat(grpc): Add TCP listener API in the Runtime trait + tests for server credentials (#2507)
This change adds methods to the `Runtime` trait for creating TCP listeners and accepting remote connections. These methods are required to test the insecure server credentials and the upcoming TLS server credentials, which authenticate streams on the server side.
1 parent 149f366 commit 56f8c6d

File tree

5 files changed

+213
-25
lines changed

5 files changed

+213
-25
lines changed

grpc/src/client/name_resolution/dns/test.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use crate::{
4040
},
4141
service_config::ServiceConfig,
4242
},
43-
rt::{self, tokio::TokioRuntime, GrpcRuntime},
43+
rt::{self, tokio::TokioRuntime, BoxFuture, GrpcRuntime, TcpOptions},
4444
};
4545

4646
use super::{DnsOptions, ParseResult};
@@ -299,6 +299,14 @@ impl rt::Runtime for FakeRuntime {
299299
) -> Pin<Box<dyn Future<Output = Result<Box<dyn rt::GrpcEndpoint>, String>> + Send>> {
300300
self.inner.tcp_stream(target, opts)
301301
}
302+
303+
fn listen_tcp(
304+
&self,
305+
_addr: std::net::SocketAddr,
306+
_opts: TcpOptions,
307+
) -> BoxFuture<Result<Box<dyn rt::TcpListener>, String>> {
308+
unimplemented!()
309+
}
302310
}
303311

304312
#[tokio::test]
@@ -311,7 +319,7 @@ pub(crate) async fn dns_lookup_error() {
311319
work_tx: work_tx.clone(),
312320
});
313321
let runtime = FakeRuntime {
314-
inner: TokioRuntime {},
322+
inner: TokioRuntime::default(),
315323
dns: FakeDns {
316324
latency: Duration::from_secs(0),
317325
lookup_result: Err("test_error".to_string()),
@@ -344,7 +352,7 @@ pub(crate) async fn dns_lookup_timeout() {
344352
work_tx: work_tx.clone(),
345353
});
346354
let runtime = FakeRuntime {
347-
inner: TokioRuntime {},
355+
inner: TokioRuntime::default(),
348356
dns: FakeDns {
349357
latency: Duration::from_secs(20),
350358
lookup_result: Ok(Vec::new()),

grpc/src/credentials/dyn_wrapper.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ mod tests {
123123
use crate::credentials::client::ClientHandshakeInfo;
124124
use crate::credentials::common::{Authority, SecurityLevel};
125125
use crate::credentials::insecure::InsecureChannelCredentials;
126-
use crate::rt::TcpOptions;
126+
use crate::credentials::InsecureServerCredentials;
127+
use crate::rt::{self, TcpOptions};
127128
use tokio::io::{AsyncReadExt, AsyncWriteExt};
128129
use tokio::net::TcpListener;
129130

@@ -173,4 +174,49 @@ mod tests {
173174
.security_context()
174175
.validate_authority(&authority));
175176
}
177+
178+
#[tokio::test]
179+
async fn test_dyn_server_credential_dispatch() {
180+
let creds = InsecureServerCredentials::new();
181+
let dyn_creds: Box<dyn DynServerCredentials> = Box::new(creds);
182+
183+
let info = dyn_creds.info();
184+
assert_eq!(info.security_protocol, "insecure");
185+
186+
let addr = "127.0.0.1:0";
187+
let runtime = rt::default_runtime();
188+
let mut listener = runtime
189+
.listen_tcp(addr.parse().unwrap(), TcpOptions::default())
190+
.await
191+
.unwrap();
192+
let server_addr = *listener.local_addr();
193+
194+
let client_handle = tokio::spawn(async move {
195+
let mut stream = tokio::net::TcpStream::connect(server_addr).await.unwrap();
196+
let data = b"hello dynamic grpc server";
197+
stream.write_all(data).await.unwrap();
198+
199+
// Keep the connection alive for a bit so server can read
200+
let mut buf = vec![0u8; 1];
201+
let _ = stream.read(&mut buf).await;
202+
});
203+
204+
let (server_stream, _) = listener.accept().await.unwrap();
205+
206+
let result = dyn_creds.accept(server_stream, runtime).await;
207+
208+
assert!(result.is_ok());
209+
let output = result.unwrap();
210+
let mut endpoint = output.endpoint;
211+
let security_info = output.security;
212+
213+
assert_eq!(security_info.security_protocol(), "insecure");
214+
assert_eq!(security_info.security_level(), SecurityLevel::NoSecurity);
215+
216+
let mut buf = vec![0u8; 25];
217+
endpoint.read_exact(&mut buf).await.unwrap();
218+
assert_eq!(&buf[..], b"hello dynamic grpc server");
219+
220+
client_handle.abort();
221+
}
176222
}

grpc/src/credentials/insecure.rs

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,17 @@ impl ServerCredentials for InsecureServerCredentials {
129129
#[cfg(test)]
130130
mod test {
131131
use tokio::io::{AsyncReadExt, AsyncWriteExt};
132-
use tokio::net::TcpListener;
132+
use tokio::net::{TcpListener, TcpStream};
133133

134134
use crate::credentials::client::{
135135
ChannelCredsInternal as ClientSealed, ClientConnectionSecurityContext, ClientHandshakeInfo,
136136
};
137137
use crate::credentials::common::{Authority, SecurityLevel};
138-
use crate::credentials::{ChannelCredentials, InsecureChannelCredentials};
138+
use crate::credentials::server::ServerCredsInternal;
139+
use crate::credentials::{
140+
ChannelCredentials, InsecureChannelCredentials, InsecureServerCredentials,
141+
ServerCredentials,
142+
};
139143
use crate::rt::GrpcEndpoint;
140144
use crate::rt::{self, TcpOptions};
141145

@@ -188,4 +192,45 @@ mod test {
188192
.security_context()
189193
.validate_authority(&authority));
190194
}
195+
196+
#[tokio::test]
197+
async fn test_insecure_server_credentials() {
198+
let creds = InsecureServerCredentials::new();
199+
200+
let info = creds.info();
201+
assert_eq!(info.security_protocol, "insecure");
202+
203+
let addr = "127.0.0.1:0";
204+
let runtime = rt::default_runtime();
205+
let mut listener = runtime
206+
.listen_tcp(addr.parse().unwrap(), TcpOptions::default())
207+
.await
208+
.unwrap();
209+
let server_addr = *listener.local_addr();
210+
211+
let client_handle = tokio::spawn(async move {
212+
let mut stream = TcpStream::connect(server_addr).await.unwrap();
213+
let data = b"hello grpc";
214+
stream.write_all(data).await.unwrap();
215+
216+
// Keep the connection alive for a bit so server can read.
217+
let mut buf = vec![0u8; 1];
218+
let _ = stream.read(&mut buf).await;
219+
});
220+
221+
let (server_stream, _) = listener.accept().await.unwrap();
222+
223+
let output = creds.accept(server_stream, runtime).await.unwrap();
224+
let mut endpoint = output.endpoint;
225+
let security_info = output.security;
226+
227+
assert_eq!(security_info.security_protocol(), "insecure");
228+
assert_eq!(security_info.security_level(), SecurityLevel::NoSecurity);
229+
230+
let mut buf = vec![0u8; 10];
231+
endpoint.read_exact(&mut buf).await.unwrap();
232+
assert_eq!(&buf[..], b"hello grpc");
233+
234+
client_handle.abort();
235+
}
191236
}

grpc/src/rt/mod.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ pub(crate) mod tokio;
3131

3232
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
3333
pub type BoxedTaskHandle = Box<dyn TaskHandle>;
34+
pub type BoxEndpoint = Box<dyn GrpcEndpoint>;
35+
pub type ScopedBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
3436

3537
/// An abstraction over an asynchronous runtime.
3638
///
@@ -58,6 +60,13 @@ pub trait Runtime: Send + Sync + Debug {
5860
target: SocketAddr,
5961
opts: TcpOptions,
6062
) -> BoxFuture<Result<Box<dyn GrpcEndpoint>, String>>;
63+
64+
/// Create a new listener for the given address.
65+
fn listen_tcp(
66+
&self,
67+
addr: SocketAddr,
68+
opts: TcpOptions,
69+
) -> BoxFuture<Result<Box<dyn TcpListener>, String>>;
6170
}
6271

6372
/// A future that resolves after a specified duration.
@@ -122,6 +131,20 @@ impl GrpcEndpoint for Box<dyn GrpcEndpoint> {
122131
}
123132
}
124133

134+
/// A trait representing a TCP listener capable of accepting incoming
135+
/// connections.
136+
pub trait TcpListener: Send + Sync {
137+
/// Accepts a new incoming connection.
138+
///
139+
/// Returns a future that resolves to a result containing the new
140+
/// `GrpcEndpoint` and the remote peer's `SocketAddr`, or an error string
141+
/// if acceptance fails.
142+
fn accept(&mut self) -> ScopedBoxFuture<'_, Result<(BoxEndpoint, SocketAddr), String>>;
143+
144+
/// Returns the local socket address this listener is bound to.
145+
fn local_addr(&self) -> &SocketAddr;
146+
}
147+
125148
/// A fake runtime to satisfy the compiler when no runtime is enabled. This will
126149
///
127150
/// # Panics
@@ -150,12 +173,20 @@ impl Runtime for NoOpRuntime {
150173
) -> Pin<Box<dyn Future<Output = Result<Box<dyn GrpcEndpoint>, String>> + Send>> {
151174
unimplemented!()
152175
}
176+
177+
fn listen_tcp(
178+
&self,
179+
addr: SocketAddr,
180+
_opts: TcpOptions,
181+
) -> BoxFuture<Result<Box<dyn TcpListener>, String>> {
182+
unimplemented!()
183+
}
153184
}
154185

155186
pub(crate) fn default_runtime() -> GrpcRuntime {
156187
#[cfg(feature = "_runtime-tokio")]
157188
{
158-
return GrpcRuntime::new(tokio::TokioRuntime {});
189+
return GrpcRuntime::new(tokio::TokioRuntime::default());
159190
}
160191
#[allow(unreachable_code)]
161192
GrpcRuntime::new(NoOpRuntime::default())
@@ -195,4 +226,12 @@ impl GrpcRuntime {
195226
) -> BoxFuture<Result<Box<dyn GrpcEndpoint>, String>> {
196227
self.inner.tcp_stream(target, opts)
197228
}
229+
230+
pub fn listen_tcp(
231+
&self,
232+
addr: SocketAddr,
233+
opts: TcpOptions,
234+
) -> BoxFuture<Result<Box<dyn TcpListener>, String>> {
235+
self.inner.listen_tcp(addr, opts)
236+
}
198237
}

grpc/src/rt/tokio/mod.rs

Lines changed: 68 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,30 @@
2222
*
2323
*/
2424

25-
use std::{
26-
future::Future,
27-
net::{IpAddr, SocketAddr},
28-
pin::Pin,
29-
time::Duration,
30-
};
25+
use std::future::Future;
26+
use std::net::{IpAddr, SocketAddr};
27+
use std::pin::Pin;
28+
use std::time::Duration;
3129

32-
use tokio::{
33-
io::{AsyncRead, AsyncWrite},
34-
net::TcpStream,
35-
task::JoinHandle,
36-
};
30+
use tokio::io::{AsyncRead, AsyncWrite};
31+
use tokio::net::TcpStream;
32+
use tokio::task::JoinHandle;
3733

38-
use crate::rt::endpoint;
34+
use crate::rt::{BoxEndpoint, BoxFuture, ScopedBoxFuture, TcpOptions};
3935

40-
use super::{BoxedTaskHandle, DnsResolver, ResolverOptions, Runtime, Sleep, TaskHandle};
36+
use super::{
37+
endpoint, BoxedTaskHandle, DnsResolver, GrpcEndpoint, ResolverOptions, Runtime, Sleep,
38+
TaskHandle,
39+
};
4140

4241
#[cfg(feature = "dns")]
4342
mod hickory_resolver;
4443

4544
/// A DNS resolver that uses tokio::net::lookup_host for resolution. It only
4645
/// supports host lookups.
47-
struct TokioDefaultDnsResolver {}
46+
struct TokioDefaultDnsResolver {
47+
_priv: (),
48+
}
4849

4950
#[tonic::async_trait]
5051
impl DnsResolver for TokioDefaultDnsResolver {
@@ -66,8 +67,10 @@ impl DnsResolver for TokioDefaultDnsResolver {
6667
}
6768
}
6869

69-
#[derive(Debug)]
70-
pub(crate) struct TokioRuntime {}
70+
#[derive(Debug, Default)]
71+
pub(crate) struct TokioRuntime {
72+
_priv: (),
73+
}
7174

7275
impl TaskHandle for JoinHandle<()> {
7376
fn abort(&self) {
@@ -126,14 +129,32 @@ impl Runtime for TokioRuntime {
126129
Ok(stream)
127130
})
128131
}
132+
133+
fn listen_tcp(
134+
&self,
135+
addr: SocketAddr,
136+
_opts: TcpOptions,
137+
) -> BoxFuture<Result<Box<dyn super::TcpListener>, String>> {
138+
Box::pin(async move {
139+
let listener = tokio::net::TcpListener::bind(addr)
140+
.await
141+
.map_err(|err| err.to_string())?;
142+
let local_addr = listener.local_addr().map_err(|e| e.to_string())?;
143+
let listener = TokioListener {
144+
inner: listener,
145+
local_addr,
146+
};
147+
Ok(Box::new(listener) as Box<dyn super::TcpListener>)
148+
})
149+
}
129150
}
130151

131152
impl TokioDefaultDnsResolver {
132153
pub fn new(opts: ResolverOptions) -> Result<Self, String> {
133154
if opts.server_addr.is_some() {
134155
return Err("Custom DNS server are not supported, enable optional feature 'dns' to enable support.".to_string());
135156
}
136-
Ok(TokioDefaultDnsResolver {})
157+
Ok(TokioDefaultDnsResolver { _priv: () })
137158
}
138159
}
139160

@@ -201,13 +222,42 @@ impl super::GrpcEndpoint for TokioTcpStream {
201222
}
202223
}
203224

225+
struct TokioListener {
226+
inner: tokio::net::TcpListener,
227+
local_addr: SocketAddr,
228+
}
229+
230+
impl super::TcpListener for TokioListener {
231+
fn accept(&mut self) -> ScopedBoxFuture<'_, Result<(BoxEndpoint, SocketAddr), String>> {
232+
Box::pin(async move {
233+
let (stream, addr) = self.inner.accept().await.map_err(|e| e.to_string())?;
234+
Ok((
235+
Box::new(TokioTcpStream {
236+
local_addr: stream
237+
.local_addr()
238+
.map_err(|err| err.to_string())?
239+
.to_string()
240+
.into_boxed_str(),
241+
peer_addr: addr.to_string().into_boxed_str(),
242+
inner: stream,
243+
}) as Box<dyn GrpcEndpoint>,
244+
addr,
245+
))
246+
})
247+
}
248+
249+
fn local_addr(&self) -> &SocketAddr {
250+
&self.local_addr
251+
}
252+
}
253+
204254
#[cfg(test)]
205255
mod tests {
206256
use super::{DnsResolver, ResolverOptions, Runtime, TokioDefaultDnsResolver, TokioRuntime};
207257

208258
#[tokio::test]
209259
async fn lookup_hostname() {
210-
let runtime = TokioRuntime {};
260+
let runtime = TokioRuntime::default();
211261

212262
let dns = runtime
213263
.get_dns_resolver(ResolverOptions::default())

0 commit comments

Comments
 (0)