Skip to content

Commit e583b19

Browse files
authored
xds: Add RLS in xDS e2e test (#5281)
1 parent 0066bf6 commit e583b19

File tree

10 files changed

+355
-155
lines changed

10 files changed

+355
-155
lines changed

balancer/rls/balancer_test.go

Lines changed: 45 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
3737
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
3838
"google.golang.org/grpc/internal/testutils"
39+
rlstest "google.golang.org/grpc/internal/testutils/rls"
3940
"google.golang.org/grpc/metadata"
4041
"google.golang.org/grpc/resolver"
4142
"google.golang.org/grpc/serviceconfig"
@@ -48,10 +49,10 @@ import (
4849
// and the old one is closed.
4950
func (s) TestConfigUpdate_ControlChannel(t *testing.T) {
5051
// Start two RLS servers.
51-
lis1 := newListenerWrapper(t, nil)
52-
rlsServer1, rlsReqCh1 := setupFakeRLSServer(t, lis1)
53-
lis2 := newListenerWrapper(t, nil)
54-
rlsServer2, rlsReqCh2 := setupFakeRLSServer(t, lis2)
52+
lis1 := testutils.NewListenerWrapper(t, nil)
53+
rlsServer1, rlsReqCh1 := rlstest.SetupFakeRLSServer(t, lis1)
54+
lis2 := testutils.NewListenerWrapper(t, nil)
55+
rlsServer2, rlsReqCh2 := rlstest.SetupFakeRLSServer(t, lis2)
5556

5657
// Build RLS service config with the RLS server pointing to the first one.
5758
// Set a very low value for maxAge to ensure that the entry expires soon.
@@ -61,12 +62,12 @@ func (s) TestConfigUpdate_ControlChannel(t *testing.T) {
6162
// Start a couple of test backends, and set up the fake RLS servers to return
6263
// these as a target in the RLS response.
6364
backendCh1, backendAddress1 := startBackend(t)
64-
rlsServer1.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
65-
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
65+
rlsServer1.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
66+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
6667
})
6768
backendCh2, backendAddress2 := startBackend(t)
68-
rlsServer2.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
69-
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
69+
rlsServer2.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
70+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
7071
})
7172

7273
// Register a manual resolver and push the RLS service config through it.
@@ -84,11 +85,11 @@ func (s) TestConfigUpdate_ControlChannel(t *testing.T) {
8485
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, backendCh1)
8586

8687
// Ensure a connection is established to the first RLS server.
87-
val, err := lis1.newConnCh.Receive(ctx)
88+
val, err := lis1.NewConnCh.Receive(ctx)
8889
if err != nil {
8990
t.Fatal("Timeout expired when waiting for LB policy to create control channel")
9091
}
91-
conn1 := val.(*connWrapper)
92+
conn1 := val.(*testutils.ConnWrapper)
9293

9394
// Make sure an RLS request is sent out.
9495
verifyRLSRequest(t, rlsReqCh1, true)
@@ -105,12 +106,12 @@ func (s) TestConfigUpdate_ControlChannel(t *testing.T) {
105106
r.UpdateState(resolver.State{ServiceConfig: sc})
106107

107108
// Ensure a connection is established to the second RLS server.
108-
if _, err := lis2.newConnCh.Receive(ctx); err != nil {
109+
if _, err := lis2.NewConnCh.Receive(ctx); err != nil {
109110
t.Fatal("Timeout expired when waiting for LB policy to create control channel")
110111
}
111112

112113
// Ensure the connection to the old one is closed.
113-
if _, err := conn1.closeCh.Receive(ctx); err != nil {
114+
if _, err := conn1.CloseCh.Receive(ctx); err != nil {
114115
t.Fatal("Timeout expired when waiting for LB policy to close control channel")
115116
}
116117

@@ -136,8 +137,8 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) {
136137
}
137138

138139
// Start an RLS server with the wrapped listener and credentials.
139-
lis := newListenerWrapper(t, nil)
140-
rlsServer, rlsReqCh := setupFakeRLSServer(t, lis, grpc.Creds(serverCreds))
140+
lis := testutils.NewListenerWrapper(t, nil)
141+
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis, grpc.Creds(serverCreds))
141142
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
142143

143144
// Build RLS service config.
@@ -147,8 +148,8 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) {
147148
// and set up the fake RLS server to return this as the target in the RLS
148149
// response.
149150
backendCh, backendAddress := startBackend(t, grpc.Creds(serverCreds))
150-
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
151-
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
151+
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
152+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
152153
})
153154

