Skip to content

Commit e02c3d0

Browse files
implement the DestHostBackendManager
1 parent 9e6944a commit e02c3d0

File tree

12 files changed

+232
-14
lines changed

12 files changed

+232
-14
lines changed

cmd/agent/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type GrpcProxyAgentOptions struct {
7070
adminServerPort int
7171

7272
agentID string
73+
agentHost string
7374
syncInterval time.Duration
7475
probeInterval time.Duration
7576

@@ -81,6 +82,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOption grpc.DialOption) *age
8182
return &agent.ClientSetConfig{
8283
Address: fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort),
8384
AgentID: o.agentID,
85+
AgentHost: o.agentHost,
8486
SyncInterval: o.syncInterval,
8587
ProbeInterval: o.probeInterval,
8688
DialOption: dialOption,
@@ -101,6 +103,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
101103
flags.DurationVar(&o.syncInterval, "sync-interval", o.syncInterval, "The initial interval by which the agent periodically checks if it has connections to all instances of the proxy server.")
102104
flags.DurationVar(&o.probeInterval, "probe-interval", o.probeInterval, "The interval by which the agent periodically checks if its connections to the proxy server are ready.")
103105
flags.StringVar(&o.serviceAccountTokenPath, "service-account-token-path", o.serviceAccountTokenPath, "If non-empty proxy agent uses this token to prove its identity to the proxy server.")
106+
flags.StringVar(&o.agentHost, "agent-host", o.agentHost, "The host address of this agent, which can be either an IPv4/6 address or a domain name.")
104107
return flags
105108
}
106109

@@ -116,6 +119,7 @@ func (o *GrpcProxyAgentOptions) Print() {
116119
klog.Warningf("SyncInterval set to %v.\n", o.syncInterval)
117120
klog.Warningf("ProbeInterval set to %v.\n", o.probeInterval)
118121
klog.Warningf("ServiceAccountTokenPath set to \"%s\".\n", o.serviceAccountTokenPath)
122+
klog.Warningf("AgentHost set to %s.\n", o.agentHost)
119123
}
120124

121125
func (o *GrpcProxyAgentOptions) Validate() error {
@@ -168,6 +172,7 @@ func newGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
168172
healthServerPort: 8093,
169173
adminServerPort: 8094,
170174
agentID: uuid.New().String(),
175+
agentHost: "",
171176
syncInterval: 1 * time.Second,
172177
probeInterval: 1 * time.Second,
173178
serviceAccountTokenPath: "",

cmd/server/main.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ type ProxyRunOptions struct {
107107
authenticationAudience string
108108
// Path to kubeconfig (used by kubernetes client)
109109
kubeconfigPath string
110+
111+
// Proxy strategy used by the server, can be either "destHost" or "default".
112+
// If not set, the "default" strategy will be used.
113+
proxyStrategy string
110114
}
111115

112116
func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
@@ -132,6 +136,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
132136
flags.StringVar(&o.agentServiceAccount, "agent-service-account", o.agentServiceAccount, "Expected agent's service account during agent authentication (used with agent-namespace, authentication-audience, kubeconfig).")
133137
flags.StringVar(&o.kubeconfigPath, "kubeconfig", o.kubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).")
134138
flags.StringVar(&o.authenticationAudience, "authentication-audience", o.authenticationAudience, "Expected agent's token authentication audience (used with agent-namespace, agent-service-account, kubeconfig).")
139+
flags.StringVar(&o.proxyStrategy, "proxy-strategy", o.proxyStrategy, "TODO ...")
135140
return flags
136141
}
137142

@@ -157,6 +162,7 @@ func (o *ProxyRunOptions) Print() {
157162
klog.Warningf("AgentServiceAccount set to %q.\n", o.agentServiceAccount)
158163
klog.Warningf("AuthenticationAudience set to %q.\n", o.authenticationAudience)
159164
klog.Warningf("KubeconfigPath set to %q.\n", o.kubeconfigPath)
165+
klog.Warningf("ProxyStrategy set to %s.\n", o.proxyStrategy)
160166
}
161167

