Skip to content

Feature: implement the BackendManager list #144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"os"
"time"

Expand Down Expand Up @@ -69,9 +70,10 @@ type GrpcProxyAgentOptions struct {
healthServerPort int
adminServerPort int

agentID string
syncInterval time.Duration
probeInterval time.Duration
agentID string
agentIdentifiers string
syncInterval time.Duration
probeInterval time.Duration

// file contains service account authorization token for enabling proxy-server token based authorization
serviceAccountTokenPath string
Expand All @@ -81,6 +83,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption)
return &agent.ClientSetConfig{
Address: fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort),
AgentID: o.agentID,
AgentIdentifiers: o.agentIdentifiers,
SyncInterval: o.syncInterval,
ProbeInterval: o.probeInterval,
DialOptions: dialOptions,
Expand All @@ -101,6 +104,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.DurationVar(&o.syncInterval, "sync-interval", o.syncInterval, "The initial interval by which the agent periodically checks if it has connections to all instances of the proxy server.")
flags.DurationVar(&o.probeInterval, "probe-interval", o.probeInterval, "The interval by which the agent periodically checks if its connections to the proxy server are ready.")
flags.StringVar(&o.serviceAccountTokenPath, "service-account-token-path", o.serviceAccountTokenPath, "If non-empty proxy agent uses this token to prove its identity to the proxy server.")
flags.StringVar(&o.agentIdentifiers, "agent-identifiers", o.agentIdentifiers, "Identifiers of the agent that will be used by the server when choosing agent. N.B. the list of identifiers must be in URL encoded format. e.g.,host=localhost&host=node1.mydomain.com&cidr=127.0.0.1/16&ipv4=1.2.3.4&ipv4=5.6.7.8&ipv6=:::::")
return flags
}

Expand All @@ -116,6 +120,7 @@ func (o *GrpcProxyAgentOptions) Print() {
klog.V(1).Infof("SyncInterval set to %v.\n", o.syncInterval)
klog.V(1).Infof("ProbeInterval set to %v.\n", o.probeInterval)
klog.V(1).Infof("ServiceAccountTokenPath set to %q.\n", o.serviceAccountTokenPath)
klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.agentIdentifiers))
}

func (o *GrpcProxyAgentOptions) Validate() error {
Expand Down Expand Up @@ -155,6 +160,27 @@ func (o *GrpcProxyAgentOptions) Validate() error {
return fmt.Errorf("error checking service account token path %s, got %v", o.serviceAccountTokenPath, err)
}
}
if err := validateAgentIdentifiers(o.agentIdentifiers); err != nil {
return fmt.Errorf("agent address is invalid: %v", err)
}
return nil
}

func validateAgentIdentifiers(agentIdentifiers string) error {
decoded, err := url.ParseQuery(agentIdentifiers)
if err != nil {
return err
}
for idType := range decoded {
switch agent.IdentifierType(idType) {
case agent.IPv4:
case agent.IPv6:
case agent.CIDR:
case agent.Host:
default:
return fmt.Errorf("unknown address type: %s", idType)
}
}
return nil
}

Expand All @@ -168,6 +194,7 @@ func newGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
healthServerPort: 8093,
adminServerPort: 8094,
agentID: uuid.New().String(),
agentIdentifiers: "",
syncInterval: 1 * time.Second,
probeInterval: 1 * time.Second,
serviceAccountTokenPath: "",
Expand Down
34 changes: 32 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"os"
"os/signal"
"runtime"
"strings"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -115,6 +116,14 @@ type ProxyRunOptions struct {
authenticationAudience string
// Path to kubeconfig (used by kubernetes client)
kubeconfigPath string

// Proxy strategies used by the server.
// NOTE the order of the strategies matters. e.g., for list
// "destHost,destCIDR", the server will try to find a backend associating
// to the destination host first, if not found, it will try to find a
// backend within the destCIDR. if it still can't find any backend,
// it will use the default backend manager to choose a random backend.
proxyStrategies string
}

