Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions linkerd/app/core/src/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ pub struct Config {

pub struct Dns {
pub resolver: Resolver,
pub task: Task,
}

// === impl Config ===

impl Config {
pub fn build(self) -> Dns {
let (resolver, task) =
let resolver =
Resolver::from_system_config_with(&self).expect("system DNS config must be valid");
Dns { resolver, task }
Dns { resolver }
}
}

Expand Down
6 changes: 0 additions & 6 deletions linkerd/app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ pub struct Config {
pub struct App {
admin: admin::Admin,
drain: drain::Signal,
dns: dns::Task,
dst: ControlAddr,
identity: identity::Identity,
inbound_addr: SocketAddr,
Expand Down Expand Up @@ -223,7 +222,6 @@ impl Config {
admin,
dst: dst_addr,
drain: drain_tx,
dns: dns.task,
identity,
inbound_addr,
oc_collector,
Expand Down Expand Up @@ -283,7 +281,6 @@ impl App {
let App {
admin,
drain,
dns,
identity,
oc_collector,
start_proxy,
Expand Down Expand Up @@ -340,9 +337,6 @@ impl App {
admin.latch.release()
}

// Spawn the DNS resolver background task.
tokio::spawn(dns.instrument(info_span!("dns")));

if let tap::Tap::Enabled {
registry, serve, ..
} = tap
Expand Down
93 changes: 18 additions & 75 deletions linkerd/dns/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@ pub use linkerd2_dns_name::{InvalidName, Name, Suffix};
use std::future::Future;
use std::pin::Pin;
use std::{fmt, net};
use tokio::sync::{mpsc, oneshot};
use tracing::{info_span, trace, Span};
use tracing::{info_span, trace};
use tracing_futures::Instrument;
pub use trust_dns_resolver::config::ResolverOpts;
pub use trust_dns_resolver::error::{ResolveError, ResolveErrorKind};
use trust_dns_resolver::lookup_ip::LookupIp;
use trust_dns_resolver::{config::ResolverConfig, system_conf, AsyncResolver};
use trust_dns_resolver::{
config::ResolverConfig, lookup_ip::LookupIp, system_conf, AsyncResolver, TokioAsyncResolver,
};
pub use trust_dns_resolver::{
config::ResolverOpts,
error::{ResolveError, ResolveErrorKind},
};

#[derive(Clone)]
pub struct Resolver {
tx: mpsc::UnboundedSender<ResolveRequest>,
dns: TokioAsyncResolver,
}

pub trait ConfigureResolver {
Expand All @@ -28,19 +30,10 @@ pub trait ConfigureResolver {
pub enum Error {
NoAddressesFound,
ResolutionFailed(ResolveError),
TaskLost,
}

pub type Task = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

pub type IpAddrFuture = Pin<Box<dyn Future<Output = Result<net::IpAddr, Error>> + Send + 'static>>;

struct ResolveRequest {
name: Name,
result_tx: oneshot::Sender<Result<LookupIp, ResolveError>>,
span: tracing::Span,
}

impl Resolver {
/// Construct a new `Resolver` from environment variables and system
/// configuration.
Expand All @@ -51,61 +44,24 @@ impl Resolver {
/// could not be parsed.
///
/// TODO: This should be infallible like it is in the `domain` crate.
pub fn from_system_config_with<C: ConfigureResolver>(
c: &C,
) -> Result<(Self, Task), ResolveError> {
pub fn from_system_config_with<C: ConfigureResolver>(c: &C) -> Result<Self, ResolveError> {
let (config, mut opts) = system_conf::read_system_conf()?;
c.configure_resolver(&mut opts);
trace!("DNS config: {:?}", &config);
trace!("DNS opts: {:?}", &opts);
Self::new(config, opts)
Ok(Self::new(config, opts))
}

pub fn new(
config: ResolverConfig,
mut opts: ResolverOpts,
) -> Result<(Self, Task), ResolveError> {
pub fn new(config: ResolverConfig, mut opts: ResolverOpts) -> Self {
// Disable Trust-DNS's caching.
opts.cache_size = 0;

// XXX(eliza): figure out an appropriate bound for the channel...
let (tx, mut rx) = mpsc::unbounded_channel();
let task = Box::pin(async move {
let resolver = match AsyncResolver::tokio(config, opts) {
Ok(resolver) => resolver,
Err(e) => unreachable!("constructing resolver should not fail: {}", e),
};
while let Some(ResolveRequest {
name,
result_tx,
span,
}) = rx.recv().await
{
let resolver = resolver.clone();
tokio::spawn(
async move {
let res = resolver.lookup_ip(name.as_ref()).await;
if result_tx.send(res).is_err() {
tracing::debug!("resolution canceled");
}
}
.instrument(span),
);
}
tracing::debug!("all resolver handles dropped; terminating.");
});
Ok((Resolver { tx }, task))
let dns = AsyncResolver::tokio(config, opts).expect("Resolver must be valid");
Resolver { dns }
}

async fn lookup_ip(&self, name: Name, span: Span) -> Result<LookupIp, Error> {
let (result_tx, rx) = oneshot::channel();
self.tx.send(ResolveRequest {
name,
result_tx,
span,
})?;
let ips = rx.await??;
Ok(ips)
async fn lookup_ip(&self, name: Name) -> Result<LookupIp, Error> {
let lookup = self.dns.lookup_ip(name.as_ref()).await?;
Ok(lookup)
}

pub fn resolve_one_ip(
Expand All @@ -116,7 +72,7 @@ impl Resolver {
let resolver = self.clone();
Box::pin(async move {
let span = info_span!("resolve_one_ip", %name);
let ips = resolver.lookup_ip(name, span).await?;
let ips = resolver.lookup_ip(name).instrument(span).await?;
ips.iter().next().ok_or_else(|| Error::NoAddressesFound)
})
}
Expand All @@ -137,18 +93,6 @@ impl fmt::Debug for Resolver {
}
}

impl<T> From<mpsc::error::SendError<T>> for Error {
fn from(_: mpsc::error::SendError<T>) -> Self {
Self::TaskLost
}
}

impl From<oneshot::error::RecvError> for Error {
fn from(_: oneshot::error::RecvError) -> Self {
Self::TaskLost
}
}

impl From<ResolveError> for Error {
fn from(e: ResolveError) -> Self {
Self::ResolutionFailed(e)
Expand All @@ -160,7 +104,6 @@ impl fmt::Display for Error {
match self {
Self::NoAddressesFound => f.pad("no addresses found"),
Self::ResolutionFailed(e) => fmt::Display::fmt(e, f),
Self::TaskLost => f.pad("background task terminated unexpectedly"),
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions linkerd/dns/src/refine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::net::IpAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
use tracing_futures::Instrument;
use trust_dns_resolver::lookup_ip::LookupIp;

/// A `MakeService` that produces a `Refine` for a given name.
Expand Down Expand Up @@ -54,12 +55,11 @@ impl tower::Service<()> for Refine {
loop {
self.state = match self.state {
State::Init => {
let resolver = self.resolver.clone();
let name = self.name.clone();
let span = tracing::Span::current();
State::Pending(Box::pin(
async move { resolver.lookup_ip(name, span).await },
))
let dns = self.resolver.clone();
State::Pending(Box::pin(async move {
dns.lookup_ip(name).in_current_span().await
}))
}
State::Pending(ref mut fut) => {
let lookup = ready!(fut.as_mut().poll(cx))?;
Expand Down