Skip to content

Commit 5e8ec0e

Browse files
committed
envoy per gateway
* create a xdsIR per gateway * create a infraIR per gateway * use the gateway namespace-name as the key for above IRs * populate the envoy bootstrap config with a node id that matches the IR key * populate the xds server snapshot for each xds request based on the node id Fixes: #349 Signed-off-by: Arko Dasgupta <[email protected]>
1 parent 70a0d3c commit 5e8ec0e

File tree

13 files changed

+118
-175
lines changed

13 files changed

+118
-175
lines changed

internal/cmd/xdstest.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,19 +187,19 @@ func xDSTest() error {
187187
for {
188188
time.Sleep(10 * time.Second)
189189
logger.Info("Updating the cache for first-listener with first-route")
190-
err := snapCache.GenerateNewSnapshot(cacheVersion1.GetXdsResources())
190+
err := snapCache.GenerateNewSnapshot("", cacheVersion1.GetXdsResources())
191191
if err != nil {
192192
logger.Error(err, "Something went wrong with generating a snapshot")
193193
}
194194
time.Sleep(10 * time.Second)
195195
logger.Info("Updating the cache for first-listener with second-route")
196-
err = snapCache.GenerateNewSnapshot(cacheVersion2.GetXdsResources())
196+
err = snapCache.GenerateNewSnapshot("", cacheVersion2.GetXdsResources())
197197
if err != nil {
198198
logger.Error(err, "Something went wrong with generating a snapshot")
199199
}
200200
time.Sleep(10 * time.Second)
201201
logger.Info("Updating the cache for second-listener with second-route")
202-
err = snapCache.GenerateNewSnapshot(cacheVersion3.GetXdsResources())
202+
err = snapCache.GenerateNewSnapshot("", cacheVersion3.GetXdsResources())
203203
if err != nil {
204204
logger.Error(err, "Something went wrong with generating a snapshot")
205205
}

internal/gatewayapi/runner/runner.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,22 +95,24 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
9595
yamlInfraIR, _ := yaml.Marshal(&result.InfraIR)
9696
r.Logger.Info(string(yamlInfraIR))
9797

98-
// Publish the IRs. Use the service name as the key
99-
// to ensure there is always one element in the map.
98+
// Publish the IRs.
10099
// Also validate the ir before sending it.
101-
if err := result.InfraIR.Validate(); err != nil {
102-
r.Logger.Error(err, "unable to validate infra ir, skipped sending it")
103-
} else {
104-
r.InfraIR.Store(r.Name(), result.InfraIR)
100+
for key, val := range result.InfraIR {
101+
if err := val.Validate(); err != nil {
102+
r.Logger.Error(err, "unable to validate infra ir, skipped sending it")
103+
} else {
104+
r.InfraIR.Store(key, val)
105+
}
105106
}
106-
107107
// Wait until all HTTPRoutes have been reconciled , else the translation
108108
// result will be incomplete, and might cause churn in the data plane.
109109
if xdsIRReady {
110-
if err := result.XdsIR.Validate(); err != nil {
111-
r.Logger.Error(err, "unable to validate xds ir, skipped sending it")
112-
} else {
113-
r.XdsIR.Store(r.Name(), result.XdsIR)
110+
for key, val := range result.XdsIR {
111+
if err := val.Validate(); err != nil {
112+
r.Logger.Error(err, "unable to validate xds ir, skipped sending it")
113+
} else {
114+
r.XdsIR.Store(key, val)
115+
}
114116
}
115117
}
116118

internal/gatewayapi/translator.go

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ const (
2222
KindService = "Service"
2323
KindSecret = "Secret"
2424

25-
// OwningGatewayClassLabel is the owner reference label used for managed infra.
26-
// The value should be the name of the accepted Envoy GatewayClass.
27-
OwningGatewayClassLabel = "gateway.envoyproxy.io/owning-gatewayclass"
25+
// OwningGatewayLabel is the owner reference label used for managed infra.
26+
// The value should be the name of the accepted Envoy Gateway.
27+
OwningGatewayLabel = "gateway.envoyproxy.io/owning-gateway"
2828

2929
// minEphemeralPort is the first port in the ephemeral port range.
3030
minEphemeralPort = 1024
@@ -33,6 +33,9 @@ const (
3333
wellKnownPortShift = 10000
3434
)
3535

36+
type XdsIRMap map[string]*ir.Xds
37+
type InfraIRMap map[string]*ir.Infra
38+
3639
// Resources holds the Gateway API and related
3740
// resources that the translators needs as inputs.
3841
type Resources struct {
@@ -83,11 +86,11 @@ type Translator struct {
8386
type TranslateResult struct {
8487
Gateways []*v1beta1.Gateway
8588
HTTPRoutes []*v1beta1.HTTPRoute
86-
XdsIR *ir.Xds
87-
InfraIR *ir.Infra
89+
XdsIR XdsIRMap
90+
InfraIR InfraIRMap
8891
}
8992

90-
func newTranslateResult(gateways []*GatewayContext, httpRoutes []*HTTPRouteContext, xdsIR *ir.Xds, infraIR *ir.Infra) *TranslateResult {
93+
func newTranslateResult(gateways []*GatewayContext, httpRoutes []*HTTPRouteContext, xdsIR XdsIRMap, infraIR InfraIRMap) *TranslateResult {
9194
translateResult := &TranslateResult{
9295
XdsIR: xdsIR,
9396
InfraIR: infraIR,
@@ -104,11 +107,8 @@ func newTranslateResult(gateways []*GatewayContext, httpRoutes []*HTTPRouteConte
104107
}
105108

106109
func (t *Translator) Translate(resources *Resources) *TranslateResult {
107-
xdsIR := &ir.Xds{}
108-
109-
infraIR := ir.NewInfra()
110-
infraIR.Proxy.Name = string(t.GatewayClassName)
111-
infraIR.Proxy.GetProxyMetadata().Labels = GatewayClassOwnerLabel(string(t.GatewayClassName))
110+
xdsIR := make(XdsIRMap)
111+
infraIR := make(InfraIRMap)
112112

113113
// Get Gateways belonging to our GatewayClass.
114114
gateways := t.GetRelevantGateways(resources.Gateways)
@@ -129,6 +129,7 @@ func (t *Translator) GetRelevantGateways(gateways []*v1beta1.Gateway) []*Gateway
129129
if gateway == nil {
130130
panic("received nil gateway")
131131
}
132+
132133
if gateway.Spec.GatewayClassName == t.GatewayClassName {
133134
gc := &GatewayContext{
134135
Gateway: gateway.DeepCopy(),
@@ -151,12 +152,13 @@ type portListeners struct {
151152
hostnames map[string]int
152153
}
153154

154-
func (t *Translator) ProcessListeners(gateways []*GatewayContext, xdsIR *ir.Xds, infraIR *ir.Infra, resources *Resources) {
155+
func (t *Translator) ProcessListeners(gateways []*GatewayContext, xdsIR XdsIRMap, infraIR InfraIRMap, resources *Resources) {
155156
portListenerInfo := map[v1beta1.PortNumber]*portListeners{}
156157

157158
// Iterate through all listeners and collect info about protocols
158159
// and hostnames per port.
159160
for _, gateway := range gateways {
161+
160162
for _, listener := range gateway.listeners {
161163
if portListenerInfo[listener.Port] == nil {
162164
portListenerInfo[listener.Port] = &portListeners{
@@ -221,6 +223,16 @@ func (t *Translator) ProcessListeners(gateways []*GatewayContext, xdsIR *ir.Xds,
221223
// and compute status for each, and add valid ones
222224
// to the Xds IR.
223225
for _, gateway := range gateways {
226+
// init IR per gateway
227+
irKey := irStringKey(gateway.Gateway)
228+
gwXdsIR := &ir.Xds{}
229+
gwInfraIR := ir.NewInfra()
230+
gwInfraIR.Proxy.Name = irKey
231+
gwInfraIR.Proxy.GetProxyMetadata().Labels = GatewayOwnerLabel(gateway.Name)
232+
// save the IR references in the map before the translation starts
233+
xdsIR[irKey] = gwXdsIR
234+
infraIR[irKey] = gwInfraIR
235+
224236
for _, listener := range gateway.listeners {
225237
// Process protocol & supported kinds
226238
switch listener.Protocol {
@@ -453,7 +465,7 @@ func (t *Translator) ProcessListeners(gateways []*GatewayContext, xdsIR *ir.Xds,
453465
// see more https://gateway-api.sigs.k8s.io/references/spec/#gateway.networking.k8s.io/v1beta1.Listener.
454466
irListener.Hostnames = append(irListener.Hostnames, "*")
455467
}
456-
xdsIR.HTTP = append(xdsIR.HTTP, irListener)
468+
gwXdsIR.HTTP = append(gwXdsIR.HTTP, irListener)
457469

458470
// Add the listener to the Infra IR. Infra IR ports must have a unique port number.
459471
if !slices.Contains(foundPorts, servicePort) {
@@ -469,7 +481,7 @@ func (t *Translator) ProcessListeners(gateways []*GatewayContext, xdsIR *ir.Xds,
469481
ContainerPort: containerPort,
470482
}
471483
// Only 1 listener is supported.
472-
infraIR.Proxy.Listeners[0].Ports = append(infraIR.Proxy.Listeners[0].Ports, infraPort)
484+
gwInfraIR.Proxy.Listeners[0].Ports = append(gwInfraIR.Proxy.Listeners[0].Ports, infraPort)
473485
}
474486
}
475487
}
@@ -488,7 +500,7 @@ func servicePortToContainerPort(servicePort int32) int32 {
488500
return servicePort
489501
}
490502

491-
func (t *Translator) ProcessHTTPRoutes(httpRoutes []*v1beta1.HTTPRoute, gateways []*GatewayContext, resources *Resources, xdsIR *ir.Xds) []*HTTPRouteContext {
503+
func (t *Translator) ProcessHTTPRoutes(httpRoutes []*v1beta1.HTTPRoute, gateways []*GatewayContext, resources *Resources, xdsIR XdsIRMap) []*HTTPRouteContext {
492504
var relevantHTTPRoutes []*HTTPRouteContext
493505

494506
for _, h := range httpRoutes {
@@ -1022,8 +1034,8 @@ func (t *Translator) ProcessHTTPRoutes(httpRoutes []*v1beta1.HTTPRoute, gateways
10221034
})
10231035
}
10241036
}
1025-
1026-
irListener := xdsIR.GetListener(irListenerName(listener))
1037+
irKey := irStringKey(listener.gateway)
1038+
irListener := xdsIR[irKey].GetListener(irListenerName(listener))
10271039
irListener.Routes = append(irListener.Routes, perHostRoutes...)
10281040

10291041
// Theoretically there should only be one parent ref per
@@ -1139,6 +1151,10 @@ func isValidHostname(hostname string) error {
11391151
return nil
11401152
}
11411153

1154+
func irStringKey(gateway *v1beta1.Gateway) string {
1155+
return fmt.Sprintf("%s-%s", gateway.Namespace, gateway.Name)
1156+
}
1157+
11421158
func irListenerName(listener *ListenerContext) string {
11431159
return fmt.Sprintf("%s-%s-%s", listener.gateway.Namespace, listener.gateway.Name, listener.Name)
11441160
}
@@ -1162,8 +1178,8 @@ func irTLSConfig(tlsSecret *v1.Secret) *ir.TLSListenerConfig {
11621178
}
11631179
}
11641180

1165-
// GatewayClassOwnerLabel returns the GatewayCLass Owner label using
1181+
// GatewayOwnerLabel returns the Gateway Owner label using
11661182
// the provided name as the value.
1167-
func GatewayClassOwnerLabel(name string) map[string]string {
1168-
return map[string]string{OwningGatewayClassLabel: name}
1183+
func GatewayOwnerLabel(name string) map[string]string {
1184+
return map[string]string{OwningGatewayLabel: name}
11691185
}

internal/infrastructure/kubernetes/bootstrap.yaml.tpl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ dynamic_resources:
2525
set_node_on_first_message_only: true
2626
node:
2727
cluster: envoy-gateway-system
28-
id: envoy-default
28+
id: {{ .NodeId }}
2929
static_resources:
3030
clusters:
3131
- connect_timeout: 1s

internal/infrastructure/kubernetes/deployment.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ type bootstrapParameters struct {
6464
XdsServer xdsServerParameters
6565
// AdminServer defines the configuration of the Envoy admin interface.
6666
AdminServer adminServerParameters
67+
// NodeId defined the node ID of the Envoy instance.
68+
NodeId string
6769
}
6870

6971
type xdsServerParameters struct {
@@ -102,7 +104,7 @@ func (i *Infra) expectedDeployment(infra *ir.Infra) (*appsv1.Deployment, error)
102104

103105
// Set the labels based on the owning gatewayclass name.
104106
labels := envoyLabels(infra.GetProxyInfra().GetProxyMetadata().Labels)
105-
if _, ok := labels[gatewayapi.OwningGatewayClassLabel]; !ok {
107+
if _, ok := labels[gatewayapi.OwningGatewayLabel]; !ok {
106108
return nil, fmt.Errorf("missing owning gatewayclass label")
107109
}
108110

@@ -164,6 +166,7 @@ func expectedContainers(infra *ir.Infra) ([]corev1.Container, error) {
164166
Port: envoyAdminPort,
165167
AccessLogPath: envoyAdminAccessLogPath,
166168
},
169+
NodeId: infra.Proxy.Name,
167170
},
168171
}
169172
if err := cfg.render(); err != nil {

internal/infrastructure/kubernetes/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (i *Infra) expectedService(infra *ir.Infra) (*corev1.Service, error) {
3434

3535
// Set the labels based on the owning gatewayclass name.
3636
labels := envoyLabels(infra.GetProxyInfra().GetProxyMetadata().Labels)
37-
if _, ok := labels[gatewayapi.OwningGatewayClassLabel]; !ok {
37+
if _, ok := labels[gatewayapi.OwningGatewayLabel]; !ok {
3838
return nil, fmt.Errorf("missing owning gatewayclass label")
3939
}
4040

internal/infrastructure/runner/runner.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,23 +43,23 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
4343
// Subscribe to resources
4444
for range r.InfraIR.Subscribe(ctx) {
4545
r.Logger.Info("received a notification")
46-
in := r.InfraIR.Get()
47-
switch {
48-
case in == nil:
49-
// The resource map is nil at startup.
50-
r.Logger.Info("infra ir is nil, skipping")
51-
continue
52-
case in.Proxy == nil:
53-
if err := r.mgr.DeleteInfra(ctx, in); err != nil {
54-
r.Logger.Error(err, "failed to delete infra")
55-
}
56-
default:
57-
// Manage the proxy infra.
58-
if err := r.mgr.CreateInfra(ctx, in); err != nil {
59-
r.Logger.Error(err, "failed to create new infra")
46+
for _, in := range r.InfraIR.LoadAll() {
47+
switch {
48+
case in == nil:
49+
// The resource map is nil at startup.
50+
r.Logger.Info("infra ir is nil, skipping")
51+
continue
52+
case in.Proxy == nil:
53+
if err := r.mgr.DeleteInfra(ctx, in); err != nil {
54+
r.Logger.Error(err, "failed to delete infra")
55+
}
56+
default:
57+
// Manage the proxy infra.
58+
if err := r.mgr.CreateInfra(ctx, in); err != nil {
59+
r.Logger.Error(err, "failed to create new infra")
60+
}
6061
}
6162
}
63+
r.Logger.Info("subscriber shutting down")
6264
}
63-
64-
r.Logger.Info("subscriber shutting down")
6565
}

internal/message/types.go

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -93,36 +93,12 @@ type XdsIR struct {
9393
watchable.Map[string, *ir.Xds]
9494
}
9595

96-
func (x *XdsIR) Get() *ir.Xds {
97-
// Iterate and return the first element
98-
for _, v := range x.LoadAll() {
99-
return v
100-
}
101-
return nil
102-
}
103-
10496
// InfraIR message
10597
type InfraIR struct {
10698
watchable.Map[string, *ir.Infra]
10799
}
108100

109-
func (i *InfraIR) Get() *ir.Infra {
110-
// Iterate and return the first element
111-
for _, v := range i.LoadAll() {
112-
return v
113-
}
114-
return nil
115-
}
116-
117101
// Xds message
118102
type Xds struct {
119103
watchable.Map[string, *xdstypes.ResourceVersionTable]
120104
}
121-
122-
func (x *Xds) Get() *xdstypes.ResourceVersionTable {
123-
// Iterate and return the first element
124-
for _, v := range x.LoadAll() {
125-
return v
126-
}
127-
return nil
128-
}

internal/message/types_test.go

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,6 @@ import (
1111
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1212
"k8s.io/apimachinery/pkg/types"
1313
gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1"
14-
15-
"github.com/envoyproxy/gateway/internal/ir"
16-
xdstypes "github.com/envoyproxy/gateway/internal/xds/types"
17-
listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
18-
xdscachetypes "github.com/envoyproxy/go-control-plane/pkg/cache/types"
19-
resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
2014
)
2115

2216
func TestProviderResources(t *testing.T) {
@@ -166,36 +160,3 @@ func TestProviderResources(t *testing.T) {
166160
resources.DeleteGatewayClasses()
167161
assert.Nil(t, resources.GetGatewayClasses())
168162
}
169-
170-
func TestXdsIR(t *testing.T) {
171-
msg := new(XdsIR)
172-
assert.Nil(t, msg.Get())
173-
in := &ir.Xds{
174-
HTTP: []*ir.HTTPListener{{Name: "test"}},
175-
}
176-
msg.Store("xds-ir", in)
177-
assert.Equal(t, msg.Get(), in)
178-
}
179-
180-
func TestInfraIR(t *testing.T) {
181-
msg := new(InfraIR)
182-
assert.Nil(t, msg.Get())
183-
in := &ir.Infra{
184-
Proxy: &ir.ProxyInfra{Name: "test"},
185-
}
186-
msg.Store("infra-ir", in)
187-
assert.Equal(t, msg.Get(), in)
188-
}
189-
190-
func TestXds(t *testing.T) {
191-
msg := new(Xds)
192-
assert.Nil(t, msg.Get())
193-
in := &xdstypes.ResourceVersionTable{
194-
XdsResources: xdstypes.XdsResources{
195-
resourcev3.ListenerType: []xdscachetypes.Resource{&listenerv3.Listener{Name: "test"}},
196-
},
197-
}
198-
msg.Store("xds", in)
199-
diff := cmp.Diff(in, msg.Get(), protocmp.Transform())
200-
require.Empty(t, diff)
201-
}

internal/provider/kubernetes/gateway.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (r *gatewayReconciler) hasMatchingController(obj client.Object) bool {
113113
func (r *gatewayReconciler) enqueueRequestForOwningGatewayClass() handler.EventHandler {
114114
return handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request {
115115
labels := a.GetLabels()
116-
gcName, found := labels[gatewayapi.OwningGatewayClassLabel]
116+
gcName, found := labels[gatewayapi.OwningGatewayLabel]
117117
if found {
118118
var reqs []reconcile.Request
119119
for _, gw := range r.resources.Gateways.LoadAll() {

0 commit comments

Comments
 (0)