@@ -3,10 +3,12 @@ use crate::core::resolve::{self, Update};
33use crate :: metadata:: Metadata ;
44use crate :: pb;
55use api:: destination_client:: DestinationClient ;
6+ use async_stream:: try_stream;
7+ use futures:: future;
8+ use futures:: stream:: StreamExt ;
69use futures:: Stream ;
710use http_body:: Body as HttpBody ;
811use std:: error:: Error ;
9- use std:: future:: Future ;
1012use std:: pin:: Pin ;
1113use std:: task:: { Context , Poll } ;
1214use tonic:: {
@@ -16,9 +18,6 @@ use tonic::{
1618} ;
1719use tower:: Service ;
1820use tracing:: { debug, info, trace} ;
19- use async_stream:: try_stream;
20- use futures:: pin_mut;
21- use futures:: stream:: StreamExt ;
2221
2322#[ derive( Clone ) ]
2423pub struct Resolve < S > {
6160 }
6261}
6362
64-
65- type UpdatesStream = Pin < Box < dyn Stream < Item = Result < Update < Metadata > , grpc:: Status > > + Send + ' static > > ;
63+ type UpdatesStream =
64+ Pin < Box < dyn Stream < Item = Result < Update < Metadata > , grpc:: Status > > + Send + ' static > > ;
6665
6766impl < T , S > Service < T > for Resolve < S >
6867where
7675{
7776 type Response = resolve:: FromStream < UpdatesStream > ;
7877 type Error = grpc:: Status ;
79- type Future =
80- Pin < Box < dyn Future < Output = Result < Self :: Response , Self :: Error > > + Send + ' static > > ;
78+ type Future = futures:: future:: Ready < Result < Self :: Response , Self :: Error > > ;
8179
8280 fn poll_ready ( & mut self , _: & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > {
8381 // The future returned by the Tonic generated `DestinationClient`'s `get` method will drive the service to readiness before calling it, so we can always return `Ready` here.
@@ -87,28 +85,37 @@ where
8785 fn call ( & mut self , target : T ) -> Self :: Future {
8886 let path = target. to_string ( ) ;
8987 debug ! ( dst = %path, context = %self . context_token, "Resolving" ) ;
90- let mut svc = self . service . clone ( ) ;
9188 let req = api:: GetDestination {
9289 path,
9390 scheme : self . scheme . clone ( ) ,
9491 context_token : self . context_token . clone ( ) ,
9592 } ;
9693
97- Box :: pin ( async move {
98- let rsp = svc. get ( grpc:: Request :: new ( req) ) . await ?;
99- trace ! ( metadata = ?rsp. metadata( ) ) ;
100- Ok ( resolve:: from_stream :: < UpdatesStream > ( Box :: pin ( resolution ( rsp. into_inner ( ) ) ) ) )
101- } )
94+ future:: ok ( resolve:: from_stream :: < UpdatesStream > ( Box :: pin ( resolution (
95+ self . service . clone ( ) ,
96+ req,
97+ ) ) ) )
10298 }
10399}
104100
105-
106- fn resolution < S : Stream < Item = Result < api:: Update , grpc:: Status > > + Send + Sync + ' static > ( input : S )
107- -> impl Stream < Item = Result < resolve:: Update < Metadata > , grpc:: Status > >
101+ fn resolution < S > (
102+ mut client : DestinationClient < S > ,
103+ req : api:: GetDestination ,
104+ ) -> impl Stream < Item = Result < resolve:: Update < Metadata > , grpc:: Status > >
105+ where
106+ S : GrpcService < BoxBody > + Clone + Send + ' static ,
107+ S :: Error : Into < Box < dyn Error + Send + Sync > > + Send ,
108+ S :: ResponseBody : Send ,
109+ <S :: ResponseBody as Body >:: Data : Send ,
110+ <S :: ResponseBody as HttpBody >:: Error : Into < Box < dyn Error + Send + Sync + ' static > > + Send ,
111+ S :: Future : Send ,
108112{
109113 try_stream ! {
110- pin_mut!( input) ;
111- while let Some ( update) = input. next( ) . await {
114+ let rsp = client. get( grpc:: Request :: new( req) ) . await ?;
115+ trace!( metadata = ?rsp. metadata( ) ) ;
116+ let mut stream = rsp. into_inner( ) ;
117+
118+ while let Some ( update) = stream. next( ) . await {
112119 match update?. update {
113120 Some ( api:: update:: Update :: Add ( api:: WeightedAddrSet {
114121 addrs,
0 commit comments