Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions internal/xds/balancer/clusterimpl/tests/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"

v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
Expand All @@ -64,6 +65,7 @@ import (
v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/protobuf/types/known/structpb"

_ "google.golang.org/grpc/xds"
)
Expand Down Expand Up @@ -1124,6 +1126,8 @@ func (s) TestUpdateLRSServerToNil(t *testing.T) {
// Test verifies that child policy was updated on receipt of
// configuration update.
func (s) TestChildPolicyChangeOnConfigUpdate(t *testing.T) {
const customLBPolicy = "test_custom_lb_policy"

// Create an xDS management server.
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer mgmtServer.Stop()
Expand Down Expand Up @@ -1175,21 +1179,21 @@ func (s) TestChildPolicyChangeOnConfigUpdate(t *testing.T) {
t.Fatalf("client.EmptyCall() failed: %v", err)
}

// Register stub pickfirst LB policy so that we can catch config changes.
// Register stub customLBPolicy LB policy so that we can catch config changes.
pfBuilder := balancer.Get(pickfirst.Name)
internal.BalancerUnregister(pfBuilder.Name())
lbCfgCh := make(chan serviceconfig.LoadBalancingConfig, 1)
var updatedChildPolicy atomic.Pointer[string]
stub.Register(pfBuilder.Name(), stub.BalancerFuncs{
var updatedChildPolicy atomic.Value

stub.Register(customLBPolicy, stub.BalancerFuncs{
ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return pfBuilder.(balancer.ConfigParser).ParseConfig(lbCfg)
},
Init: func(bd *stub.BalancerData) {
bd.ChildBalancer = pfBuilder.Build(bd.ClientConn, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
name := pfBuilder.Name()
updatedChildPolicy.Store(&name)
// name := customLBPolicy
updatedChildPolicy.Store(customLBPolicy)
select {
case lbCfgCh <- ccs.BalancerConfig:
case <-ctx.Done():
Expand All @@ -1201,13 +1205,17 @@ func (s) TestChildPolicyChangeOnConfigUpdate(t *testing.T) {
bd.ChildBalancer.Close()
},
})
defer balancer.Register(pfBuilder)

// Now update the cluster to use "pick_first" as the endpoint picking policy.
defer internal.BalancerUnregister(customLBPolicy)

// Update the cluster to use customLBPolicy as the endpoint picking policy.
resources.Clusters[0].LoadBalancingPolicy = &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{{
TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
TypedConfig: testutils.MarshalAny(t, &v3pickfirstpb.PickFirst{}),
TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
TypeUrl: "type.googleapis.com/" + customLBPolicy,
Value: &structpb.Struct{},
}),
},
}},
}
Expand All @@ -1217,21 +1225,26 @@ func (s) TestChildPolicyChangeOnConfigUpdate(t *testing.T) {

select {
case <-ctx.Done():
t.Fatalf("Timeout waiting for pickfirst child policy config")
t.Fatalf("Timeout waiting for child policy config")
case <-lbCfgCh:
}

if p := updatedChildPolicy.Load(); p == nil || *p != pfBuilder.Name() {
var got string
if p != nil {
got = *p
}
t.Fatalf("Unexpected child policy after config update, got %q, want %q", got, pfBuilder.Name())
if p, ok := updatedChildPolicy.Load().(string); !ok || p != customLBPolicy {
t.Fatalf("Unexpected child policy after config update, got %q (ok: %v), want %q", p, ok, customLBPolicy)
}

// New RPC should still be routed successfully
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Errorf("EmptyCall() failed after policy update: %v", err)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
t.Fatalf("Timeout waiting for successful RPC after policy update.")
case <-ticker.C:
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
return
}
}
}
}

Expand Down