diff --git a/linkerd/app/core/src/control.rs b/linkerd/app/core/src/control.rs index 51e589dc09..3a739f5a0a 100644 --- a/linkerd/app/core/src/control.rs +++ b/linkerd/app/core/src/control.rs @@ -2,12 +2,10 @@ use crate::{ classify, config, dns, identity, metrics, proxy::http, svc, tls, transport::ConnectTcp, Addr, Error, }; -use futures::future::Either; use linkerd_metrics::prom; use std::fmt; use tokio::time; -use tokio_stream::{wrappers::IntervalStream, StreamExt}; -use tracing::{info_span, warn}; +use tracing::info_span; #[derive(Clone, Debug)] pub struct Config { @@ -106,26 +104,6 @@ impl Config { let addr = self.addr; tracing::trace!(%addr, "Building"); - // When a DNS resolution fails, log the error and use the TTL, if there - // is one, to drive re-resolution attempts. - let resolve_backoff = { - let backoff = self.connect.backoff; - move |error: Error| { - warn!(error, "Failed to resolve control-plane component"); - if let Some(e) = crate::errors::cause_ref::(&*error) { - if let Some(ttl) = e.negative_ttl() { - return Ok::<_, Error>(Either::Left( - IntervalStream::new(time::interval(ttl)).map(|_| ()), - )); - } - } - - // If the error didn't give us a TTL, use the default jittered - // backoff. - Ok(Either::Right(backoff.stream())) - } - }; - let client = svc::stack(ConnectTcp::new( self.connect.keepalive, self.connect.user_timeout, @@ -151,7 +129,11 @@ impl Config { let balance = endpoint .lift_new() - .push(self::balance::layer(metrics.balance, dns, resolve_backoff)) + .push(self::balance::layer( + metrics.balance, + dns, + self.connect.backoff, + )) .push(legacy_metrics.to_layer::()) .push(classify::NewClassify::layer_default()); @@ -251,20 +233,24 @@ mod balance { proxy::{dns_resolve::DnsResolve, http, resolve::recover}, svc, tls, }; + use futures::Stream; + use linkerd_error::Recover; + use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}; use linkerd_stack::ExtractParam; - use std::net::SocketAddr; + use std::{net::SocketAddr, pin::Pin, task}; + use tokio_stream::wrappers::IntervalStream; pub(super) type Metrics = http::balance::MetricFamilies; - pub fn layer( + pub(super) type Resolve = recover::Resolve; + + pub fn layer( metrics: Metrics, dns: dns::Resolver, - recover: R, - ) -> impl svc::Layer< - N, - Service = http::NewBalance, NewIntoTarget>, - > { - let resolve = recover::Resolve::new(recover, DnsResolve::new(dns)); + backoff: ExponentialBackoff, + ) -> impl svc::Layer>> { + let resolve = recover::Resolve::new(ResolveRecover::new(backoff), DnsResolve::new(dns)); + svc::layer::mk(move |inner| { http::NewBalance::new( NewIntoTarget { inner }, @@ -293,6 +279,92 @@ mod balance { addr: String, } + /// A [`Recover`] implementation that respects DNS resolution errors. + /// + /// When a DNS resolution fails, this will log the error and use the TTL, if there is one, + /// to drive re-resolution attempts. It defaults to an exponential backoff with jitter for + /// other errors. + #[derive(Clone)] + pub(super) struct ResolveRecover { + backoff: ExponentialBackoff, + } + + /// A [`Stream`] used for control-plane client's error recovery. + #[pin_project::pin_project(project = ResolveBackoffProj)] + pub(super) enum ResolveBackoff { + /// A DNS-resolution error occurred. + /// + /// This will backoff at a regular interval according to the negative TTL. + NegativeTtl(#[pin] IntervalStream), + /// A jittered exponential backoff. + ExponentialBackoff(#[pin] ExponentialBackoffStream), + } + + // === impl ResolveRecover === + + impl ResolveRecover { + /// Returns a new [`ResolveRecover`]. + pub fn new(config: ExponentialBackoff) -> Self { + Self { backoff: config } + } + } + + impl Recover for ResolveRecover { + type Backoff = ResolveBackoff; + fn recover( + &self, + error: linkerd_error::Error, + ) -> Result { + let ResolveRecover { backoff } = self; + + tracing::warn!(error, "Failed to resolve control-plane component"); + + // If we are recovering due to a DNS resolution error, check for a negative TTL. + if let Some(e) = crate::errors::cause_ref::(&*error) { + if let Some(ttl) = e.negative_ttl() { + let interval = tokio::time::interval(ttl); + let stream = IntervalStream::new(interval); + return Ok(ResolveBackoff::NegativeTtl(stream)); + } + } + + // If the error didn't give us a TTL, use the default jittered backoff. + Ok(ResolveBackoff::ExponentialBackoff(backoff.stream())) + } + } + + // === impl ResolveBackoff === + + impl Stream for ResolveBackoff { + /// No items are yielded in this stream. + type Item = (); + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll> { + use ResolveBackoffProj::{ExponentialBackoff, NegativeTtl}; + + match self.project() { + NegativeTtl(stream) => { + // We can discard the `Instant`s that IntervalStream yields. + let discard = |opt: Option<_>| opt.map(drop); + stream.poll_next(cx).map(discard) + } + ExponentialBackoff(stream) => stream.poll_next(cx), + } + } + + fn size_hint(&self) -> (usize, Option) { + use ResolveBackoff::{ExponentialBackoff, NegativeTtl}; + + match self { + NegativeTtl(stream) => stream.size_hint(), + ExponentialBackoff(stream) => stream.size_hint(), + } + } + } + // === impl NewIntoTarget === impl> svc::NewService for NewIntoTarget {