Skip to content

Commit ce97e07

Browse files
authored
implement GEP-1742 timeouts in the policy controller (#10975)
PR #10969 adds support for the GEP-1742 `timeouts` field to the HTTPRoute CRD. This branch implements actual support for these fields in the policy controller. The timeout fields are now read and used to set the timeout fields added to the proxy-api in linkerd/linkerd2-proxy-api#243. In addition, I've added code to ensure that the timeout fields are parsed correctly when a JSON manifest is deserialized. The current implementation represents timeouts in the bindings as a Rust `std::time::Duration` type. `Duration` does implement `serde::Deserialize` and `serde::Serialize`, but its serialization implementation attempts to (de)serialize it as a struct consisting of a number of seconds and a number of subsecond nanoseconds. The timeout fields are instead supposed to be represented as strings in the Go standard library's `time.ParseDuration` format. Therefore, I've added a newtype which wraps the Rust `std::time::Duration` and implements the same parsing logic as Go. Eventually, I'd like to upstream the implementation of this to `kube-rs`; see kube-rs/kube#1222 for details. Depends on #10969 Depends on linkerd/linkerd2-proxy-api#243 Signed-off-by: Eliza Weisman <eliza@buoyant.io>
1 parent 08a1f77 commit ce97e07

File tree

10 files changed

+499
-57
lines changed

10 files changed

+499
-57
lines changed

Cargo.lock

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,6 +1304,7 @@ dependencies = [
13041304
"linkerd-policy-controller-core",
13051305
"linkerd2-proxy-api",
13061306
"maplit",
1307+
"prost-types",
13071308
"tokio",
13081309
"tonic",
13091310
"tracing",
@@ -1394,9 +1395,9 @@ dependencies = [
13941395

13951396
[[package]]
13961397
name = "linkerd2-proxy-api"
1397-
version = "0.9.0"
1398+
version = "0.10.0"
13981399
source = "registry+https://github.com/rust-lang/crates.io-index"
1399-
checksum = "3c5191a6b6a0d97519b4746c09a5e92cb9f586cb808d1828f6d7f9889e9ba24d"
1400+
checksum = "597facef5c3f12aece4d18a5e3dbba88288837b0b5d8276681d063e4c9b98a14"
14001401
dependencies = [
14011402
"http",
14021403
"ipnet",
@@ -1851,19 +1852,19 @@ dependencies = [
18511852

18521853
[[package]]
18531854
name = "prost"
1854-
version = "0.11.8"
1855+
version = "0.11.9"
18551856
source = "registry+https://github.com/rust-lang/crates.io-index"
1856-
checksum = "e48e50df39172a3e7eb17e14642445da64996989bc212b583015435d39a58537"
1857+
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
18571858
dependencies = [
18581859
"bytes",
18591860
"prost-derive",
18601861
]
18611862

18621863
[[package]]
18631864
name = "prost-derive"
1864-
version = "0.11.8"
1865+
version = "0.11.9"
18651866
source = "registry+https://github.com/rust-lang/crates.io-index"
1866-
checksum = "4ea9b0f8cbe5e15a8a042d030bd96668db28ecb567ec37d691971ff5731d2b1b"
1867+
checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
18671868
dependencies = [
18681869
"anyhow",
18691870
"itertools",
@@ -1874,9 +1875,9 @@ dependencies = [
18741875

18751876
[[package]]
18761877
name = "prost-types"
1877-
version = "0.11.8"
1878+
version = "0.11.9"
18781879
source = "registry+https://github.com/rust-lang/crates.io-index"
1879-
checksum = "379119666929a1afd7a043aa6cf96fa67a6dce9af60c88095a4686dbce4c9c88"
1880+
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
18801881
dependencies = [
18811882
"prost",
18821883
]

policy-controller/core/src/outbound.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ pub struct HttpRoute {
4242
pub struct HttpRouteRule {
4343
pub matches: Vec<HttpRouteMatch>,
4444
pub backends: Vec<Backend>,
45+
pub request_timeout: Option<time::Duration>,
46+
pub backend_request_timeout: Option<time::Duration>,
4547
}
4648

4749
#[derive(Clone, Debug, PartialEq, Eq)]

policy-controller/grpc/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@ hyper = { version = "0.14", features = ["http2", "server", "tcp"] }
1414
futures = { version = "0.3", default-features = false }
1515
linkerd-policy-controller-core = { path = "../core" }
1616
maplit = "1"
17+
prost-types = "0.11.9"
1718
tokio = { version = "1", features = ["macros"] }
1819
tonic = { version = "0.8", default-features = false }
1920
tracing = "0.1"
2021

2122
[dependencies.linkerd2-proxy-api]
22-
version = "0.9"
23+
version = "0.10"
2324
features = [
2425
"inbound",
2526
"outbound",

policy-controller/grpc/src/outbound.rs

Lines changed: 63 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -237,20 +237,14 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy {
237237
outbound::failure_accrual::ConsecutiveFailures {
238238
max_failures,
239239
backoff: Some(outbound::ExponentialBackoff {
240-
min_backoff: backoff
241-
.min_penalty
242-
.try_into()
243-
.map_err(|error| {
244-
tracing::error!(?error, "invalid min_backoff")
245-
})
246-
.ok(),
247-
max_backoff: backoff
248-
.max_penalty
249-
.try_into()
250-
.map_err(|error| {
251-
tracing::error!(?error, "invalid max_backoff")
252-
})
253-
.ok(),
240+
min_backoff: convert_duration(
241+
"min_backoff",
242+
backoff.min_penalty,
243+
),
244+
max_backoff: convert_duration(
245+
"max_backoff",
246+
backoff.max_penalty,
247+
),
254248
jitter_ratio: backoff.jitter,
255249
}),
256250
},
@@ -324,31 +318,43 @@ fn convert_outbound_http_route(
324318

325319
let rules = rules
326320
.into_iter()
327-
.map(|HttpRouteRule { matches, backends }| {
328-
let backends = backends
329-
.into_iter()
330-
.map(convert_http_backend)
331-
.collect::<Vec<_>>();
332-
let dist = if backends.is_empty() {
333-
outbound::http_route::distribution::Kind::FirstAvailable(
334-
outbound::http_route::distribution::FirstAvailable {
335-
backends: vec![outbound::http_route::RouteBackend {
336-
backend: Some(backend.clone()),
337-
filters: vec![],
338-
}],
339-
},
340-
)
341-
} else {
342-
outbound::http_route::distribution::Kind::RandomAvailable(
343-
outbound::http_route::distribution::RandomAvailable { backends },
344-
)
345-
};
346-
outbound::http_route::Rule {
347-
matches: matches.into_iter().map(http_route::convert_match).collect(),
348-
backends: Some(outbound::http_route::Distribution { kind: Some(dist) }),
349-
filters: Default::default(),
350-
}
351-
})
321+
.map(
322+
|HttpRouteRule {
323+
matches,
324+
backends,
325+
request_timeout,
326+
backend_request_timeout,
327+
}| {
328+
let backend_request_timeout = backend_request_timeout
329+
.and_then(|d| convert_duration("backend request_timeout", d));
330+
let backends = backends
331+
.into_iter()
332+
.map(|backend| convert_http_backend(backend_request_timeout.clone(), backend))
333+
.collect::<Vec<_>>();
334+
let dist = if backends.is_empty() {
335+
outbound::http_route::distribution::Kind::FirstAvailable(
336+
outbound::http_route::distribution::FirstAvailable {
337+
backends: vec![outbound::http_route::RouteBackend {
338+
backend: Some(backend.clone()),
339+
filters: vec![],
340+
request_timeout: backend_request_timeout,
341+
}],
342+
},
343+
)
344+
} else {
345+
outbound::http_route::distribution::Kind::RandomAvailable(
346+
outbound::http_route::distribution::RandomAvailable { backends },
347+
)
348+
};
349+
outbound::http_route::Rule {
350+
matches: matches.into_iter().map(http_route::convert_match).collect(),
351+
backends: Some(outbound::http_route::Distribution { kind: Some(dist) }),
352+
filters: Default::default(),
353+
request_timeout: request_timeout
354+
.and_then(|d| convert_duration("request timeout", d)),
355+
}
356+
},
357+
)
352358
.collect();
353359

354360
outbound::HttpRoute {
@@ -358,7 +364,10 @@ fn convert_outbound_http_route(
358364
}
359365
}
360366

361-
fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRouteBackend {
367+
fn convert_http_backend(
368+
request_timeout: Option<prost_types::Duration>,
369+
backend: Backend,
370+
) -> outbound::http_route::WeightedRouteBackend {
362371
match backend {
363372
Backend::Addr(addr) => {
364373
let socket_addr = SocketAddr::new(addr.addr, addr.port.get());
@@ -377,6 +386,7 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute
377386
)),
378387
}),
379388
filters: Default::default(),
389+
request_timeout,
380390
}),
381391
}
382392
}
@@ -409,6 +419,7 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute
409419
)),
410420
}),
411421
filters: Default::default(),
422+
request_timeout,
412423
}),
413424
},
414425
Backend::Invalid { weight, message } => outbound::http_route::WeightedRouteBackend {
@@ -430,6 +441,7 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute
430441
},
431442
)),
432443
}],
444+
request_timeout,
433445
}),
434446
},
435447
}
@@ -473,11 +485,13 @@ fn default_outbound_http_route(backend: outbound::Backend) -> outbound::HttpRout
473485
backends: vec![outbound::http_route::RouteBackend {
474486
backend: Some(backend),
475487
filters: vec![],
488+
request_timeout: None,
476489
}],
477490
},
478491
)),
479492
}),
480493
filters: Default::default(),
494+
request_timeout: None,
481495
}];
482496
outbound::HttpRoute {
483497
metadata,
@@ -529,3 +543,12 @@ fn default_queue_config() -> outbound::Queue {
529543
),
530544
}
531545
}
546+
547+
fn convert_duration(name: &'static str, duration: time::Duration) -> Option<prost_types::Duration> {
548+
duration
549+
.try_into()
550+
.map_err(|error| {
551+
tracing::error!(%error, "Invalid {name} duration");
552+
})
553+
.ok()
554+
}

0 commit comments

Comments
 (0)