Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions internal/xds/resolver/cluster_specifier_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ import (
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/balancer/clustermanager"
"google.golang.org/grpc/internal/xds/clusterspecifier"
"google.golang.org/grpc/internal/xds/httpfilter"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/wrapperspb"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
)

func init() {
Expand Down Expand Up @@ -337,3 +340,136 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
}`
verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
}

// TestResolverClusterSpecifierPlugin_WithFilters tests the case where a route
// configuration containing cluster specifier plugins is sent by the management
// server, and HTTP filters are configured. The test verifies that the
// interceptor chain is built for routes matching cluster specifier plugins.
func (s) TestResolverClusterSpecifierPlugin_WithFilters(t *testing.T) {
// Register custom httpFilter builders for the test.
testFilterTypeURL1 := "test-filter-type-url-1" + uuid.New().String()
testFilterTypeURL2 := "test-filter-type-url-2" + uuid.New().String()
newStreamChan := testutils.NewChannelWithSize(2)
fb1 := &testHTTPFilterWithRPCMetadata{
logger: t,
typeURL: testFilterTypeURL1,
newStreamChan: newStreamChan,
}
fb2 := &testHTTPFilterWithRPCMetadata{
logger: t,
typeURL: testFilterTypeURL2,
newStreamChan: newStreamChan,
}
httpfilter.Register(fb1)
httpfilter.Register(fb2)
defer httpfilter.UnregisterForTesting(fb1.typeURL)
defer httpfilter.UnregisterForTesting(fb2.typeURL)

// Spin up an xDS management server.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
nodeID := uuid.New().String()
mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID)

// Configure resources on the management server.
// We need a listener with the filter, and a route with ClusterSpecifierPlugin.
listeners := []*v3listenerpb.Listener{{
Name: defaultTestServiceName,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: defaultTestRouteConfigName,
}},
HttpFilters: []*v3httppb.HttpFilter{
newHTTPFilter(t, "test-filter-1", testFilterTypeURL1, "filter-path-1", ""),
newHTTPFilter(t, "test-filter-2", testFilterTypeURL2, "filter-path-2", ""),
e2e.RouterHTTPFilter,
},
}),
},
}}

routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
})}
// Override the configuration for "test-filter-1" in the route.
routes[0].VirtualHosts[0].Routes[0].TypedPerFilterConfig = map[string]*anypb.Any{
"test-filter-1": newHTTPFilter(t, "test-filter-1", testFilterTypeURL1, "override-path-1", "").GetTypedConfig(),
}
configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, nil, nil)

stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc)

// Wait for an update from the resolver, and verify the service config.
wantSC := `
{
"loadBalancingConfig": [
{
"xds_cluster_manager_experimental": {
"children": {
"cluster_specifier_plugin:cspA": {
"childPolicy": [
{
"csp_experimental": {
"arbitrary_field": "anything"
}
}
]
}
}
}
}
]
}`
Comment on lines +412 to +430
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can we have this whole block indented one tabspace to the left. That way, it will align with the code instead of sticking out.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

cs := verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err != nil {
t.Fatalf("cs.SelectConfig(): %v", err)
}

// Verify that the interceptor is not nil.
if res.Interceptor == nil {
t.Fatal("RPCInfo does not contain interceptors list")
}

newStream := func(context.Context, func()) (iresolver.ClientStream, error) {
return nil, nil
}
Comment on lines +442 to +444
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why significance does this empty function have? Why does it return nil, nil? Can a nil function be passed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we cannot pass nil here. The documentation for the NewStream method in the ClientInterceptor interface explicitly states: The caller must ensure done is non-nil. Passing nil would cause a panic.


if _, err = res.Interceptor.NewStream(ctx, iresolver.RPCInfo{Method: "/service/method", Context: ctx}, func() {}, newStream); err != nil {
t.Fatalf("NewStream() failed with error: %v", err)
}

// Verify that first filter receives the config.
cfg, err := newStreamChan.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for first filter to receive config: %v", err)
}
ofc := cfg.(overallFilterConfig)
if ofc.BasePath != "filter-path-1" {
t.Fatalf("Unexpected base path for first filter, got: %q, want: %q", ofc.BasePath, "filter-path-1")
}
if ofc.OverridePath != "override-path-1" {
t.Fatalf("Unexpected override path for first filter, got: %q, want: %q", ofc.OverridePath, "override-path-1")
}

// Verify that second filter receives the base path.
cfg, err = newStreamChan.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for second filter to receive config: %v", err)
}
ofc = cfg.(overallFilterConfig)
if ofc.BasePath != "filter-path-2" {
t.Fatalf("Unexpected base path for second filter, got: %q, want: %q", ofc.BasePath, "filter-path-2")
}
if ofc.OverridePath != "" {
t.Fatalf("Unexpected override path for second filter, got: %q, want: %q", ofc.OverridePath, "")
}
Comment on lines +463 to +474
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point of having two filters? What extra test coverage does it provide over having a single filter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It verifies that the system can correctly build and execute a chain of multiple interceptors. A single filter would only test that an interceptor works in isolation, not that the chaining mechanism itself is functional. It also ensures that configuration overrides are applied correctly on a per-filter basis.
In this test, test-filter-1 is configured with an override, while test-filter-2 uses its base configuration. This allows us to verify that the override for one filter does not affect the configuration of other filters in the chain.

}
43 changes: 28 additions & 15 deletions internal/xds/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,30 @@ func (r *xdsResolver) sendNewServiceConfig(cs stoppableConfigSelector) bool {
return true
}

// addClusterToRoute creates a client interceptor for the given cluster
// configuration, adds the cluster to the provided WRR picker, and appends the
// interceptor to the list of interceptors for the current route.
func (r *xdsResolver) addClusterToRoute(clusters wrr.WRR, clusterName string, weight int64, interceptors *[]iresolver.ClientInterceptor,
clusterOverride map[string]httpfilter.FilterConfig, routeOverride map[string]httpfilter.FilterConfig) error {
interceptor, err := r.newInterceptor(r.xdsConfig.Listener.APIListener.HTTPFilters, clusterOverride, routeOverride, r.xdsConfig.VirtualHost.HTTPFilterConfigOverride)
if err != nil {
// Clean up any interceptors that were successfully built
// for the current route before this error occurred. Note
// that this is not handled by the call to cs.stop() in the
// deferred function.
for _, i := range *interceptors {
i.Close()
}
return err
}
clusters.Add(&routeCluster{
name: clusterName,
interceptor: interceptor,
}, weight)
*interceptors = append(*interceptors, interceptor)
return nil
}

// newConfigSelector creates a new config selector using the most recently
// received listener and route config updates. May add entries to
// r.activeClusters for previously-unseen clusters.
Expand Down Expand Up @@ -417,29 +441,18 @@ func (r *xdsResolver) newConfigSelector() (_ *configSelector, err error) {
interceptors := []iresolver.ClientInterceptor{}
if rt.ClusterSpecifierPlugin != "" {
clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
clusters.Add(&routeCluster{name: clusterName}, 1)
if err := r.addClusterToRoute(clusters, clusterName, 1, &interceptors, nil, rt.HTTPFilterConfigOverride); err != nil {
return nil, err
}
ci := r.addOrGetActiveClusterInfo(clusterName, "")
ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.xdsConfig.RouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])}
cs.plugins[clusterName] = ci
} else {
for _, wc := range rt.WeightedClusters {
clusterName := clusterPrefix + wc.Name
interceptor, err := r.newInterceptor(r.xdsConfig.Listener.APIListener.HTTPFilters, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride, r.xdsConfig.VirtualHost.HTTPFilterConfigOverride)
if err != nil {
// Clean up any interceptors that were successfully built
// for the current route before this error occurred. Note
// that this is not handled by the call to cs.stop() in the
// deferred function.
for _, i := range interceptors {
i.Close()
}
if err := r.addClusterToRoute(clusters, clusterName, int64(wc.Weight), &interceptors, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride); err != nil {
return nil, err
}
clusters.Add(&routeCluster{
name: clusterName,
interceptor: interceptor,
}, int64(wc.Weight))
interceptors = append(interceptors, interceptor)
ci := r.addOrGetActiveClusterInfo(clusterName, wc.Name)
ci.cfg = xdsChildConfig{ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: wc.Name})}
cs.clusters[clusterName] = ci
Expand Down
Loading