Skip to content

Commit 1769c91

Browse files
authored
feat(xds): implement xDS subscription worker (#2478)
## Motivation Ref: #2444 With #2475 transport and codec change merged, the remaining change required to get the xDS workflow work end to end is to wire them together with `XdsClient` through a worker loop. This PR implements that. ## Solution 1. Implement `AdsWorker`, a transport/runtime/codegen-agnostic event loop for managing xDS subscriptions and ADS stream. - The worker conceptually manages a pair of mpsc channel, where the sender is used by `XdsClient` to send subscription requests and the receiver is used by `TransportStream` to send `DiscoveryRequest` to xDS servers. - When the underlying ADS stream closes, retry with exponential backoff is supported. Configurable via `ClientConfig`. 3. Implement `ResourceWatcher` and wire it with `XdsClient` so user can now subscribes to xDS resources. Some design choice highlights: 1. Created a `DecodedResource` that is a type-erased representation of xDS resource with its decoding function carried in a closure. AdsWorker sends and receives this type on channels so it can stay transport and codec generic. 2. The ADS stream connection waits for the first subscription from the user. This is because `tonic`'s gRPC stream `::connect()` awaits for the response headers. Depending on the xDS server implementation, it may not respond back with headers until the first subscription, creating a deadlock if we await for the stream creation before sending any requests. (Btw `grpc-go` works around this by having send/recv in different go routines, here I kept both in the same worker loop to reduce the shared state complexity) ## Testing Created a `basic.rs` example to showcase the user experience. I've used it to test against a local xDS management server and successfully subscribed to multiple Listener resources. ``` === xds-client Example === Connecting to xDS server: https://[redacted-private-server] Connected! Enter listener names to watch (one per line, Ctrl+C to exit): (Use empty string for wildcard subscription) [redacted-listener-1] → Watching for Listener: '[redacted-listener-1]' ✓ Listener received: name: [redacted-listener-1] rds_config: [redacted-route-1] [redacted-listener-2] → Watching for Listener: '[redacted-listener-2]' ✓ Listener received: name: [redacted-listener-2] rds_config: [redacted-route-2] ✓ Listener received: name: [redacted-listener-1] rds_config: [redacted-route-1] ``` ## Next Steps The current implementation completes the basic functionality end to end, but have these improvements opportunities: 1. `xds-client` currently do not cache received resources. This means new watchers to a subscribed resource need to wait for the next response from xDS server. If we add in resource caching, new watchers get instant response. 4. If we bring in a proper xDS server implementation we can run integration test in CI. 5. Observability. We'll bring in tracing, logging and metrics support. 6. Delta xDS. The current version implements State of the World variant of xDS only, but in some use cases delta variant is more efficient and scalable. Additionally, these features are xDS-related but are also gRPC-specific so we have plan to implement them in a separate `tonic-xds` crate, using this `xds-client` crate: 1. Cascadingly subscribes to RDS, CDS, and EDS resources from a top-level Listener, for gRPC routing and load balancing. 2. xDS server connection bootstrapping, this includes TLS configuration, connecting pooling, multi-server fallback, etc.
1 parent 56f8c6d commit 1769c91

File tree

15 files changed

+2390
-242
lines changed

15 files changed

+2390
-242
lines changed

xds-client/Cargo.toml

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@ workspace = true
1515
[dependencies]
1616
bytes = "1.11.0"
1717
thiserror = "2"
18-
futures-channel = "0.3"
18+
tokio = { version = "1", features = ["sync", "macros"] }
1919

2020
# Optional dependencies for tonic transport
2121
tonic = { version = "0.14", optional = true }
22-
tokio = { version = "1", features = ["rt", "time", "sync"], optional = true }
2322
tokio-stream = { version = "0.1", optional = true }
2423
http = { version = "1", optional = true }
2524

@@ -32,19 +31,27 @@ default = ["transport-tonic", "codegen-prost"]
3231
transport-tonic = [
3332
"rt-tokio",
3433
"dep:tonic",
35-
"dep:tokio",
3634
"dep:tokio-stream",
3735
"dep:http",
3836
]
39-
rt-tokio = ["dep:tokio"]
37+
rt-tokio = ["tokio/rt", "tokio/time"]
4038
codegen-prost = ["dep:envoy-types", "dep:prost"]
4139

4240
[dev-dependencies]
43-
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net"] }
41+
tokio = { version = "1", features = [
42+
"rt-multi-thread",
43+
"macros",
44+
"net",
45+
] }
46+
tonic = { version = "0.14", features = ["tls-ring"] }
4447
async-stream = "0.3"
4548
envoy-types = "0.7"
4649
prost = "0.14"
4750

51+
[[example]]
52+
name = "basic"
53+
path = "examples/basic.rs"
54+
4855
[package.metadata.cargo_check_external_types]
4956
allowed_external_types = [
5057
# major released

xds-client/examples/basic.rs

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
//! Example demonstrating xds-client usage.
2+
//!
3+
//! This example shows:
4+
//! - How to implement the `Resource` trait for Envoy Listener
5+
//! - How to create an `XdsClient` with tonic transport and prost codec
6+
//! - How to watch for resources and handle events
7+
//!
8+
//! # Configuration (environment variables)
9+
//!
10+
//! - `XDS_SERVER` — URI of the xDS management server (default: `http://localhost:18000`)
11+
//! - `XDS_LISTENERS` — Comma-separated listener names to watch (required)
12+
//! - `XDS_CA_CERT` — Path to PEM-encoded CA certificate (enables TLS)
13+
//! - `XDS_CLIENT_CERT` — Path to PEM-encoded client certificate (for mTLS, requires `XDS_CA_CERT`)
14+
//! - `XDS_CLIENT_KEY` — Path to PEM-encoded client key (for mTLS, requires `XDS_CLIENT_CERT`)
15+
//!
16+
//! # Usage
17+
//!
18+
//! ```sh
19+
//! # Basic usage
20+
//! XDS_LISTENERS=my-listener cargo run -p xds-client --example basic
21+
//!
22+
//! # Multiple listeners
23+
//! XDS_LISTENERS=listener-1,listener-2 cargo run -p xds-client --example basic
24+
//!
25+
//! # Custom server
26+
//! XDS_SERVER=http://xds.example.com:18000 XDS_LISTENERS=foo cargo run -p xds-client --example basic
27+
//!
28+
//! # With TLS
29+
//! XDS_CA_CERT=/path/to/ca.pem \
30+
//! XDS_CLIENT_CERT=/path/to/client.pem \
31+
//! XDS_CLIENT_KEY=/path/to/client.key \
32+
//! XDS_LISTENERS=my-listener \
33+
//! cargo run -p xds-client --example basic
34+
//! ```
35+
36+
use bytes::Bytes;
37+
use envoy_types::pb::envoy::config::listener::v3::Listener as ListenerProto;
38+
use envoy_types::pb::envoy::extensions::filters::network::http_connection_manager::v3::{
39+
http_connection_manager::RouteSpecifier, HttpConnectionManager,
40+
};
41+
use prost::Message;
42+
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
43+
44+
use xds_client::resource::TypeUrl;
45+
use xds_client::{
46+
ClientConfig, Node, ProstCodec, Resource, ResourceEvent, Result as XdsResult, ServerConfig,
47+
TokioRuntime, TonicTransport, TonicTransportBuilder, TransportBuilder, XdsClient,
48+
};
49+
50+
struct Args {
51+
server: String,
52+
ca_cert: Option<String>,
53+
client_cert: Option<String>,
54+
client_key: Option<String>,
55+
listeners: Vec<String>,
56+
}
57+
58+
fn parse_args() -> Args {
59+
let server =
60+
std::env::var("XDS_SERVER").unwrap_or_else(|_| "http://localhost:18000".to_string());
61+
62+
let listeners: Vec<String> = std::env::var("XDS_LISTENERS")
63+
.expect("XDS_LISTENERS env var is required (comma-separated listener names)")
64+
.split(',')
65+
.map(|s| s.trim().to_string())
66+
.filter(|s| !s.is_empty())
67+
.collect();
68+
69+
if listeners.is_empty() {
70+
panic!("XDS_LISTENERS must contain at least one listener name");
71+
}
72+
73+
let ca_cert = std::env::var("XDS_CA_CERT").ok();
74+
let client_cert = std::env::var("XDS_CLIENT_CERT").ok();
75+
let client_key = std::env::var("XDS_CLIENT_KEY").ok();
76+
77+
if client_cert.is_some() && ca_cert.is_none() {
78+
panic!("XDS_CLIENT_CERT requires XDS_CA_CERT to be set");
79+
}
80+
if client_key.is_some() && client_cert.is_none() {
81+
panic!("XDS_CLIENT_KEY requires XDS_CLIENT_CERT to be set");
82+
}
83+
84+
Args {
85+
server,
86+
ca_cert,
87+
client_cert,
88+
client_key,
89+
listeners,
90+
}
91+
}
92+
93+
/// A simplified Listener resource for gRPC xDS.
94+
///
95+
/// Extracts the RDS route config name from the ApiListener's HttpConnectionManager.
96+
#[derive(Debug, Clone)]
97+
pub struct Listener {
98+
/// The listener name.
99+
pub name: String,
100+
/// The RDS route config name (from HttpConnectionManager).
101+
pub rds_route_config_name: Option<String>,
102+
}
103+
104+
/// Custom transport builder that configures TLS on the channel.
105+
///
106+
/// This demonstrates how to implement a custom [`TransportBuilder`] when you need
107+
/// TLS or other custom channel configuration. The default [`TonicTransportBuilder`]
108+
/// creates plain (non-TLS) connections.
109+
struct TlsTransportBuilder {
110+
tls_config: ClientTlsConfig,
111+
}
112+
113+
impl TransportBuilder for TlsTransportBuilder {
114+
type Transport = TonicTransport;
115+
116+
async fn build(&self, server: &ServerConfig) -> XdsResult<Self::Transport> {
117+
let channel = Channel::from_shared(server.uri().to_string())
118+
.map_err(|e| xds_client::Error::Connection(e.to_string()))?
119+
.tls_config(self.tls_config.clone())
120+
.map_err(|e| xds_client::Error::Connection(e.to_string()))?
121+
.connect()
122+
.await
123+
.map_err(|e| xds_client::Error::Connection(e.to_string()))?;
124+
125+
Ok(TonicTransport::from_channel(channel))
126+
}
127+
}
128+
129+
impl Resource for Listener {
130+
type Message = ListenerProto;
131+
132+
const TYPE_URL: TypeUrl = TypeUrl::new("type.googleapis.com/envoy.config.listener.v3.Listener");
133+
134+
fn deserialize(bytes: Bytes) -> xds_client::Result<Self::Message> {
135+
ListenerProto::decode(bytes).map_err(Into::into)
136+
}
137+
138+
fn name(message: &Self::Message) -> &str {
139+
&message.name
140+
}
141+
142+
fn validate(message: Self::Message) -> xds_client::Result<Self> {
143+
let hcm = message
144+
.api_listener
145+
.and_then(|api| api.api_listener)
146+
.and_then(|any| HttpConnectionManager::decode(Bytes::from(any.value)).ok());
147+
148+
let rds_route_config_name = hcm.and_then(|hcm| match hcm.route_specifier {
149+
Some(RouteSpecifier::Rds(rds)) => Some(rds.route_config_name),
150+
_ => None,
151+
});
152+
153+
Ok(Self {
154+
name: message.name,
155+
rds_route_config_name,
156+
})
157+
}
158+
}
159+
160+
#[tokio::main]
161+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
162+
let args = parse_args();
163+
164+
println!("xds-client Example\n");
165+
println!("Connecting to xDS server: {}", args.server);
166+
167+
let node = Node::new("grpc", "1.0").with_id("example-node");
168+
let config = ClientConfig::new(node, &args.server);
169+
170+
let client = match &args.ca_cert {
171+
Some(ca_path) => {
172+
let ca_cert = std::fs::read_to_string(ca_path)?;
173+
let mut tls = ClientTlsConfig::new().ca_certificate(Certificate::from_pem(&ca_cert));
174+
175+
if let (Some(cert_path), Some(key_path)) = (&args.client_cert, &args.client_key) {
176+
let client_cert = std::fs::read_to_string(cert_path)?;
177+
let client_key = std::fs::read_to_string(key_path)?;
178+
tls = tls.identity(Identity::from_pem(client_cert, client_key));
179+
}
180+
181+
let tls_builder = TlsTransportBuilder { tls_config: tls };
182+
XdsClient::builder(config, tls_builder, ProstCodec, TokioRuntime).build()
183+
}
184+
None => XdsClient::builder(
185+
config,
186+
TonicTransportBuilder::new(),
187+
ProstCodec,
188+
TokioRuntime,
189+
)
190+
.build(),
191+
};
192+
193+
println!("Starting watchers...\n");
194+
195+
let (event_tx, mut event_rx) =
196+
tokio::sync::mpsc::unbounded_channel::<ResourceEvent<Listener>>();
197+
198+
// Start watchers for each listener from args
199+
for name in &args.listeners {
200+
println!("Watching for Listener: '{name}'");
201+
202+
let mut watcher = client.watch::<Listener>(name).await;
203+
let tx = event_tx.clone();
204+
205+
tokio::spawn(async move {
206+
while let Some(event) = watcher.next().await {
207+
if tx.send(event).is_err() {
208+
eprintln!("Event channel closed, stopping watcher");
209+
break;
210+
}
211+
}
212+
});
213+
}
214+
215+
// Drop the original sender so the loop exits when all watchers complete
216+
drop(event_tx);
217+
218+
while let Some(event) = event_rx.recv().await {
219+
match event {
220+
ResourceEvent::ResourceChanged {
221+
result: Ok(resource),
222+
done,
223+
} => {
224+
println!("Listener received:");
225+
println!(" name: {}", resource.name);
226+
if let Some(ref rds) = resource.rds_route_config_name {
227+
println!(" rds_config: {rds}");
228+
}
229+
println!();
230+
231+
// In gRPC xDS, you would cascadingly subscribe to RDS, CDS, EDS, etc.
232+
// The done signal is sent automatically when it's dropped.
233+
drop(done);
234+
}
235+
236+
ResourceEvent::ResourceChanged {
237+
result: Err(error), ..
238+
} => {
239+
// Resource was invalidated (validation error, deleted, etc.)
240+
println!("Resource invalidated: {error}");
241+
}
242+
243+
ResourceEvent::AmbientError { error, .. } => {
244+
// Non-fatal error, continue using cached resource if available
245+
println!("Ambient error: {error}");
246+
}
247+
}
248+
}
249+
250+
println!("Exiting");
251+
Ok(())
252+
}

0 commit comments

Comments
 (0)