Skip to content

Commit a28ec48

Browse files
committed
Remove infine loop around poll
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
1 parent f3bb641 commit a28ec48

File tree

4 files changed

+5465
-47
lines changed

4 files changed

+5465
-47
lines changed

linkerd/proxy/api-resolve/src/resolve.rs

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -108,50 +108,48 @@ where
108108
S::Future: Send,
109109
{
110110
try_stream! {
111-
let rsp = client.get(grpc::Request::new(req)).await?;
112-
trace!(metadata = ?rsp.metadata());
113-
let mut stream = rsp.into_inner();
114-
loop {
115-
while let Some(update) = stream.next().await {
116-
match update?.update {
117-
Some(api::update::Update::Add(api::WeightedAddrSet {
118-
addrs,
119-
metric_labels,
120-
})) => {
121-
let addr_metas = addrs
122-
.into_iter()
123-
.filter_map(|addr| pb::to_addr_meta(addr, &metric_labels))
124-
.collect::<Vec<_>>();
125-
if !addr_metas.is_empty() {
126-
debug!(endpoints = %addr_metas.len(), "Add");
127-
yield Update::Add(addr_metas);
128-
}
129-
}
130-
131-
Some(api::update::Update::Remove(api::AddrSet { addrs })) => {
132-
let sock_addrs = addrs
133-
.into_iter()
134-
.filter_map(pb::to_sock_addr)
135-
.collect::<Vec<_>>();
136-
if !sock_addrs.is_empty() {
137-
debug!(endpoints = %sock_addrs.len(), "Remove");
138-
yield Update::Remove(sock_addrs);
139-
}
111+
let rsp = client.get(grpc::Request::new(req)).await?;
112+
trace!(metadata = ?rsp.metadata());
113+
let mut stream = rsp.into_inner();
114+
while let Some(update) = stream.next().await {
115+
match update?.update {
116+
Some(api::update::Update::Add(api::WeightedAddrSet {
117+
addrs,
118+
metric_labels,
119+
})) => {
120+
let addr_metas = addrs
121+
.into_iter()
122+
.filter_map(|addr| pb::to_addr_meta(addr, &metric_labels))
123+
.collect::<Vec<_>>();
124+
if !addr_metas.is_empty() {
125+
debug!(endpoints = %addr_metas.len(), "Add");
126+
yield Update::Add(addr_metas);
140127
}
128+
}
141129

142-
Some(api::update::Update::NoEndpoints(api::NoEndpoints { exists })) => {
143-
info!("No endpoints");
144-
let update = if exists {
145-
Update::Empty
146-
} else {
147-
Update::DoesNotExist
148-
};
149-
yield update.into();
130+
Some(api::update::Update::Remove(api::AddrSet { addrs })) => {
131+
let sock_addrs = addrs
132+
.into_iter()
133+
.filter_map(pb::to_sock_addr)
134+
.collect::<Vec<_>>();
135+
if !sock_addrs.is_empty() {
136+
debug!(endpoints = %sock_addrs.len(), "Remove");
137+
yield Update::Remove(sock_addrs);
150138
}
139+
}
151140

152-
None => {} // continue
141+
Some(api::update::Update::NoEndpoints(api::NoEndpoints { exists })) => {
142+
info!("No endpoints");
143+
let update = if exists {
144+
Update::Empty
145+
} else {
146+
Update::DoesNotExist
147+
};
148+
yield update.into();
153149
}
150+
151+
None => {} // continue
154152
}
155153
}
156-
}
154+
}
157155
}

linkerd/proxy/discover/src/make_endpoint.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,10 @@ where
158158
mut self: Pin<&mut Self>,
159159
cx: &mut Context<'_>,
160160
) -> Poll<Option<Result<Change<D::Key, E::Response>, Error>>> {
161-
if let Poll::Ready(key) = self.poll_removals(cx) {
162-
return Poll::Ready(Some(Ok(Change::Remove(key?))));
161+
if let Poll::Ready(result) = self.poll_removals(cx) {
162+
if let Some(key) = result? {
163+
return Poll::Ready(Some(Ok(Change::Remove(key))));
164+
}
163165
}
164166

165167
if let Poll::Ready(Some(res)) = self.project().make_futures.poll_next(cx) {
@@ -182,21 +184,21 @@ where
182184
fn poll_removals(
183185
self: &mut Pin<&mut Self>,
184186
cx: &mut Context<'_>,
185-
) -> Poll<Result<D::Key, Error>> {
187+
) -> Poll<Result<Option<D::Key>, Error>> {
186188
loop {
187189
let mut this = self.as_mut().project();
188190
if let Some(key) = this.pending_removals.pop() {
189191
this.make_futures.remove(&key);
190-
return Poll::Ready(Ok(key));
192+
return Poll::Ready(Ok(Some(key)));
191193
}
192194

193195
// Before polling the resolution, where we could potentially receive
194196
// an `Add`, poll_ready to ensure that `make` is ready to build new
195197
// services. Don't process any updates until we can do so.
196198
ready!(this.make_endpoint.poll_ready(cx)).map_err(Into::into)?;
197199

198-
if let Some(change) = ready!(this.discover.poll_discover(cx)) {
199-
match change.map_err(Into::into)? {
200+
match ready!(this.discover.poll_discover(cx)) {
201+
Some(change) => match change.map_err(Into::into)? {
200202
Change::Insert(key, target) => {
201203
// Start building the service and continue. If a pending
202204
// service exists for this addr, it will be canceled.
@@ -206,7 +208,9 @@ where
206208
Change::Remove(key) => {
207209
this.pending_removals.push(key);
208210
}
209-
}
211+
},
212+
213+
None => return Poll::Ready(Ok(None)),
210214
}
211215
}
212216
}

linkerd/proxy/resolve/src/recover.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,12 @@ where
214214
backoff: None,
215215
}
216216
}
217-
None => return Poll::Ready(None),
217+
None => {
218+
this.inner.state = State::Recover {
219+
error: Some(Eos(()).into()),
220+
backoff: None,
221+
}
222+
}
218223
}
219224
}
220225
// XXX(eliza): note that this match was originally an `if let`,

0 commit comments

Comments
 (0)