Skip to content

Commit 038733e

Browse files
committed
Move resolve api to async stream
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
1 parent 8a085d5 commit 038733e

File tree

4 files changed

+65
-62
lines changed

4 files changed

+65
-62
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,6 +1213,7 @@ dependencies = [
12131213
name = "linkerd2-proxy-api-resolve"
12141214
version = "0.1.0"
12151215
dependencies = [
1216+
"async-stream",
12161217
"futures 0.3.5",
12171218
"http 0.2.1",
12181219
"http-body",

linkerd/proxy/api-resolve/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Implements the Resolve trait using the proxy's gRPC API
99
"""
1010

1111
[dependencies]
12+
async-stream = "0.2.1"
1213
futures = "0.3"
1314
linkerd2-identity = { path = "../../identity" }
1415
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.13" }

linkerd/proxy/api-resolve/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#![deny(warnings, rust_2018_idioms)]
2+
#![recursion_limit = "512"]
23

34
use linkerd2_identity as identity;
45
use linkerd2_proxy_api as api;

linkerd/proxy/api-resolve/src/resolve.rs

Lines changed: 62 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ use crate::core::resolve::{self, Update};
33
use crate::metadata::Metadata;
44
use crate::pb;
55
use api::destination_client::DestinationClient;
6-
use futures::{ready, Stream};
6+
use async_stream::try_stream;
7+
use futures::future;
8+
use futures::stream::StreamExt;
9+
use futures::Stream;
710
use http_body::Body as HttpBody;
8-
use pin_project::pin_project;
911
use std::error::Error;
10-
use std::future::Future;
1112
use std::pin::Pin;
1213
use std::task::{Context, Poll};
1314
use tonic::{
@@ -25,13 +26,7 @@ pub struct Resolve<S> {
2526
context_token: String,
2627
}
2728

28-
#[pin_project]
29-
pub struct Resolution {
30-
#[pin]
31-
inner: grpc::Streaming<api::Update>,
32-
}
33-
34-
// === impl Resolver ===
29+
// === impl Resolve ===
3530

3631
impl<S> Resolve<S>
3732
where
@@ -65,6 +60,9 @@ where
6560
}
6661
}
6762

63+
type UpdatesStream =
64+
Pin<Box<dyn Stream<Item = Result<Update<Metadata>, grpc::Status>> + Send + 'static>>;
65+
6866
impl<T, S> Service<T> for Resolve<S>
6967
where
7068
T: ToString,
@@ -75,10 +73,9 @@ where
7573
<S::ResponseBody as HttpBody>::Error: Into<Box<dyn Error + Send + Sync + 'static>> + Send,
7674
S::Future: Send,
7775
{
78-
type Response = Resolution;
76+
type Response = UpdatesStream;
7977
type Error = grpc::Status;
80-
type Future =
81-
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
78+
type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;
8279

8380
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
8481
// The future returned by the Tonic generated `DestinationClient`'s `get` method will drive the service to readiness before calling it, so we can always return `Ready` here.
@@ -88,69 +85,72 @@ where
8885
fn call(&mut self, target: T) -> Self::Future {
8986
let path = target.to_string();
9087
debug!(dst = %path, context = %self.context_token, "Resolving");
91-
let mut svc = self.service.clone();
9288
let req = api::GetDestination {
9389
path,
9490
scheme: self.scheme.clone(),
9591
context_token: self.context_token.clone(),
9692
};
97-
Box::pin(async move {
98-
let rsp = svc.get(grpc::Request::new(req)).await?;
99-
trace!(metadata = ?rsp.metadata());
100-
Ok(Resolution {
101-
inner: rsp.into_inner(),
102-
})
103-
})
93+
94+
future::ok(Box::pin(resolution(self.service.clone(), req)))
10495
}
10596
}
10697

107-
impl Stream for Resolution {
108-
type Item = Result<resolve::Update<Metadata>, grpc::Status>;
98+
fn resolution<S>(
99+
mut client: DestinationClient<S>,
100+
req: api::GetDestination,
101+
) -> impl Stream<Item = Result<resolve::Update<Metadata>, grpc::Status>>
102+
where
103+
S: GrpcService<BoxBody> + Clone + Send + 'static,
104+
S::Error: Into<Box<dyn Error + Send + Sync>> + Send,
105+
S::ResponseBody: Send,
106+
<S::ResponseBody as Body>::Data: Send,
107+
<S::ResponseBody as HttpBody>::Error: Into<Box<dyn Error + Send + Sync + 'static>> + Send,
108+
S::Future: Send,
109+
{
110+
try_stream! {
111+
let rsp = client.get(grpc::Request::new(req)).await?;
112+
trace!(metadata = ?rsp.metadata());
113+
let mut stream = rsp.into_inner();
109114

110-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111-
let mut this = self.project();
112-
loop {
113-
match ready!(this.inner.as_mut().poll_next(cx)) {
114-
Some(update) => match update?.update {
115-
Some(api::update::Update::Add(api::WeightedAddrSet {
116-
addrs,
117-
metric_labels,
118-
})) => {
119-
let addr_metas = addrs
120-
.into_iter()
121-
.filter_map(|addr| pb::to_addr_meta(addr, &metric_labels))
122-
.collect::<Vec<_>>();
123-
if !addr_metas.is_empty() {
124-
debug!(endpoints = %addr_metas.len(), "Add");
125-
return Poll::Ready(Some(Ok(Update::Add(addr_metas))));
126-
}
115+
while let Some(update) = stream.next().await {
116+
match update?.update {
117+
Some(api::update::Update::Add(api::WeightedAddrSet {
118+
addrs,
119+
metric_labels,
120+
})) => {
121+
let addr_metas = addrs
122+
.into_iter()
123+
.filter_map(|addr| pb::to_addr_meta(addr, &metric_labels))
124+
.collect::<Vec<_>>();
125+
if !addr_metas.is_empty() {
126+
debug!(endpoints = %addr_metas.len(), "Add");
127+
yield Update::Add(addr_metas);
127128
}
129+
}
128130

129-
Some(api::update::Update::Remove(api::AddrSet { addrs })) => {
130-
let sock_addrs = addrs
131-
.into_iter()
132-
.filter_map(pb::to_sock_addr)
133-
.collect::<Vec<_>>();
134-
if !sock_addrs.is_empty() {
135-
debug!(endpoints = %sock_addrs.len(), "Remove");
136-
return Poll::Ready(Some(Ok(Update::Remove(sock_addrs))));
137-
}
131+
Some(api::update::Update::Remove(api::AddrSet { addrs })) => {
132+
let sock_addrs = addrs
133+
.into_iter()
134+
.filter_map(pb::to_sock_addr)
135+
.collect::<Vec<_>>();
136+
if !sock_addrs.is_empty() {
137+
debug!(endpoints = %sock_addrs.len(), "Remove");
138+
yield Update::Remove(sock_addrs);
138139
}
140+
}
139141

140-
Some(api::update::Update::NoEndpoints(api::NoEndpoints { exists })) => {
141-
info!("No endpoints");
142-
let update = if exists {
143-
Update::Empty
144-
} else {
145-
Update::DoesNotExist
146-
};
147-
return Poll::Ready(Some(Ok(update.into())));
148-
}
142+
Some(api::update::Update::NoEndpoints(api::NoEndpoints { exists })) => {
143+
info!("No endpoints");
144+
let update = if exists {
145+
Update::Empty
146+
} else {
147+
Update::DoesNotExist
148+
};
149+
yield update.into();
150+
}
149151

150-
None => {} // continue
151-
},
152-
None => return Poll::Ready(None),
153-
};
152+
None => {} // continue
153+
}
154154
}
155155
}
156156
}

0 commit comments

Comments
 (0)