Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 104 additions & 32 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ use crate::{
classify, config, dns, identity, metrics, proxy::http, svc, tls, transport::ConnectTcp, Addr,
Error,
};
use futures::future::Either;
use linkerd_metrics::prom;
use std::fmt;
use tokio::time;
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use tracing::{info_span, warn};
use tracing::info_span;

#[derive(Clone, Debug)]
pub struct Config {
Expand Down Expand Up @@ -106,26 +104,6 @@ impl Config {
let addr = self.addr;
tracing::trace!(%addr, "Building");

// When a DNS resolution fails, log the error and use the TTL, if there
// is one, to drive re-resolution attempts.
let resolve_backoff = {
let backoff = self.connect.backoff;
move |error: Error| {
warn!(error, "Failed to resolve control-plane component");
if let Some(e) = crate::errors::cause_ref::<dns::ResolveError>(&*error) {
if let Some(ttl) = e.negative_ttl() {
return Ok::<_, Error>(Either::Left(
IntervalStream::new(time::interval(ttl)).map(|_| ()),
));
}
}

// If the error didn't give us a TTL, use the default jittered
// backoff.
Ok(Either::Right(backoff.stream()))
}
};

let client = svc::stack(ConnectTcp::new(
self.connect.keepalive,
self.connect.user_timeout,
Expand All @@ -151,7 +129,11 @@ impl Config {

let balance = endpoint
.lift_new()
.push(self::balance::layer(metrics.balance, dns, resolve_backoff))
.push(self::balance::layer(
metrics.balance,
dns,
self.connect.backoff,
))
.push(legacy_metrics.to_layer::<classify::Response, _, _>())
.push(classify::NewClassify::layer_default());

Expand Down Expand Up @@ -251,20 +233,24 @@ mod balance {
proxy::{dns_resolve::DnsResolve, http, resolve::recover},
svc, tls,
};
use futures::Stream;
use linkerd_error::Recover;
use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream};
use linkerd_stack::ExtractParam;
use std::net::SocketAddr;
use std::{net::SocketAddr, pin::Pin, task};
use tokio_stream::wrappers::IntervalStream;

pub(super) type Metrics = http::balance::MetricFamilies<Labels>;

pub fn layer<B, R: Clone, N>(
pub(super) type Resolve = recover::Resolve<ResolveRecover, DnsResolve>;

pub fn layer<B, N>(
metrics: Metrics,
dns: dns::Resolver,
recover: R,
) -> impl svc::Layer<
N,
Service = http::NewBalance<B, Params, recover::Resolve<R, DnsResolve>, NewIntoTarget<N>>,
> {
let resolve = recover::Resolve::new(recover, DnsResolve::new(dns));
backoff: ExponentialBackoff,
) -> impl svc::Layer<N, Service = http::NewBalance<B, Params, Resolve, NewIntoTarget<N>>> {
let resolve = recover::Resolve::new(ResolveRecover::new(backoff), DnsResolve::new(dns));

svc::layer::mk(move |inner| {
http::NewBalance::new(
NewIntoTarget { inner },
Expand Down Expand Up @@ -293,6 +279,92 @@ mod balance {
addr: String,
}

/// A [`Recover`] implementation that respects DNS resolution errors.
///
/// When resolution fails, the error is logged. If the error is caused by a DNS resolution
/// error that carries a negative TTL, that TTL drives re-resolution attempts. All other
/// errors fall back to an exponential backoff with jitter.
#[derive(Clone)]
pub(super) struct ResolveRecover {
/// Backoff configuration used to build the exponential backoff stream when the error
/// provides no negative TTL.
backoff: ExponentialBackoff,
}

/// A [`Stream`] used for the control-plane client's error recovery.
///
/// This is the backoff type returned by [`ResolveRecover`]'s [`Recover`] implementation.
#[pin_project::pin_project(project = ResolveBackoffProj)]
pub(super) enum ResolveBackoff {
/// A DNS-resolution error that carried a negative TTL occurred.
///
/// This backs off at a regular interval whose period is the negative TTL.
NegativeTtl(#[pin] IntervalStream),
/// A jittered exponential backoff, used when the error provides no negative TTL.
ExponentialBackoff(#[pin] ExponentialBackoffStream),
}

// === impl ResolveRecover ===

impl ResolveRecover {
/// Returns a new [`ResolveRecover`].
pub fn new(config: ExponentialBackoff) -> Self {
Self { backoff: config }
}
}

impl Recover for ResolveRecover {
type Backoff = ResolveBackoff;
fn recover(
&self,
error: linkerd_error::Error,
) -> Result<Self::Backoff, linkerd_error::Error> {
let ResolveRecover { backoff } = self;

tracing::warn!(error, "Failed to resolve control-plane component");

// If we are recovering due to a DNS resolution error, check for a negative TTL.
if let Some(e) = crate::errors::cause_ref::<dns::ResolveError>(&*error) {
if let Some(ttl) = e.negative_ttl() {
let interval = tokio::time::interval(ttl);
let stream = IntervalStream::new(interval);
return Ok(ResolveBackoff::NegativeTtl(stream));
}
}
Comment on lines +322 to +329
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is just keeping the existing behavior, but isn't tokio::time::interval() going to fire immediately so we'll retry right away?


// If the error didn't give us a TTL, use the default jittered backoff.
Ok(ResolveBackoff::ExponentialBackoff(backoff.stream()))
}
}

// === impl ResolveBackoff ===

impl Stream for ResolveBackoff {
/// No items are yielded in this stream.
type Item = ();

fn poll_next(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
use ResolveBackoffProj::{ExponentialBackoff, NegativeTtl};

match self.project() {
NegativeTtl(stream) => {
// We can discard the `Instant`s that IntervalStream yields.
let discard = |opt: Option<_>| opt.map(drop);
stream.poll_next(cx).map(discard)
}
ExponentialBackoff(stream) => stream.poll_next(cx),
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
use ResolveBackoff::{ExponentialBackoff, NegativeTtl};

match self {
NegativeTtl(stream) => stream.size_hint(),
ExponentialBackoff(stream) => stream.size_hint(),
}
}
}

// === impl NewIntoTarget ===

impl<N: svc::NewService<ControlAddr>> svc::NewService<ControlAddr> for NewIntoTarget<N> {
Expand Down
Loading