154155
// Register a manual resolver and push the RLS service config through it.
@@ -173,7 +174,7 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) {
173174
verifyRLSRequest(t, rlsReqCh, true)
174175

175176
// Ensure a connection is established to the first RLS server.
176-
if _, err := lis.newConnCh.Receive(ctx); err != nil {
177+
if _, err := lis.NewConnCh.Receive(ctx); err != nil {
177178
t.Fatal("Timeout expired when waiting for LB policy to create control channel")
178179
}
179180
}
@@ -184,7 +185,7 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) {
184185
// provided service config is applied for the control channel.
185186
func (s) TestConfigUpdate_ControlChannelServiceConfig(t *testing.T) {
186187
// Start an RLS server and set the throttler to never throttle requests.
187-
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
188+
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
188189
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
189190

190191
// Register a balancer to be used for the control channel, and set up a
@@ -211,8 +212,8 @@ func (s) TestConfigUpdate_ControlChannelServiceConfig(t *testing.T) {
211212
// Start a test backend, and set up the fake RLS server to return this as a
212213
// target in the RLS response.
213214
backendCh, backendAddress := startBackend(t)
214-
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
215-
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
215+
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
216+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
216217
})
217218

218219
// Register a manual resolver and push the RLS service config through it.
@@ -244,7 +245,7 @@ func (s) TestConfigUpdate_ControlChannelServiceConfig(t *testing.T) {
244245
// target after the config has been applied.
245246
func (s) TestConfigUpdate_DefaultTarget(t *testing.T) {
246247
// Start an RLS server and set the throttler to always throttle requests.
247-
rlsServer, _ := setupFakeRLSServer(t, nil)
248+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
248249
overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
249250

250251
// Build RLS service config with a default target.
@@ -284,16 +285,16 @@ func (s) TestConfigUpdate_DefaultTarget(t *testing.T) {
284285
// child policy configuration are propagated correctly.
285286
func (s) TestConfigUpdate_ChildPolicyConfigs(t *testing.T) {
286287
// Start an RLS server and set the throttler to never throttle requests.
287-
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
288+
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
288289
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
289290

290291
// Start a default backend and a test backend.
291292
_, defBackendAddress := startBackend(t)
292293
testBackendCh, testBackendAddress := startBackend(t)
293294

294295
// Set up the RLS server to respond with the test backend.
295-
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
296-
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
296+
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
297+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
297298
})
298299

299300
// Set up a test balancer callback to push configs received by child policies.
@@ -411,7 +412,7 @@ func (s) TestConfigUpdate_ChildPolicyConfigs(t *testing.T) {
411412
// handled by closing the old balancer and creating a new one.
412413
func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) {
413414
// Start an RLS server and set the throttler to never throttle requests.
414-
rlsServer, _ := setupFakeRLSServer(t, nil)
415+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
415416
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
416417

417418
// Set up balancer callbacks.
@@ -507,14 +508,14 @@ func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) {
507508
// the caller of the RPC.
508509
func (s) TestConfigUpdate_BadChildPolicyConfigs(t *testing.T) {
509510
// Start an RLS server and set the throttler to never throttle requests.
510-
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
511+
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
511512
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
512513

513514
// Set up the RLS server to respond with a bad target field which is expected
514515
// to cause the child policy's ParseTarget to fail and should result in the LB
515516
// policy creating a lame child policy wrapper.
516-
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
517-
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{e2e.RLSChildPolicyBadTarget}}}
517+
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
518+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{e2e.RLSChildPolicyBadTarget}}}
518519
})
519520

520521
// Build RLS service config with a default target. This default backend is
@@ -567,7 +568,7 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
567568
defer func() { minEvictDuration = origMinEvictDuration }()
568569

569570
// Start an RLS server and set the throttler to never throttle requests.
570-
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
571+
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
571572
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
572573

573574
// Register an LB policy to act as the child policy for RLS LB policy.
@@ -582,14 +583,14 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
582583
// these as targets in the RLS response, based on request keys.
583584
backendCh1, backendAddress1 := startBackend(t)
584585
backendCh2, backendAddress2 := startBackend(t)
585-
rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
586+
rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
586587
if req.KeyMap["k1"] == "v1" {
587-
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
588+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
588589
}
589590
if req.KeyMap["k2"] == "v2" {
590-
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
591+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
591592
}
592-
return &e2e.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
593+
return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
593594
})
594595

595596
// Register a manual resolver and push the RLS service config through it.
@@ -661,7 +662,7 @@ func (s) TestDataCachePurging(t *testing.T) {
661662
defer func() { dataCachePurgeHook = origDataCachePurgeHook }()
662663

663664
// Start an RLS server and set the throttler to never throttle requests.
664-
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
665+
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
665666
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
666667

667668
// Register an LB policy to act as the child policy for RLS LB policy.
@@ -678,8 +679,8 @@ func (s) TestDataCachePurging(t *testing.T) {
678679
// Start a test backend, and set up the fake RLS server to return this as a
679680
// target in the RLS response.
680681
backendCh, backendAddress := startBackend(t)
681-
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
682-
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
682+
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
683+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
683684
})
684685

685686
// Register a manual resolver and push the RLS service config through it.
@@ -740,7 +741,7 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
740741

741742
// Start an RLS server with the restartable listener and set the throttler to
742743
// never throttle requests.
743-
rlsServer, rlsReqCh := setupFakeRLSServer(t, lis)
744+
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis)
744745
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
745746

