Skip to content

Commit 5a3c563

Browse files
authored
Remove Resolution trait (#606)
The `Resolution` trait was initially introduce to ensure that the update stream is infinite; but this isn't actually desirable in a general way. It's fine for resolution streams to complete. Consumers (balancers) need to handle this. This change removes this trait, in favor of a simple `TryStream`. This makes it easier to use ready-made stream combinators and constructors.
1 parent 61932ae commit 5a3c563

File tree

9 files changed

+149
-144
lines changed

9 files changed

+149
-144
lines changed

linkerd/proxy/api-resolve/src/resolve.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,10 @@ where
104104
}
105105
}
106106

107-
impl resolve::Resolution for Resolution {
108-
type Endpoint = Metadata;
109-
type Error = grpc::Status;
107+
impl Stream for Resolution {
108+
type Item = Result<resolve::Update<Metadata>, grpc::Status>;
110109

111-
fn poll(
112-
self: Pin<&mut Self>,
113-
cx: &mut Context<'_>,
114-
) -> Poll<Result<Update<Self::Endpoint>, Self::Error>> {
110+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
115111
let mut this = self.project();
116112
loop {
117113
match ready!(this.inner.as_mut().poll_next(cx)) {
@@ -126,7 +122,7 @@ impl resolve::Resolution for Resolution {
126122
.collect::<Vec<_>>();
127123
if !addr_metas.is_empty() {
128124
debug!(endpoints = %addr_metas.len(), "Add");
129-
return Poll::Ready(Ok(Update::Add(addr_metas)));
125+
return Poll::Ready(Some(Ok(Update::Add(addr_metas))));
130126
}
131127
}
132128

@@ -137,7 +133,7 @@ impl resolve::Resolution for Resolution {
137133
.collect::<Vec<_>>();
138134
if !sock_addrs.is_empty() {
139135
debug!(endpoints = %sock_addrs.len(), "Remove");
140-
return Poll::Ready(Ok(Update::Remove(sock_addrs)));
136+
return Poll::Ready(Some(Ok(Update::Remove(sock_addrs))));
141137
}
142138
}
143139

@@ -148,14 +144,12 @@ impl resolve::Resolution for Resolution {
148144
} else {
149145
Update::DoesNotExist
150146
};
151-
return Poll::Ready(Ok(update.into()));
147+
return Poll::Ready(Some(Ok(update.into())));
152148
}
153149

154150
None => {} // continue
155151
},
156-
None => {
157-
return Poll::Ready(Err(grpc::Status::new(grpc::Code::Ok, "end of stream")))
158-
}
152+
None => return Poll::Ready(None),
159153
};
160154
}
161155
}

linkerd/proxy/core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
pub mod resolve;
44

5-
pub use self::resolve::{Resolution, Resolve};
5+
pub use self::{resolve::Resolve, resolve::Update};

linkerd/proxy/core/src/resolve.rs

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1+
use futures::stream::TryStream;
12
use linkerd2_error::Error;
23
use std::future::Future;
34
use std::net::SocketAddr;
4-
use std::pin::Pin;
55
use std::task::{Context, Poll};
66

7-
/// Resolves `T`-typed names/addresses as a `Resolution`.
7+
/// Resolves `T`-typed names/addresses as an infinite stream of `Update<Self::Endpoint>`.
88
pub trait Resolve<T> {
99
type Endpoint;
1010
type Error: Into<Error>;
11-
type Resolution: Resolution<Endpoint = Self::Endpoint>;
11+
type Resolution: TryStream<Ok = Update<Self::Endpoint>, Error = Self::Error>;
1212
type Future: Future<Output = Result<Self::Resolution, Self::Error>>;
1313

1414
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
@@ -23,27 +23,6 @@ pub trait Resolve<T> {
2323
}
2424
}
2525

26-
/// An infinite stream of endpoint updates.
27-
pub trait Resolution {
28-
type Endpoint;
29-
type Error: Into<Error>;
30-
31-
fn poll(
32-
self: Pin<&mut Self>,
33-
cx: &mut Context<'_>,
34-
) -> Poll<Result<Update<Self::Endpoint>, Self::Error>>;
35-
36-
fn poll_unpin(
37-
&mut self,
38-
cx: &mut Context<'_>,
39-
) -> Poll<Result<Update<Self::Endpoint>, Self::Error>>
40-
where
41-
Self: Unpin,
42-
{
43-
Pin::new(self).poll(cx)
44-
}
45-
}
46-
4726
#[derive(Clone, Debug)]
4827
pub struct Service<S>(S);
4928