162168
func (o *ProxyRunOptions) Validate() error {
@@ -298,6 +304,7 @@ func newProxyRunOptions() *ProxyRunOptions {
298304
agentServiceAccount: "",
299305
kubeconfigPath: "",
300306
authenticationAudience: "",
307+
proxyStrategy: "default",
301308
}
302309
return &o
303310
}
@@ -347,7 +354,7 @@ func (p *Proxy) run(o *ProxyRunOptions) error {
347354
KubernetesClient: k8sClient,
348355
AuthenticationAudience: o.authenticationAudience,
349356
}
350-
server := server.NewProxyServer(o.serverID, int(o.serverCount), authOpt)
357+
server := server.NewProxyServer(o.serverID, o.proxyStrategy, int(o.serverCount), authOpt)
351358

352359
klog.Info("Starting master server for client connections.")
353360
masterStop, err := p.runMasterServer(ctx, o, server)

pkg/agent/client.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,10 @@ type AgentClient struct {
8888

8989
cs *ClientSet // the clientset that includes this AgentClient.
9090

91-
stream agent.AgentService_ConnectClient
92-
agentID string
93-
serverID string // the id of the proxy server this client connects to.
91+
stream agent.AgentService_ConnectClient
92+
agentID string
93+
agentHost string
94+
serverID string // the id of the proxy server this client connects to.
9495

9596
// connect opts
9697
address string
@@ -107,11 +108,12 @@ type AgentClient struct {
107108
serviceAccountTokenPath string
108109
}
109110

110-
func newAgentClient(address, agentID string, cs *ClientSet, opts ...grpc.DialOption) (*AgentClient, int, error) {
111+
func newAgentClient(address, agentID, agentHost string, cs *ClientSet, opts ...grpc.DialOption) (*AgentClient, int, error) {
111112
a := &AgentClient{
112113
cs: cs,
113114
address: address,
114115
agentID: agentID,
116+
agentHost: agentHost,
115117
opts: opts,
116118
probeInterval: cs.probeInterval,
117119
stopCh: make(chan struct{}),
@@ -132,7 +134,9 @@ func (a *AgentClient) Connect() (int, error) {
132134
if err != nil {
133135
return 0, err
134136
}
135-
ctx := metadata.AppendToOutgoingContext(context.Background(), header.AgentID, a.agentID)
137+
ctx := metadata.AppendToOutgoingContext(context.Background(),
138+
header.AgentID, a.agentID,
139+
header.AgentHost, a.agentHost)
136140
if a.serviceAccountTokenPath != "" {
137141
if ctx, err = a.initializeAuthContext(ctx); err != nil {
138142
conn.Close()

pkg/agent/clientset.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ type ClientSet struct {
4848
serviceAccountTokenPath string
4949
// channel to signal shutting down the client set. Primarily for test.
5050
stopCh <-chan struct{}
51+
52+
agentHost string // The IP address of the agent
5153
}
5254

5355
func (cs *ClientSet) ClientsCount() int {
@@ -108,6 +110,7 @@ func (cs *ClientSet) RemoveClient(serverID string) {
108110
type ClientSetConfig struct {
109111
Address string
110112
AgentID string
113+
AgentHost string
111114
SyncInterval time.Duration
112115
ProbeInterval time.Duration
113116
DialOption grpc.DialOption
@@ -118,6 +121,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet
118121
return &ClientSet{
119122
clients: make(map[string]*AgentClient),
120123
agentID: cc.AgentID,
124+
agentHost: cc.AgentHost,
121125
address: cc.Address,
122126
syncInterval: cc.SyncInterval,
123127
probeInterval: cc.ProbeInterval,
@@ -128,7 +132,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet
128132
}
129133

130134
func (cs *ClientSet) newAgentClient() (*AgentClient, int, error) {
131-
return newAgentClient(cs.address, cs.agentID, cs, cs.dialOption)
135+
return newAgentClient(cs.address, cs.agentID, cs.agentHost, cs, cs.dialOption)
132136
}
133137

134138
func (cs *ClientSet) resetBackoff() *wait.Backoff {

pkg/server/backend_manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ type BackendManager interface {
7878
// contains multiple requests.
7979
Backend(ctx context.Context) (Backend, error)
8080
BackendStorage
81+
ReadinessManager
8182
}
8283

8384
var _ BackendManager = &DefaultBackendManager{}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"k8s.io/klog/v2"
6+
)
7+
8+
type DestHostBackendManager struct {
9+
*DefaultBackendStorage
10+
}
11+
12+
var _ BackendManager = &DestHostBackendManager{}
13+
14+
func NewDestHostBackendManager() *DestHostBackendManager {
15+
return &DestHostBackendManager{DefaultBackendStorage: NewDefaultBackendStorage()}
16+
}
17+
18+
// Backend tries to get a backend by IP. If the desired backend is not found,
19+
// just return a default one
20+
func (dibm *DestHostBackendManager) Backend(ctx context.Context) (Backend, error) {
21+
dibm.mu.RLock()
22+
defer dibm.mu.RUnlock()
23+
if len(dibm.backends) == 0 {
24+
return nil, &ErrNotFound{}
25+
}
26+
destHost := ctx.Value("destHost").(string)
27+
if destHost != "" {
28+
bes, exist := dibm.backends[destHost]
29+
if exist && len(bes) > 0 {
30+
return dibm.backends[destHost][0], nil
31+
}
32+
return nil, &ErrNotFound{}
33+
}
34+
35+
// fallback to the default behavior, i.e., get a random backend
36+
agentID := dibm.agentIDs[dibm.random.Intn(len(dibm.agentIDs))]
37+
klog.Infof("pick agentID=%s as backend", agentID)
38+
return dibm.backends[agentID][0], nil
39+
}

pkg/server/readiness_manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ type ReadinessManager interface {
2323
Ready() (bool, string)
2424
}
2525

26-
var _ ReadinessManager = &DefaultBackendManager{}
26+
var _ ReadinessManager = &DefaultBackendStorage{}
2727

28-
func (s *DefaultBackendManager) Ready() (bool, string) {
28+
func (s *DefaultBackendStorage) Ready() (bool, string) {
2929
if s.NumBackends() == 0 {
3030
return false, "no connection to any proxy agent"
3131
}

pkg/server/server.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"k8s.io/klog/v2"
3333
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
3434
"sigs.k8s.io/apiserver-network-proxy/pkg/server/metrics"
35+
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
3536
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
3637
"sigs.k8s.io/apiserver-network-proxy/proto/header"
3738
)
@@ -125,6 +126,8 @@ type ProxyServer struct {
125126

126127
// agent authentication
127128
AgentAuthenticationOptions *AgentTokenAuthenticationOptions
129+
130+
proxyStrategy string
128131
}
129132

130133
// AgentTokenAuthenticationOptions contains list of parameters required for agent token based authentication
@@ -201,8 +204,15 @@ func (s *ProxyServer) getFrontendsForBackendConn(agentID string, backend Backend
201204
}
202205

203206
// NewProxyServer creates a new ProxyServer instance
204-
func NewProxyServer(serverID string, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions) *ProxyServer {
205-
bm := NewDefaultBackendManager()
207+
func NewProxyServer(serverID, proxyStrategy string, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions) *ProxyServer {
208+
var bm BackendManager
209+
if proxyStrategy == "destHost" {
210+
bm = NewDestHostBackendManager()
211+
} else {
212+
// to ensure backward compatibility, use the "default" backend manager
213+
// for all other proxy strategies
214+
bm = NewDefaultBackendManager()
215+
}
206216
return &ProxyServer{
207217
frontends: make(map[string](map[int64]*ProxyClientConnection)),
208218
PendingDial: NewPendingDialManager(),
@@ -211,6 +221,7 @@ func NewProxyServer(serverID string, serverCount int, agentAuthenticationOptions
211221
BackendManager: bm,
212222
AgentAuthenticationOptions: agentAuthenticationOptions,
213223
Readiness: bm,
224+
proxyStrategy: proxyStrategy,
214225
}
215226
}
216227

@@ -270,7 +281,12 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer,
270281
// the address, then we can send the Dial_REQ to the
271282
// same agent. That way we save the agent from creating
272283
// a new connection to the address.
273-
backend, err = s.BackendManager.Backend(context.TODO())
284+
ctx := context.Background()
285+
if s.proxyStrategy == "destHost" {
286+
addr := util.RemovePortFromHost(pkt.GetDialRequest().Address)
287+
ctx = context.WithValue(ctx, "destHost", addr)
288+
}
289+
backend, err = s.BackendManager.Backend(ctx)
274290
if err != nil {
275291
klog.Errorf(">>> failed to get a backend: %v", err)
276292
continue
@@ -370,6 +386,21 @@ func agentID(stream agent.AgentService_ConnectServer) (string, error) {
370386
return agentIDs[0], nil
371387
}
372388

389+
func agentHost(stream agent.AgentService_ConnectServer) (string, error) {
390+
md, ok := metadata.FromIncomingContext(stream.Context())
391+
if !ok {
392+
return "", fmt.Errorf("failed to get context")
393+
}
394+
agentHosts := md.Get(header.AgentHost)
395+
if len(agentHosts) > 1 {
396+
return "", fmt.Errorf("expected at most one agent IP in the context, got %v", agentHosts)
397+
}
398+
if len(agentHosts) == 0 {
399+
return "", nil
400+
}
401+
return agentHosts[0], nil
402+
}
403+
373404
func (s *ProxyServer) validateAuthToken(token string) error {
374405
trReq := &authv1.TokenReview{
375406
Spec: authv1.TokenReviewSpec{
@@ -446,6 +477,18 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error {
446477
if err != nil {
447478
return err
448479
}
480+
if s.proxyStrategy == "destHost" {
481+
// if the proxyStrategy is "destHost", use the agentHost as the ID
482+
agentHost, err := agentHost(stream)
483+
if err != nil {
484+
return err
485+
}
486+
if agentHost != "" {
487+
// if the agentHost is set, assign it to the agentID
488+
agentID = agentHost
489+
}
490+
}
491+
449492
klog.Infof("Connect request from agent %s", agentID)
450493
backend := s.BackendManager.AddBackend(agentID, stream)
451494
defer s.BackendManager.RemoveBackend(agentID, stream)

pkg/server/tunnel.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"k8s.io/klog/v2"
2828
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
29+
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
2930
)
3031

3132
// Tunnel implements Proxy based on HTTP Connect, which tunnels the traffic to
@@ -69,7 +70,12 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
6970
},
7071
}
7172
klog.Infof("Set pending(rand=%d) to %v", random, w)
72-
backend, err := t.Server.BackendManager.Backend(context.TODO())
73+
ctx := context.Background()
74+
if t.Server.proxyStrategy == "destHost" {
75+
addr := util.RemovePortFromHost(r.Host)
76+
ctx = context.WithValue(ctx, "destHost", addr)
77+
}
78+
backend, err := t.Server.BackendManager.Backend(ctx)
7379
if err != nil {
7480
http.Error(w, fmt.Sprintf("currently no tunnels available: %v", err), http.StatusInternalServerError)
7581
return

pkg/util/net.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package util
18+
19+
import (
20+
"strings"
21+
)
22+
23+
// containIPv6Addr checks if the given host identity contains an
24+
// IPv6 address
25+
func containIPv6Addr(host string) bool {
26+
// the shortest IPv6 address is ::
27+
return len(strings.Split(host, ":")) > 2
28+
}
29+
30+
// containPortIPv6 checks if the host that contains an IPv6 address
31+
// also contains a port number
32+
func containPortIPv6(host string) bool {
33+
// based on to RFC 3986, section 3.2.2, host identified by an
34+
// IPv6 is distinguished by enclosing the IP literal within square
35+
// brackets ("[" and "]")
36+
return strings.ContainsRune(host, '[')
37+
}
38+
39+
// RemovePortFromHost removes port number from the host address that
40+
// may be of the form "<host>:<port>" where the <host> can be an either
41+
// an IPv4/6 address or a domain name
42+
func RemovePortFromHost(host string) string {
43+
if !containIPv6Addr(host) {
44+
return strings.Split(host, ":")[0]
45+
}
46+
if containPortIPv6(host) {
47+
host = host[:strings.LastIndexByte(host, ':')]
48+
}
49+
return strings.Trim(host, "[]")
50+
}

0 commit comments

Comments
 (0)