func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
Expand All @@ -141,6 +150,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
flags.StringVar(&o.agentServiceAccount, "agent-service-account", o.agentServiceAccount, "Expected agent's service account during agent authentication (used with agent-namespace, authentication-audience, kubeconfig).")
flags.StringVar(&o.kubeconfigPath, "kubeconfig", o.kubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).")
flags.StringVar(&o.authenticationAudience, "authentication-audience", o.authenticationAudience, "Expected agent's token authentication audience (used with agent-namespace, agent-service-account, kubeconfig).")
flags.StringVar(&o.proxyStrategies, "proxy-strategies", o.proxyStrategies, "The list of proxy strategies used by the server to pick a backend/tunnel, available strategies are: default, destHost.")
return flags
}

Expand All @@ -167,6 +177,7 @@ func (o *ProxyRunOptions) Print() {
klog.V(1).Infof("AgentServiceAccount set to %q.\n", o.agentServiceAccount)
klog.V(1).Infof("AuthenticationAudience set to %q.\n", o.authenticationAudience)
klog.V(1).Infof("KubeconfigPath set to %q.\n", o.kubeconfigPath)
klog.V(1).Infof("ProxyStrategies set to %q.\n", o.proxyStrategies)
}

func (o *ProxyRunOptions) Validate() error {
Expand Down Expand Up @@ -282,6 +293,19 @@ func (o *ProxyRunOptions) Validate() error {
}
}

// validate the proxy strategies
if o.proxyStrategies != "" {
pss := strings.Split(o.proxyStrategies, ",")
for _, ps := range pss {
switch ps {
case string(server.ProxyStrategyDestHost):
case string(server.ProxyStrategyDefault):
default:
return fmt.Errorf("unknown proxy strategy: %s, available strategy are: default, destHost", ps)
}
}
}

return nil
}

Expand Down Expand Up @@ -309,6 +333,7 @@ func newProxyRunOptions() *ProxyRunOptions {
agentServiceAccount: "",
kubeconfigPath: "",
authenticationAudience: "",
proxyStrategies: "default",
}
return &o
}
Expand Down Expand Up @@ -358,8 +383,13 @@ func (p *Proxy) run(o *ProxyRunOptions) error {
KubernetesClient: k8sClient,
AuthenticationAudience: o.authenticationAudience,
}
server := server.NewProxyServer(o.serverID, int(o.serverCount), authOpt)
klog.V(1).Infoln("Starting master server for client connections.")
ps, err := server.GenProxyStrategiesFromStr(o.proxyStrategies)
if err != nil {
return err
}
server := server.NewProxyServer(o.serverID, ps, int(o.serverCount), authOpt)

masterStop, err := p.runMasterServer(ctx, o, server)
if err != nil {
return fmt.Errorf("failed to run the master server: %v", err)
Expand Down Expand Up @@ -549,7 +579,7 @@ func (p *Proxy) runAgentServer(o *ProxyRunOptions, server *server.ProxyServer) e
}