@@ -57,13 +36,13 @@ pub enum Update<T> {
5736

5837
// === impl Resolve ===
5938

60-
impl<S, T, R> Resolve<T> for S
39+
impl<S, T, R, E> Resolve<T> for S
6140
where
6241
S: tower::Service<T, Response = R>,
6342
S::Error: Into<Error>,
64-
R: Resolution,
43+
R: TryStream<Ok = Update<E>, Error = S::Error>,
6544
{
66-
type Endpoint = <R as Resolution>::Endpoint;
45+
type Endpoint = E;
6746
type Error = S::Error;
6847
type Resolution = S::Response;
6948
type Future = S::Future;

linkerd/proxy/discover/src/from_resolve.rs

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use futures::{ready, Stream, TryFuture};
1+
use futures::{ready, Stream, TryFuture, TryStream};
22
use indexmap::IndexSet;
3-
use linkerd2_proxy_core::resolve::{Resolution, Resolve, Update};
3+
use linkerd2_proxy_core::resolve::{Resolve, Update};
44
use pin_project::pin_project;
55
use std::collections::VecDeque;
66
use std::future::Future;
@@ -10,45 +10,50 @@ use std::task::{Context, Poll};
1010
use tower::discover::Change;
1111

1212
#[derive(Clone, Debug)]
13-
pub struct FromResolve<R> {
13+
pub struct FromResolve<R, E> {
1414
resolve: R,
15+
_marker: std::marker::PhantomData<fn(E)>,
1516
}
1617

1718
#[pin_project]
1819
#[derive(Debug)]
19-
pub struct DiscoverFuture<F> {
20+
pub struct DiscoverFuture<F, E> {
2021
#[pin]
2122
future: F,
23+
_marker: std::marker::PhantomData<fn(E)>,
2224
}
2325

2426
/// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to
2527
/// build a service for each endpoint.
2628
#[pin_project]
27-
pub struct Discover<R: Resolution> {
29+
pub struct Discover<R: TryStream, E> {
2830
#[pin]
2931
resolution: R,
3032
active: IndexSet<SocketAddr>,
31-
pending: VecDeque<Change<SocketAddr, R::Endpoint>>,
33+
pending: VecDeque<Change<SocketAddr, E>>,
3234
}
3335

3436
// === impl FromResolve ===
3537

36-
impl<R> FromResolve<R> {
38+
impl<R, E> FromResolve<R, E> {
3739
pub fn new<T>(resolve: R) -> Self
3840
where
3941
R: Resolve<T>,
4042
{
41-
Self { resolve }
43+
Self {
44+
resolve,
45+
_marker: std::marker::PhantomData,
46+
}
4247
}
4348
}
4449

45-
impl<T, R> tower::Service<T> for FromResolve<R>
50+
impl<T, R, E> tower::Service<T> for FromResolve<R, E>
4651
where
4752
R: Resolve<T> + Clone,
4853
{
49-
type Response = Discover<R::Resolution>;
54+
type Response = Discover<R::Resolution, E>;
5055
type Error = R::Error;
51-
type Future = DiscoverFuture<R::Future>;
56+
type Future = DiscoverFuture<R::Future, E>;
5257

5358
#[inline]
5459
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@@ -59,18 +64,19 @@ where
5964
fn call(&mut self, target: T) -> Self::Future {
6065
Self::Future {
6166
future: self.resolve.resolve(target),
67+
_marker: std::marker::PhantomData,
6268
}
6369
}
6470
}
6571

6672
// === impl DiscoverFuture ===
6773

68-
impl<F> Future for DiscoverFuture<F>
74+
impl<F, E> Future for DiscoverFuture<F, E>
6975
where
7076
F: TryFuture,
71-
F::Ok: Resolution,
77+
F::Ok: TryStream,
7278
{
73-
type Output = Result<Discover<F::Ok>, F::Error>;
79+
type Output = Result<Discover<F::Ok, E>, F::Error>;
7480

7581
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
7682
let resolution = ready!(self.project().future.try_poll(cx))?;
@@ -80,7 +86,7 @@ where
8086

8187
// === impl Discover ===
8288

83-
impl<R: Resolution> Discover<R> {
89+
impl<R: TryStream, E> Discover<R, E> {
8490
pub fn new(resolution: R) -> Self {
8591
Self {
8692
resolution,
@@ -90,8 +96,11 @@ impl<R: Resolution> Discover<R> {
9096
}
9197
}
9298

93-
impl<R: Resolution> Stream for Discover<R> {
94-
type Item = Result<Change<SocketAddr, R::Endpoint>, R::Error>;
99+
impl<R, E> Stream for Discover<R, E>
100+
where
101+
R: TryStream<Ok = Update<E>>,
102+
{
103+
type Item = Result<Change<SocketAddr, E>, R::Error>;
95104

96105
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
97106
loop {
@@ -100,24 +109,27 @@ impl<R: Resolution> Stream for Discover<R> {
100109
return Poll::Ready(Some(Ok(change)));
101110
}
102111

103-
match ready!(this.resolution.poll(cx))? {
104-
Update::Add(endpoints) => {
105-
for (addr, endpoint) in endpoints.into_iter() {
106-
this.active.insert(addr);
107-
this.pending.push_back(Change::Insert(addr, endpoint));
112+
match ready!(this.resolution.try_poll_next(cx)) {
113+
Some(update) => match update? {
114+
Update::Add(endpoints) => {
115+
for (addr, endpoint) in endpoints.into_iter() {
116+
this.active.insert(addr);
117+
this.pending.push_back(Change::Insert(addr, endpoint));
118+
}
108119
}
109-
}
110-
Update::Remove(addrs) => {
111-
for addr in addrs.into_iter() {
112-
if this.active.remove(&addr) {
113-
this.pending.push_back(Change::Remove(addr));
120+
Update::Remove(addrs) => {
121+
for addr in addrs.into_iter() {
122+
if this.active.remove(&addr) {
123+
this.pending.push_back(Change::Remove(addr));
124+
}
114125
}
115126
}
116-
}
117-
Update::DoesNotExist | Update::Empty => {
118-
this.pending
119-
.extend(this.active.drain(..).map(Change::Remove));
120-
}
127+
Update::DoesNotExist | Update::Empty => {
128+
this.pending
129+
.extend(this.active.drain(..).map(Change::Remove));
130+
}
131+
},
132+
None => return Poll::Ready(None),
121133
}
122134
}
123135
}

linkerd/proxy/discover/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,15 @@ where
4343
T: fmt::Display,
4444
R: Resolve<T> + Send + Clone + 'static,
4545
R::Error: Into<Error>,
46-
R::Endpoint: fmt::Debug + Clone + PartialEq + Send,
46+
R::Endpoint: fmt::Debug + Clone + PartialEq + Send + 'static,
4747
R::Resolution: Send + 'static,
4848
R::Future: Send + 'static,
4949
M: tower::Service<R::Endpoint> + Clone + Send + 'static,
5050
M::Error: Into<Error>,
5151
M::Response: Send + 'static,
5252
M::Future: Send + 'static,
5353
{
54-
type Service = Buffer<MakeEndpoint<FromResolve<R>, M>>;
54+
type Service = Buffer<MakeEndpoint<FromResolve<R, R::Endpoint>, M>>;
5555

5656
fn layer(&self, make_endpoint: M) -> Self::Service {
5757
let make_discover =

linkerd/proxy/discover/src/make_endpoint.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -195,18 +195,17 @@ where
195195
// services. Don't process any updates until we can do so.
196196
ready!(this.make_endpoint.poll_ready(cx)).map_err(Into::into)?;
197197

198-
match ready!(this.discover.poll_discover(cx))
199-
.expect("XXX(eliza): can this ever be none???")
200-
.map_err(Into::into)?
201-
{
202-
Change::Insert(key, target) => {
203-
// Start building the service and continue. If a pending
204-
// service exists for this addr, it will be canceled.
205-
let fut = this.make_endpoint.call(target);
206-
this.make_futures.push(key, fut);
207-
}
208-
Change::Remove(key) => {
209-
this.pending_removals.push(key);
198+
if let Some(change) = ready!(this.discover.poll_discover(cx)) {
199+
match change.map_err(Into::into)? {
200+
Change::Insert(key, target) => {
201+
// Start building the service and continue. If a pending
202+
// service exists for this addr, it will be canceled.
203+
let fut = this.make_endpoint.call(target);
204+
this.make_futures.push(key, fut);
205+
}
206+
Change::Remove(key) => {
207+
this.pending_removals.push(key);
208+
}
210209
}
211210
}
212211
}

linkerd/proxy/resolve/src/make_unpin.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use futures::stream::{Stream, TryStream};
12
use futures::TryFuture;
2-
use linkerd2_proxy_core::resolve::{self, Resolution, Update};
3+
use linkerd2_proxy_core::resolve;
34
use pin_project::pin_project;
45
use std::future::Future;
56
use std::pin::Pin;
@@ -38,15 +39,11 @@ impl<T> MakeUnpin<T> {
3839
}
3940
}
4041

41-
impl<T: Resolution> Resolution for MakeUnpin<T> {
42-
type Endpoint = T::Endpoint;
43-
type Error = T::Error;
42+
impl<T: TryStream> Stream for MakeUnpin<T> {
43+
type Item = Result<T::Ok, T::Error>;
4444

45-
fn poll(
46-
self: Pin<&mut Self>,
47-
cx: &mut Context<'_>,
48-
) -> Poll<Result<Update<Self::Endpoint>, Self::Error>> {
49-
self.project().0.as_mut().as_mut().poll(cx)
45+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46+
self.project().0.as_mut().as_mut().try_poll_next(cx)
5047
}
5148
}
5249

0 commit comments

Comments
 (0)