746747
// Override the reset backoff hook to get notified.
@@ -769,8 +770,8 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
769770
// Start a test backend, and set up the fake RLS server to return this as a
770771
// target in the RLS response.
771772
backendCh, backendAddress := startBackend(t)
772-
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
773-
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
773+
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
774+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
774775
})
775776

776777
// Register a manual resolver and push the RLS service config through it.
@@ -818,7 +819,11 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
818819
ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
819820
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh)
820821

821-
<-resetBackoffDone
822+
select {
823+
case <-ctx.Done():
824+
t.Fatalf("Timed out waiting for resetBackoffDone")
825+
case <-resetBackoffDone:
826+
}
822827

823828
// The fact that the above RPC succeeded indicates that the control channel
824829
// has moved back to READY. The connectivity state monitoring code should have

balancer/rls/control_channel_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ import (
3232
"github.com/google/go-cmp/cmp"
3333
"google.golang.org/grpc"
3434
"google.golang.org/grpc/balancer"
35-
"google.golang.org/grpc/balancer/rls/internal/test/e2e"
3635
"google.golang.org/grpc/codes"
3736
"google.golang.org/grpc/credentials"
3837
"google.golang.org/grpc/internal"
3938
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
39+
rlstest "google.golang.org/grpc/internal/testutils/rls"
4040
"google.golang.org/grpc/metadata"
4141
"google.golang.org/grpc/status"
4242
"google.golang.org/grpc/testdata"
@@ -47,7 +47,7 @@ import (
4747
// indicates that the control channel needs to be throttled.
4848
func (s) TestControlChannelThrottled(t *testing.T) {
4949
// Start an RLS server and set the throttler to always throttle requests.
50-
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
50+
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
5151
overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
5252

5353
// Create a control channel to the fake RLS server.
@@ -70,12 +70,12 @@ func (s) TestControlChannelThrottled(t *testing.T) {
7070
// TestLookupFailure tests the case where the RLS server responds with an error.
7171
func (s) TestLookupFailure(t *testing.T) {
7272
// Start an RLS server and set the throttler to never throttle requests.
73-
rlsServer, _ := setupFakeRLSServer(t, nil)
73+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
7474
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
7575

7676
// Setup the RLS server to respond with errors.
77-
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
78-
return &e2e.RouteLookupResponse{Err: errors.New("rls failure")}
77+
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
78+
return &rlstest.RouteLookupResponse{Err: errors.New("rls failure")}
7979
})
8080

8181
// Create a control channel to the fake RLS server.
@@ -114,7 +114,7 @@ func (s) TestLookupDeadlineExceeded(t *testing.T) {
114114
}
115115

116116
// Start an RLS server and set the throttler to never throttle.
117-
rlsServer, _ := setupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
117+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
118118
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
119119

120120
// Create a control channel with a small deadline.
@@ -246,7 +246,7 @@ var (
246246
Reason: rlspb.RouteLookupRequest_REASON_MISS,
247247
StaleHeaderData: staleHeaderData,
248248
}
249-
lookupResponse = &e2e.RouteLookupResponse{
249+
lookupResponse = &rlstest.RouteLookupResponse{
250250
Resp: &rlspb.RouteLookupResponse{
251251
Targets: wantTargets,
252252
HeaderData: wantHeaderData,
@@ -256,11 +256,11 @@ var (
256256

257257
func testControlChannelCredsSuccess(t *testing.T, sopts []grpc.ServerOption, bopts balancer.BuildOptions) {
258258
// Start an RLS server and set the throttler to never throttle requests.
259-
rlsServer, _ := setupFakeRLSServer(t, nil, sopts...)
259+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, sopts...)
260260
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
261261

262262
// Setup the RLS server to respond with a valid response.
263-
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
263+
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
264264
return lookupResponse
265265
})
266266

@@ -356,7 +356,7 @@ func testControlChannelCredsFailure(t *testing.T, sopts []grpc.ServerOption, bop
356356
// Start an RLS server and set the throttler to never throttle requests. The
357357
// creds failures happen before the RPC handler on the server is invoked.
358358
// So, there is need to setup the request and responses on the fake server.
359-
rlsServer, _ := setupFakeRLSServer(t, nil, sopts...)
359+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, sopts...)
360360
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
361361

362362
// Create the control channel to the fake server.
@@ -454,7 +454,7 @@ func (*unsupportedCredsBundle) NewWithMode(mode string) (credentials.Bundle, err
454454
// TestNewControlChannelUnsupportedCredsBundle tests the case where the control
455455
// channel is configured with a bundle which does not support the mode we use.
456456
func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) {
457-
rlsServer, _ := setupFakeRLSServer(t, nil)
457+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
458458

459459
// Create the control channel to the fake server.
460460
ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{CredsBundle: &unsupportedCredsBundle{}}, nil)

0 commit comments

Comments
 (0)