addr := fmt.Sprintf(":%d", o.agentPort)
serverOptions := []grpc.ServerOption {
serverOptions := []grpc.ServerOption{
grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.KeepaliveParams(keepalive.ServerParameters{Time: o.keepaliveTime}),
}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module sigs.k8s.io/apiserver-network-proxy
go 1.12

require (
github.com/golang/mock v1.4.0
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.4.2
github.com/google/uuid v1.1.1
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand All @@ -18,6 +18,7 @@ require (
k8s.io/apimachinery v0.17.1
k8s.io/client-go v0.17.1
k8s.io/klog/v2 v2.0.0
rsc.io/quote/v3 v3.1.0 // indirect
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.0
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.4.0 h1:Rd1kQnQu0Hq3qvJppYSG0HtP+f5LPPUiDswTLiEegLg=
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
60 changes: 55 additions & 5 deletions pkg/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"io/ioutil"
"net"
"net/url"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -79,6 +80,51 @@ func newConnectionManager() *connectionManager {
}
}

// AgentIdentifiers stores identifiers that will be used by the server when
// choosing agents
type AgentIdentifiers struct {
IPv4 []string
IPv6 []string
Host []string
CIDR []string
}

type IdentifierType string

const (
IPv4 IdentifierType = "ipv4"
IPv6 IdentifierType = "ipv6"
Host IdentifierType = "host"
CIDR IdentifierType = "cidr"
UID IdentifierType = "uid"
)

// GenAgentIdentifiers generates an AgentIdentifiers based on the input string, the
// input string should be a comma-seprated list with each item in the format
// of <IdentifierType>=<address>
func GenAgentIdentifiers(addrs string) (AgentIdentifiers, error) {
var agentIDs AgentIdentifiers
decoded, err := url.ParseQuery(addrs)
if err != nil {
return agentIDs, fmt.Errorf("fail to parse url encoded string: %v", err)
}
for idType, ids := range decoded {
switch IdentifierType(idType) {
case IPv4:
agentIDs.IPv4 = append(agentIDs.IPv4, ids...)
case IPv6:
agentIDs.IPv6 = append(agentIDs.IPv6, ids...)
case Host:
agentIDs.Host = append(agentIDs.Host, ids...)
case CIDR:
agentIDs.CIDR = append(agentIDs.CIDR, ids...)
default:
return agentIDs, fmt.Errorf("Unknown address type: %s", idType)
}
}
return agentIDs, nil
}

// AgentClient runs on the node network side. It connects to proxy server and establishes
// a stream connection from which it sends and receives network traffic.
type AgentClient struct {
Expand All @@ -88,9 +134,10 @@ type AgentClient struct {

cs *ClientSet // the clientset that includes this AgentClient.

stream agent.AgentService_ConnectClient
agentID string
serverID string // the id of the proxy server this client connects to.
stream agent.AgentService_ConnectClient
agentID string
agentIdentifiers string
serverID string // the id of the proxy server this client connects to.

// connect opts
address string
Expand All @@ -107,11 +154,12 @@ type AgentClient struct {
serviceAccountTokenPath string
}

func newAgentClient(address, agentID string, cs *ClientSet, opts ...grpc.DialOption) (*AgentClient, int, error) {
func newAgentClient(address, agentID, agentIdentifiers string, cs *ClientSet, opts ...grpc.DialOption) (*AgentClient, int, error) {
a := &AgentClient{
cs: cs,
address: address,
agentID: agentID,
agentIdentifiers: agentIdentifiers,
opts: opts,
probeInterval: cs.probeInterval,
stopCh: make(chan struct{}),
Expand All @@ -132,7 +180,9 @@ func (a *AgentClient) Connect() (int, error) {
if err != nil {
return 0, err
}
ctx := metadata.AppendToOutgoingContext(context.Background(), header.AgentID, a.agentID)
ctx := metadata.AppendToOutgoingContext(context.Background(),
header.AgentID, a.agentID,
header.AgentIdentifiers, a.agentIdentifiers)
if a.serviceAccountTokenPath != "" {
if ctx, err = a.initializeAuthContext(ctx); err != nil {
conn.Close()
Expand Down
7 changes: 6 additions & 1 deletion pkg/agent/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type ClientSet struct {
serviceAccountTokenPath string
// channel to signal shutting down the client set. Primarily for test.
stopCh <-chan struct{}

agentIdentifiers string // The identifiers of the agent, which will be used
// by the server when choosing agent
}

func (cs *ClientSet) ClientsCount() int {
Expand Down Expand Up @@ -108,6 +111,7 @@ func (cs *ClientSet) RemoveClient(serverID string) {
type ClientSetConfig struct {
Address string
AgentID string
AgentIdentifiers string
SyncInterval time.Duration
ProbeInterval time.Duration
DialOptions []grpc.DialOption
Expand All @@ -118,6 +122,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet
return &ClientSet{
clients: make(map[string]*AgentClient),
agentID: cc.AgentID,
agentIdentifiers: cc.AgentIdentifiers,
address: cc.Address,
syncInterval: cc.SyncInterval,
probeInterval: cc.ProbeInterval,
Expand All @@ -128,7 +133,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet
}

func (cs *ClientSet) newAgentClient() (*AgentClient, int, error) {
return newAgentClient(cs.address, cs.agentID, cs, cs.dialOptions...)
return newAgentClient(cs.address, cs.agentID, cs.agentIdentifiers, cs, cs.dialOptions...)
}

func (cs *ClientSet) resetBackoff() *wait.Backoff {
Expand Down
Loading