Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 45 additions & 17 deletions policy-controller/grpc/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,6 @@ fn to_proto(
let mut http_routes = policy.http_routes.clone().into_iter().collect::<Vec<_>>();

let kind = match &policy.app_protocol {
Some(AppProtocol::Opaque) => {
outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque {
routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)],
})
}
Some(AppProtocol::Http1) => {
http_routes.sort_by(timestamp_then_name);
http::http1_only_protocol(
Expand All @@ -412,23 +407,56 @@ fn to_proto(
)
}
Some(AppProtocol::Http2) => {
http_routes.sort_by(timestamp_then_name);
http::http2_only_protocol(
backend,
http_routes.into_iter(),
accrual,
policy.http_retry.clone(),
policy.timeouts.clone(),
allow_l5d_request_headers,
&policy.parent_info,
original_dst,
)
let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::<Vec<_>>();

if !grpc_routes.is_empty() {
grpc_routes.sort_by(timestamp_then_name);
grpc::protocol(
backend,
grpc_routes.into_iter(),
accrual,
policy.grpc_retry.clone(),
policy.timeouts.clone(),
allow_l5d_request_headers,
&policy.parent_info,
original_dst,
)
} else {
http_routes.sort_by(timestamp_then_name);
http::http2_only_protocol(
backend,
http_routes.into_iter(),
accrual,
policy.http_retry.clone(),
policy.timeouts.clone(),
allow_l5d_request_headers,
&policy.parent_info,
original_dst,
)
}
}
None | Some(AppProtocol::Unknown(_)) => {
Some(AppProtocol::Opaque) | Some(AppProtocol::Unknown(_)) => {
if let Some(AppProtocol::Unknown(protocol)) = &policy.app_protocol {
tracing::debug!(resource = ?policy.parent_info, port = policy.port.get(), "Unknown appProtocol \"{protocol}\"");
}

let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::<Vec<_>>();

if !tcp_routes.is_empty() {
tcp_routes.sort_by(timestamp_then_name);
tcp::protocol(
backend,
tcp_routes.into_iter(),
&policy.parent_info,
original_dst,
)
} else {
outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque {
routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)],
})
}
}
None => {
let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::<Vec<_>>();
let mut tls_routes = policy.tls_routes.clone().into_iter().collect::<Vec<_>>();
let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::<Vec<_>>();
Expand Down
232 changes: 226 additions & 6 deletions policy-test/tests/outbound_api_app_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use futures::StreamExt;
use linkerd_policy_controller_k8s_api as k8s;
use linkerd_policy_controller_k8s_api::gateway;
use linkerd_policy_test::{
assert_resource_meta, create,
assert_resource_meta, await_route_accepted, create,
outbound_api::{
assert_route_is_default, assert_singleton, http1_routes, http2_routes,
assert_route_is_default, assert_singleton, grpc_routes, http1_routes, http2_routes,
retry_watch_outbound_policy,
},
test_route::TestParent,
test_route::{TestParent, TestRoute},
with_temp_ns,
};

Expand All @@ -21,7 +21,6 @@ async fn opaque_parent() {
with_temp_ns(|client, ns| async move {
let port = 4191;
// Create a parent with no routes.
// let parent = P::create_parent(&client.clone(), &ns).await;
let parent = create(
&client,
P::make_parent_with_protocol(&ns, Some("linkerd.io/opaque".to_string())),
Expand All @@ -48,6 +47,91 @@ async fn opaque_parent() {
test::<k8s::Service>().await;
}

#[cfg(feature = "gateway-api-experimental")]
#[tokio::test(flavor = "current_thread")]
async fn unknown_app_protocol_parent() {
async fn test<P: TestParent>() {
tracing::debug!(
parent = %P::kind(&P::DynamicType::default()),
);
with_temp_ns(|client, ns| async move {
let port = 4191;
// Create a parent with no routes.
let parent = create(
&client,
P::make_parent_with_protocol(&ns, Some("XMPP".to_string())),
)
.await;

let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);

assert_resource_meta(&config.metadata, parent.obj_ref(), port);

let routes = linkerd_policy_test::outbound_api::tcp_routes(&config);
let route = assert_singleton(routes);
assert_route_is_default::<gateway::TCPRoute>(route, &parent.obj_ref(), port);
})
.await;
}

test::<k8s::Service>().await;
}

#[cfg(feature = "gateway-api-experimental")]
#[tokio::test(flavor = "current_thread")]
async fn opaque_parent_with_tcp_route() {
async fn test<P: TestParent>() {
tracing::debug!(
parent = %P::kind(&P::DynamicType::default()),
);
with_temp_ns(|client, ns| async move {
let port = 4191;
// Create a parent with TCPRoute.
let parent = create(
&client,
P::make_parent_with_protocol(&ns, Some("linkerd.io/opaque".to_string())),
)
.await;

let route = create(
&client,
gateway::TCPRoute::make_route(
ns.clone(),
vec![parent.obj_ref()],
vec![vec![parent.backend_ref(port)]],
),
)
.await;
await_route_accepted(&client, &route).await;

let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);

assert_resource_meta(&config.metadata, parent.obj_ref(), port);

gateway::TCPRoute::routes(&config, |routes| {
// Only the first TCPRoute should be returned in the config.
assert!(route.meta_eq(gateway::TCPRoute::extract_meta(&routes[0])));
assert_eq!(routes.len(), 1);
});
})
.await;
}

test::<k8s::Service>().await;
}

#[tokio::test(flavor = "current_thread")]
async fn http1_parent() {
async fn test<P: TestParent>() {
Expand All @@ -57,7 +141,6 @@ async fn http1_parent() {
with_temp_ns(|client, ns| async move {
let port = 4191;
// Create a parent with no routes.
// let parent = P::create_parent(&client.clone(), &ns).await;
let parent = create(
&client,
P::make_parent_with_protocol(&ns, Some("http".to_string())),
Expand All @@ -84,6 +167,52 @@ async fn http1_parent() {
test::<k8s::Service>().await;
}

#[tokio::test(flavor = "current_thread")]
async fn http1_parent_with_http_route() {
async fn test<P: TestParent>() {
tracing::debug!(
parent = %P::kind(&P::DynamicType::default()),
);
with_temp_ns(|client, ns| async move {
let port = 4191;
// Create a parent with HTTPRoute.
let parent = create(
&client,
P::make_parent_with_protocol(&ns, Some("http".to_string())),
)
.await;

let route = create(
&client,
gateway::HTTPRoute::make_route(
ns.clone(),
vec![parent.obj_ref()],
vec![vec![parent.backend_ref(port)]],
),
)
.await;
await_route_accepted(&client, &route).await;

let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);

assert_resource_meta(&config.metadata, parent.obj_ref(), port);

let routes = http1_routes(&config);
let outbound_route = assert_singleton(routes);
assert!(route.meta_eq(gateway::HTTPRoute::extract_meta(outbound_route)));
})
.await;
}

test::<k8s::Service>().await;
}

#[tokio::test(flavor = "current_thread")]
async fn http2_parent() {
async fn test<P: TestParent>() {
Expand All @@ -93,7 +222,6 @@ async fn http2_parent() {
with_temp_ns(|client, ns| async move {
let port = 4191;
// Create a parent with no routes.
// let parent = P::create_parent(&client.clone(), &ns).await;
let parent = create(
&client,
P::make_parent_with_protocol(&ns, Some("kubernetes.io/h2c".to_string())),
Expand All @@ -119,3 +247,95 @@ async fn http2_parent() {

test::<k8s::Service>().await;
}

#[tokio::test(flavor = "current_thread")]
async fn http2_parent_with_http_route() {
async fn test<P: TestParent>() {
tracing::debug!(
parent = %P::kind(&P::DynamicType::default()),
);
with_temp_ns(|client, ns| async move {
let port = 4191;
// Create a parent with HTTPRoute.
let parent = create(
&client,
P::make_parent_with_protocol(&ns, Some("kubernetes.io/h2c".to_string())),
)
.await;

let route = create(
&client,
gateway::HTTPRoute::make_route(
ns.clone(),
vec![parent.obj_ref()],
vec![vec![parent.backend_ref(port)]],
),
)
.await;
await_route_accepted(&client, &route).await;

let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);

assert_resource_meta(&config.metadata, parent.obj_ref(), port);

let routes = http2_routes(&config);
let outbound_route = assert_singleton(routes);
assert!(route.meta_eq(gateway::HTTPRoute::extract_meta(outbound_route)));
})
.await;
}

test::<k8s::Service>().await;
}

#[tokio::test(flavor = "current_thread")]
async fn http2_parent_with_grpc_route() {
async fn test<P: TestParent>() {
tracing::debug!(
parent = %P::kind(&P::DynamicType::default()),
);
with_temp_ns(|client, ns| async move {
let port = 4191;
// Create a parent with GRPCRoute.
let parent = create(
&client,
P::make_parent_with_protocol(&ns, Some("kubernetes.io/h2c".to_string())),
)
.await;

let route = create(
&client,
gateway::GRPCRoute::make_route(
ns.clone(),
vec![parent.obj_ref()],
vec![vec![parent.backend_ref(port)]],
),
)
.await;
await_route_accepted(&client, &route).await;

let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);

assert_resource_meta(&config.metadata, parent.obj_ref(), port);

let routes = grpc_routes(&config);
let outbound_route = assert_singleton(routes);
assert!(route.meta_eq(gateway::GRPCRoute::extract_meta(outbound_route)));
})
.await;
}

test::<k8s::Service>().await;
}