-
Notifications
You must be signed in to change notification settings - Fork 292
Move resolve api to async-stream #599
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d53c974
f3bb641
8307876
83a54b4
9d6031a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,11 +3,10 @@ use crate::core::resolve::{self, Update}; | |||||||||||||||||||||
| use crate::metadata::Metadata; | ||||||||||||||||||||||
| use crate::pb; | ||||||||||||||||||||||
| use api::destination_client::DestinationClient; | ||||||||||||||||||||||
| use futures::{ready, Stream}; | ||||||||||||||||||||||
| use async_stream::try_stream; | ||||||||||||||||||||||
| use futures::{future, stream::StreamExt, Stream}; | ||||||||||||||||||||||
| use http_body::Body as HttpBody; | ||||||||||||||||||||||
| use pin_project::pin_project; | ||||||||||||||||||||||
| use std::error::Error; | ||||||||||||||||||||||
| use std::future::Future; | ||||||||||||||||||||||
| use std::pin::Pin; | ||||||||||||||||||||||
| use std::task::{Context, Poll}; | ||||||||||||||||||||||
| use tonic::{ | ||||||||||||||||||||||
|
|
@@ -25,13 +24,7 @@ pub struct Resolve<S> { | |||||||||||||||||||||
| context_token: String, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| #[pin_project] | ||||||||||||||||||||||
| pub struct Resolution { | ||||||||||||||||||||||
| #[pin] | ||||||||||||||||||||||
| inner: grpc::Streaming<api::Update>, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // === impl Resolver === | ||||||||||||||||||||||
| // === impl Resolve === | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| impl<S> Resolve<S> | ||||||||||||||||||||||
| where | ||||||||||||||||||||||
|
|
@@ -65,6 +58,9 @@ where | |||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| type UpdatesStream = | ||||||||||||||||||||||
| Pin<Box<dyn Stream<Item = Result<Update<Metadata>, grpc::Status>> + Send + 'static>>; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| impl<T, S> Service<T> for Resolve<S> | ||||||||||||||||||||||
| where | ||||||||||||||||||||||
| T: ToString, | ||||||||||||||||||||||
|
|
@@ -75,10 +71,9 @@ where | |||||||||||||||||||||
| <S::ResponseBody as HttpBody>::Error: Into<Box<dyn Error + Send + Sync + 'static>> + Send, | ||||||||||||||||||||||
| S::Future: Send, | ||||||||||||||||||||||
| { | ||||||||||||||||||||||
| type Response = Resolution; | ||||||||||||||||||||||
| type Response = UpdatesStream; | ||||||||||||||||||||||
| type Error = grpc::Status; | ||||||||||||||||||||||
| type Future = | ||||||||||||||||||||||
| Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; | ||||||||||||||||||||||
| type Future = futures::future::Ready<Result<Self::Response, Self::Error>>; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||||||||||||||||||||||
| // The future returned by the Tonic generated `DestinationClient`'s `get` method will drive the service to readiness before calling it, so we can always return `Ready` here. | ||||||||||||||||||||||
|
|
@@ -88,69 +83,71 @@ where | |||||||||||||||||||||
| fn call(&mut self, target: T) -> Self::Future { | ||||||||||||||||||||||
| let path = target.to_string(); | ||||||||||||||||||||||
| debug!(dst = %path, context = %self.context_token, "Resolving"); | ||||||||||||||||||||||
| let mut svc = self.service.clone(); | ||||||||||||||||||||||
| let req = api::GetDestination { | ||||||||||||||||||||||
| path, | ||||||||||||||||||||||
| scheme: self.scheme.clone(), | ||||||||||||||||||||||
| context_token: self.context_token.clone(), | ||||||||||||||||||||||
| }; | ||||||||||||||||||||||
| Box::pin(async move { | ||||||||||||||||||||||
| let rsp = svc.get(grpc::Request::new(req)).await?; | ||||||||||||||||||||||
| trace!(metadata = ?rsp.metadata()); | ||||||||||||||||||||||
| Ok(Resolution { | ||||||||||||||||||||||
| inner: rsp.into_inner(), | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| future::ok(Box::pin(resolution(self.service.clone(), req))) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| impl Stream for Resolution { | ||||||||||||||||||||||
| type Item = Result<resolve::Update<Metadata>, grpc::Status>; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||||||||||||||||||||||
| let mut this = self.project(); | ||||||||||||||||||||||
| loop { | ||||||||||||||||||||||
| match ready!(this.inner.as_mut().poll_next(cx)) { | ||||||||||||||||||||||
| Some(update) => match update?.update { | ||||||||||||||||||||||
| Some(api::update::Update::Add(api::WeightedAddrSet { | ||||||||||||||||||||||
| addrs, | ||||||||||||||||||||||
| metric_labels, | ||||||||||||||||||||||
| })) => { | ||||||||||||||||||||||
| let addr_metas = addrs | ||||||||||||||||||||||
| .into_iter() | ||||||||||||||||||||||
| .filter_map(|addr| pb::to_addr_meta(addr, &metric_labels)) | ||||||||||||||||||||||
| .collect::<Vec<_>>(); | ||||||||||||||||||||||
| if !addr_metas.is_empty() { | ||||||||||||||||||||||
| debug!(endpoints = %addr_metas.len(), "Add"); | ||||||||||||||||||||||
| return Poll::Ready(Some(Ok(Update::Add(addr_metas)))); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| fn resolution<S>( | ||||||||||||||||||||||
| mut client: DestinationClient<S>, | ||||||||||||||||||||||
| req: api::GetDestination, | ||||||||||||||||||||||
| ) -> impl Stream<Item = Result<resolve::Update<Metadata>, grpc::Status>> | ||||||||||||||||||||||
| where | ||||||||||||||||||||||
| S: GrpcService<BoxBody> + Clone + Send + 'static, | ||||||||||||||||||||||
| S::Error: Into<Box<dyn Error + Send + Sync>> + Send, | ||||||||||||||||||||||
| S::ResponseBody: Send, | ||||||||||||||||||||||
| <S::ResponseBody as Body>::Data: Send, | ||||||||||||||||||||||
| <S::ResponseBody as HttpBody>::Error: Into<Box<dyn Error + Send + Sync + 'static>> + Send, | ||||||||||||||||||||||
| S::Future: Send, | ||||||||||||||||||||||
| { | ||||||||||||||||||||||
| try_stream! { | ||||||||||||||||||||||
| let rsp = client.get(grpc::Request::new(req)).await?; | ||||||||||||||||||||||
| trace!(metadata = ?rsp.metadata()); | ||||||||||||||||||||||
| let mut stream = rsp.into_inner(); | ||||||||||||||||||||||
| while let Some(update) = stream.next().await { | ||||||||||||||||||||||
| match update?.update { | ||||||||||||||||||||||
| Some(api::update::Update::Add(api::WeightedAddrSet { | ||||||||||||||||||||||
| addrs, | ||||||||||||||||||||||
| metric_labels, | ||||||||||||||||||||||
| })) => { | ||||||||||||||||||||||
| let addr_metas = addrs | ||||||||||||||||||||||
| .into_iter() | ||||||||||||||||||||||
| .filter_map(|addr| pb::to_addr_meta(addr, &metric_labels)) | ||||||||||||||||||||||
| .collect::<Vec<_>>(); | ||||||||||||||||||||||
| if !addr_metas.is_empty() { | ||||||||||||||||||||||
| debug!(endpoints = %addr_metas.len(), "Add"); | ||||||||||||||||||||||
| yield Update::Add(addr_metas); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| Some(api::update::Update::Remove(api::AddrSet { addrs })) => { | ||||||||||||||||||||||
| let sock_addrs = addrs | ||||||||||||||||||||||
| .into_iter() | ||||||||||||||||||||||
| .filter_map(pb::to_sock_addr) | ||||||||||||||||||||||
| .collect::<Vec<_>>(); | ||||||||||||||||||||||
| if !sock_addrs.is_empty() { | ||||||||||||||||||||||
| debug!(endpoints = %sock_addrs.len(), "Remove"); | ||||||||||||||||||||||
| return Poll::Ready(Some(Ok(Update::Remove(sock_addrs)))); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| Some(api::update::Update::Remove(api::AddrSet { addrs })) => { | ||||||||||||||||||||||
| let sock_addrs = addrs | ||||||||||||||||||||||
| .into_iter() | ||||||||||||||||||||||
| .filter_map(pb::to_sock_addr) | ||||||||||||||||||||||
| .collect::<Vec<_>>(); | ||||||||||||||||||||||
| if !sock_addrs.is_empty() { | ||||||||||||||||||||||
| debug!(endpoints = %sock_addrs.len(), "Remove"); | ||||||||||||||||||||||
| yield Update::Remove(sock_addrs); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| Some(api::update::Update::NoEndpoints(api::NoEndpoints { exists })) => { | ||||||||||||||||||||||
| info!("No endpoints"); | ||||||||||||||||||||||
| let update = if exists { | ||||||||||||||||||||||
| Update::Empty | ||||||||||||||||||||||
| } else { | ||||||||||||||||||||||
| Update::DoesNotExist | ||||||||||||||||||||||
| }; | ||||||||||||||||||||||
| return Poll::Ready(Some(Ok(update.into()))); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| Some(api::update::Update::NoEndpoints(api::NoEndpoints { exists })) => { | ||||||||||||||||||||||
| info!("No endpoints"); | ||||||||||||||||||||||
| let update = if exists { | ||||||||||||||||||||||
| Update::Empty | ||||||||||||||||||||||
| } else { | ||||||||||||||||||||||
| Update::DoesNotExist | ||||||||||||||||||||||
| }; | ||||||||||||||||||||||
| yield update.into(); | ||||||||||||||||||||||
|
Comment on lines
+141
to
+146
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit, take it or leave it: can this just be
Suggested change
not a blocker.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I might be unclear on the semantics of yield, but this at least seems like it would always yield DNE after yielding Empty? In either case, the original shows the intent much more clearly imo. |
||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| None => {} // continue | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| None => return Poll::Ready(None), | ||||||||||||||||||||||
| }; | ||||||||||||||||||||||
| None => {} // continue | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -158,8 +158,10 @@ where | |
| mut self: Pin<&mut Self>, | ||
| cx: &mut Context<'_>, | ||
| ) -> Poll<Option<Result<Change<D::Key, E::Response>, Error>>> { | ||
| if let Poll::Ready(key) = self.poll_removals(cx) { | ||
| return Poll::Ready(Some(Ok(Change::Remove(key?)))); | ||
| if let Poll::Ready(result) = self.poll_removals(cx) { | ||
| if let Some(key) = result? { | ||
| return Poll::Ready(Some(Ok(Change::Remove(key)))); | ||
| } | ||
| } | ||
|
|
||
| if let Poll::Ready(Some(res)) = self.project().make_futures.poll_next(cx) { | ||
|
|
@@ -182,21 +184,21 @@ where | |
| fn poll_removals( | ||
| self: &mut Pin<&mut Self>, | ||
| cx: &mut Context<'_>, | ||
| ) -> Poll<Result<D::Key, Error>> { | ||
| ) -> Poll<Result<Option<D::Key>, Error>> { | ||
| loop { | ||
| let mut this = self.as_mut().project(); | ||
| if let Some(key) = this.pending_removals.pop() { | ||
| this.make_futures.remove(&key); | ||
| return Poll::Ready(Ok(key)); | ||
| return Poll::Ready(Ok(Some(key))); | ||
| } | ||
|
|
||
| // Before polling the resolution, where we could potentially receive | ||
| // an `Add`, poll_ready to ensure that `make` is ready to build new | ||
| // services. Don't process any updates until we can do so. | ||
| ready!(this.make_endpoint.poll_ready(cx)).map_err(Into::into)?; | ||
|
|
||
| if let Some(change) = ready!(this.discover.poll_discover(cx)) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @hawkw This is where it was hanging.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ahhh, glad you found that! |
||
| match change.map_err(Into::into)? { | ||
| match ready!(this.discover.poll_discover(cx)) { | ||
| Some(change) => match change.map_err(Into::into)? { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Take it or leave it: we could maybe collapse the two matches into a single match, like this: match ready!(this.discover.poll_discover(cx)) {
Some(Ok(Change::Insert(key, target))) =>
// Start building the service and continue. If a pending
// service exists for this addr, it will be canceled.
let fut = this.make_endpoint.call(target);
this.make_futures.push(key, fut);
},
Some(Ok(Change::Remove(key))) => {
this.pending_removals.push(key);
},
Some(Err(e)) => return Poll::Ready(Err(e.into())),
None => return Poll::Ready(Ok(None)),
}which might be a little easier to read...what do you think? Is this clearer? |
||
| Change::Insert(key, target) => { | ||
| // Start building the service and continue. If a pending | ||
| // service exists for this addr, it will be canceled. | ||
|
|
@@ -206,7 +208,9 @@ where | |
| Change::Remove(key) => { | ||
| this.pending_removals.push(key); | ||
| } | ||
| } | ||
| }, | ||
|
|
||
| None => return Poll::Ready(Ok(None)), | ||
| } | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.