Skip to content

Commit 1127279

Browse files
authored
Version 0.4.0 related functional logic (#907)
* Update kdubbo API module versions for MeshService * Remove MeshService visibleTo usage * Test proxyless mTLS xDS generation * Wire proxyless runtime security config * Update api and xds-api dependencies * Add mtls gateway logic and necessary configuration
1 parent a71ecf7 commit 1127279

34 files changed

Lines changed: 1700 additions & 179 deletions

.asf.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ notifications:
2121

2222
github:
2323
homepage: "https://kdubbo.github.io"
24-
description: "Embed gRPC, use xDS, build inherent mesh."
24+
description: "Build a native mesh using gRPC and xDS."
2525
features:
2626
# Enable wiki for documentation
2727
wiki: false

dubbod/discovery/cmd/app/cmd.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const (
3737
executeLogScope = "setup"
3838
waitLogScope = "wait"
3939
xclientLogScope = "xclient"
40+
xserverLogScope = "xserver"
4041
)
4142

4243
func NewRootCommand() *cobra.Command {
@@ -61,6 +62,7 @@ func NewRootCommand() *cobra.Command {
6162
cmd.AddFlags(rootCmd)
6263
rootCmd.AddCommand(waitCmd)
6364
rootCmd.AddCommand(newXClientCommand())
65+
rootCmd.AddCommand(newXServerCommand())
6466

6567
return rootCmd
6668
}

dubbod/discovery/cmd/app/cmd_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestRootCommandRegistersRenamedCommands(t *testing.T) {
3232
commands[command.Name()] = true
3333
}
3434

35-
for _, name := range []string{"execute", "xclient"} {
35+
for _, name := range []string{"execute", "xclient", "xserver"} {
3636
if !commands[name] {
3737
t.Fatalf("expected command %q to be registered; commands=%v", name, commands)
3838
}
@@ -47,7 +47,7 @@ func TestRootCommandRegistersRenamedCommands(t *testing.T) {
4747
func TestCommandsSetLogScopes(t *testing.T) {
4848
defer func() {
4949
log.SetDefaultScope("log")
50-
for _, name := range []string{"log", executeLogScope, waitLogScope, xclientLogScope} {
50+
for _, name := range []string{"log", executeLogScope, waitLogScope, xclientLogScope, xserverLogScope} {
5151
if scope := log.FindScope(name); scope != nil {
5252
scope.SetOutput(os.Stderr)
5353
}
@@ -62,6 +62,7 @@ func TestCommandsSetLogScopes(t *testing.T) {
6262
{command: "execute", scope: "setup"},
6363
{command: "wait", scope: "wait"},
6464
{command: "xclient", scope: "xclient"},
65+
{command: "xserver", scope: "xserver"},
6566
} {
6667
t.Run(tt.command, func(t *testing.T) {
6768
var out bytes.Buffer

dubbod/discovery/cmd/app/xds_client.go

Lines changed: 133 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
corev1 "github.com/kdubbo/xds-api/core/v1"
4040
endpointv1 "github.com/kdubbo/xds-api/endpoint/v1"
4141
hcmv1 "github.com/kdubbo/xds-api/extensions/filters/v1/network/http_connection_manager"
42+
tlsv1 "github.com/kdubbo/xds-api/extensions/transport_sockets/tls/v1"
4243
xdsresolver "github.com/kdubbo/xds-api/grpc/resolver"
4344
listenerv1 "github.com/kdubbo/xds-api/listener/v1"
4445
routev1 "github.com/kdubbo/xds-api/route/v1"
@@ -85,7 +86,11 @@ type xdsDestination struct {
8586
Host string `json:"host"`
8687
Subset string `json:"subset,omitempty"`
8788
Weight uint32 `json:"weight"`
89+
TLSMode string `json:"tlsMode,omitempty"`
90+
SNI string `json:"sni,omitempty"`
8891
Endpoints []xdsEndpoint `json:"endpoints,omitempty"`
92+
93+
tlsContext *tlsv1.UpstreamTlsContext
8994
}
9095

9196
type xdsRouteSnapshot struct {
@@ -102,12 +107,15 @@ type sampleADSClient struct {
102107
host string
103108
port int
104109

105-
mu sync.RWMutex
106-
subs map[string][]string
107-
route map[string]uint32
108-
endpoints map[string][]xdsEndpoint
109-
updates chan struct{}
110-
errs chan error
110+
mu sync.RWMutex
111+
subs map[string][]string
112+
route map[string]uint32
113+
endpoints map[string][]xdsEndpoint
114+
clusterTLS map[string]*tlsv1.UpstreamTlsContext
115+
updates chan struct{}
116+
errs chan error
117+
bootstrapPath string
118+
bootstrap *xdsresolver.BootstrapConfig
111119
}
112120

113121
func newXClientCommand() *cobra.Command {
@@ -232,17 +240,19 @@ func newSampleADSClient(ctx context.Context, opts *xdsClientOptions) (*sampleADS
232240
return nil, fmt.Errorf("open ADS stream: %w", err)
233241
}
234242
return &sampleADSClient{
235-
conn: conn,
236-
stream: stream,
237-
node: node,
238-
target: opts.target,
239-
host: opts.host,
240-
port: opts.port,
241-
subs: map[string][]string{},
242-
route: map[string]uint32{},
243-
endpoints: map[string][]xdsEndpoint{},
244-
updates: make(chan struct{}, 1),
245-
errs: make(chan error, 1),
243+
conn: conn,
244+
stream: stream,
245+
node: node,
246+
target: opts.target,
247+
host: opts.host,
248+
port: opts.port,
249+
subs: map[string][]string{},
250+
route: map[string]uint32{},
251+
endpoints: map[string][]xdsEndpoint{},
252+
clusterTLS: map[string]*tlsv1.UpstreamTlsContext{},
253+
updates: make(chan struct{}, 1),
254+
errs: make(chan error, 1),
255+
bootstrapPath: opts.bootstrapPath,
246256
}, nil
247257
}
248258

@@ -410,10 +420,16 @@ func (c *sampleADSClient) handleResponse(resp *discovery.DiscoveryResponse) erro
410420
return c.subscribe(v1.ClusterType, clusters)
411421
}
412422
case v1.ClusterType:
413-
edsNames, err := edsNamesFromClusters(resp.Resources)
423+
edsNames, clusterTLS, err := edsNamesAndTLSFromClusters(resp.Resources)
414424
if err != nil {
415425
return err
416426
}
427+
c.mu.Lock()
428+
for clusterName, tlsContext := range clusterTLS {
429+
c.clusterTLS[clusterName] = tlsContext
430+
}
431+
c.mu.Unlock()
432+
c.notify()
417433
if len(edsNames) > 0 {
418434
return c.subscribe(v1.EndpointType, edsNames)
419435
}
@@ -478,7 +494,7 @@ func (c *sampleADSClient) readySnapshot(expected map[string]uint32) (xdsRouteSna
478494
continue
479495
}
480496
eps := append([]xdsEndpoint(nil), c.endpoints[clusterName]...)
481-
dest := destinationFromCluster(clusterName, weight, eps)
497+
dest := destinationFromCluster(clusterName, weight, eps, c.clusterTLS[clusterName])
482498
snapshot.Destinations = append(snapshot.Destinations, dest)
483499
}
484500
sort.Slice(snapshot.Destinations, func(i, j int) bool {
@@ -569,13 +585,15 @@ func routeWeightsFromRoutes(resources []*anypb.Any) (map[string]uint32, []string
569585
return weights, sortedUnique(clusters), nil
570586
}
571587

572-
func edsNamesFromClusters(resources []*anypb.Any) ([]string, error) {
588+
func edsNamesAndTLSFromClusters(resources []*anypb.Any) ([]string, map[string]*tlsv1.UpstreamTlsContext, error) {
573589
out := make([]string, 0, len(resources))
590+
clusterTLS := map[string]*tlsv1.UpstreamTlsContext{}
574591
for _, resource := range resources {
575592
c := &clusterv1.Cluster{}
576593
if err := proto.Unmarshal(resource.Value, c); err != nil {
577-
return nil, err
594+
return nil, nil, err
578595
}
596+
clusterTLS[c.Name] = upstreamTLSContextFromCluster(c)
579597
serviceName := c.GetName()
580598
if eds := c.GetEdsClusterConfig(); eds != nil && eds.GetServiceName() != "" {
581599
serviceName = eds.GetServiceName()
@@ -584,7 +602,22 @@ func edsNamesFromClusters(resources []*anypb.Any) ([]string, error) {
584602
out = append(out, serviceName)
585603
}
586604
}
587-
return sortedUnique(out), nil
605+
return sortedUnique(out), clusterTLS, nil
606+
}
607+
608+
func upstreamTLSContextFromCluster(c *clusterv1.Cluster) *tlsv1.UpstreamTlsContext {
609+
if c == nil || c.TransportSocket == nil {
610+
return nil
611+
}
612+
typedConfig := c.TransportSocket.GetTypedConfig()
613+
if typedConfig == nil {
614+
return nil
615+
}
616+
upstreamTLS := &tlsv1.UpstreamTlsContext{}
617+
if err := anypb.UnmarshalTo(typedConfig, upstreamTLS, proto.UnmarshalOptions{}); err != nil {
618+
return nil
619+
}
620+
return upstreamTLS
588621
}
589622

590623
func endpointsFromAssignments(resources []*anypb.Any) (map[string][]xdsEndpoint, error) {
@@ -622,13 +655,17 @@ func endpointsFromAssignments(resources []*anypb.Any) (map[string][]xdsEndpoint,
622655
return out, nil
623656
}
624657

625-
func destinationFromCluster(clusterName string, weight uint32, endpoints []xdsEndpoint) xdsDestination {
658+
func destinationFromCluster(clusterName string, weight uint32, endpoints []xdsEndpoint, tlsContext *tlsv1.UpstreamTlsContext) xdsDestination {
626659
parts := strings.Split(clusterName, "|")
627-
dest := xdsDestination{Cluster: clusterName, Weight: weight, Endpoints: endpoints}
660+
dest := xdsDestination{Cluster: clusterName, Weight: weight, Endpoints: endpoints, tlsContext: tlsContext}
628661
if len(parts) == 4 {
629662
dest.Host = parts[3]
630663
dest.Subset = parts[2]
631664
}
665+
if tlsContext != nil {
666+
dest.TLSMode = "DUBBO_MUTUAL"
667+
dest.SNI = tlsContext.Sni
668+
}
632669
return dest
633670
}
634671

@@ -638,7 +675,7 @@ func runSampleRequests(ctx context.Context, adsClient *sampleADSClient, snapshot
638675
return err
639676
}
640677
currentSignature := snapshotSignature(snapshot)
641-
client := &http.Client{Timeout: requestTimeout}
678+
clients := newSampleRequestClients(adsClient, requestTimeout)
642679
for i := 0; i < count; i++ {
643680
if updated, ok := adsClient.readySnapshot(nil); ok {
644681
updatedSignature := snapshotSignature(updated)
@@ -651,17 +688,21 @@ func runSampleRequests(ctx context.Context, adsClient *sampleADSClient, snapshot
651688
}
652689
}
653690
}
654-
_, endpoint, err := picker.Next()
691+
destination, endpoint, err := picker.Next()
692+
if err != nil {
693+
return err
694+
}
695+
httpClient, scheme, err := clients.clientForDestination(destination)
655696
if err != nil {
656697
return err
657698
}
658699
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
659-
fmt.Sprintf("http://%s/", net.JoinHostPort(endpoint.Address, strconv.Itoa(int(endpoint.Port)))), nil)
700+
fmt.Sprintf("%s://%s/", scheme, net.JoinHostPort(endpoint.Address, strconv.Itoa(int(endpoint.Port)))), nil)
660701
if err != nil {
661702
return err
662703
}
663704
req.Host = snapshot.Host
664-
resp, err := client.Do(req)
705+
resp, err := httpClient.Do(req)
665706
if err != nil {
666707
return err
667708
}
@@ -686,6 +727,70 @@ func runSampleRequests(ctx context.Context, adsClient *sampleADSClient, snapshot
686727
return nil
687728
}
688729

730+
type sampleRequestClients struct {
731+
adsClient *sampleADSClient
732+
requestTimeout time.Duration
733+
plaintext *http.Client
734+
tlsClients map[string]*http.Client
735+
}
736+
737+
func newSampleRequestClients(adsClient *sampleADSClient, requestTimeout time.Duration) *sampleRequestClients {
738+
return &sampleRequestClients{
739+
adsClient: adsClient,
740+
requestTimeout: requestTimeout,
741+
plaintext: &http.Client{Timeout: requestTimeout},
742+
tlsClients: map[string]*http.Client{},
743+
}
744+
}
745+
746+
func (c *sampleRequestClients) clientForDestination(destination xdsDestination) (*http.Client, string, error) {
747+
if destination.tlsContext == nil {
748+
return c.plaintext, "http", nil
749+
}
750+
bootstrap, err := c.adsClient.bootstrapConfig()
751+
if err != nil {
752+
return nil, "", err
753+
}
754+
tlsConfig, err := xdsresolver.DataPlaneTLSConfigFromBootstrap(bootstrap, destination.tlsContext, destination.Host)
755+
if err != nil {
756+
return nil, "", err
757+
}
758+
key := destination.TLSMode + "|" + destination.SNI
759+
if key == "|" {
760+
key = destination.Cluster
761+
}
762+
if cached := c.tlsClients[key]; cached != nil {
763+
return cached, "https", nil
764+
}
765+
client := &http.Client{
766+
Timeout: c.requestTimeout,
767+
Transport: &http.Transport{
768+
TLSClientConfig: tlsConfig,
769+
},
770+
}
771+
c.tlsClients[key] = client
772+
return client, "https", nil
773+
}
774+
775+
func (c *sampleADSClient) bootstrapConfig() (*xdsresolver.BootstrapConfig, error) {
776+
if c.bootstrap != nil {
777+
return c.bootstrap, nil
778+
}
779+
path := c.bootstrapPath
780+
if path == "" {
781+
path = os.Getenv("GRPC_XDS_BOOTSTRAP")
782+
}
783+
if path == "" {
784+
return nil, fmt.Errorf("data-plane mTLS requires GRPC_XDS_BOOTSTRAP or --bootstrap")
785+
}
786+
bootstrap, err := xdsresolver.ParseBootstrap(path)
787+
if err != nil {
788+
return nil, err
789+
}
790+
c.bootstrap = bootstrap
791+
return c.bootstrap, nil
792+
}
793+
689794
func activeDestinations(snapshot xdsRouteSnapshot) []xdsDestination {
690795
out := make([]xdsDestination, 0, len(snapshot.Destinations))
691796
for _, dest := range snapshot.Destinations {

dubbod/discovery/cmd/app/xds_client_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"strings"
1414
"testing"
1515
"time"
16+
17+
tlsv1 "github.com/kdubbo/xds-api/extensions/transport_sockets/tls/v1"
1618
)
1719

1820
func TestAutoDiscoverServiceTargetSingleService(t *testing.T) {
@@ -203,6 +205,47 @@ func TestRunSampleRequestsUsesUpdatedSnapshotOnSameStream(t *testing.T) {
203205
}
204206
}
205207

208+
func TestSampleRequestClientsDoNotBypassMTLS(t *testing.T) {
209+
clients := newSampleRequestClients(&sampleADSClient{}, time.Second)
210+
destination := xdsDestination{
211+
Cluster: "outbound|80|v1|nginx.app.svc.cluster.local",
212+
Host: "nginx.app.svc.cluster.local",
213+
TLSMode: "DUBBO_MUTUAL",
214+
SNI: "nginx.app.svc.cluster.local",
215+
tlsContext: &tlsv1.UpstreamTlsContext{
216+
Sni: "nginx.app.svc.cluster.local",
217+
},
218+
}
219+
220+
_, _, err := clients.clientForDestination(destination)
221+
if err == nil {
222+
t.Fatalf("clientForDestination() error = nil, want bootstrap error for mTLS route")
223+
}
224+
if !strings.Contains(err.Error(), "data-plane mTLS requires GRPC_XDS_BOOTSTRAP or --bootstrap") {
225+
t.Fatalf("error = %q, want bootstrap requirement", err.Error())
226+
}
227+
}
228+
229+
func TestDestinationFromClusterExposesTLSContext(t *testing.T) {
230+
tlsContext := &tlsv1.UpstreamTlsContext{Sni: "nginx.app.svc.cluster.local"}
231+
destination := destinationFromCluster(
232+
"outbound|80|v1|nginx.app.svc.cluster.local",
233+
50,
234+
[]xdsEndpoint{{Address: "10.0.0.1", Port: 80}},
235+
tlsContext,
236+
)
237+
238+
if destination.TLSMode != "DUBBO_MUTUAL" {
239+
t.Fatalf("TLSMode = %q, want DUBBO_MUTUAL", destination.TLSMode)
240+
}
241+
if destination.SNI != "nginx.app.svc.cluster.local" {
242+
t.Fatalf("SNI = %q, want nginx host", destination.SNI)
243+
}
244+
if destination.tlsContext != tlsContext {
245+
t.Fatalf("tlsContext was not preserved")
246+
}
247+
}
248+
206249
func endpointForServer(t *testing.T, server *httptest.Server) xdsEndpoint {
207250
t.Helper()
208251
parsed, err := neturl.Parse(server.URL)

0 commit comments

Comments
 (0)