Skip to content

Commit 834b32e

Browse files
authored
feat(destionation): add --stream-queue-capacity arg (#14622)
The destination controller has a hardcoded stream queue capacity of 100. This commit introduces a new command-line argument `--stream-queue-capacity` that allows users to customize this value according to their needs. We believe that '1' is a better default value for most use cases, as it reduces memory consumption and latency. However, we want to be incremental about this change. Therefore, this configuration allows for us to test this new default value in a controlled manner.
1 parent 3e89f44 commit 834b32e

File tree

9 files changed

+37
-11
lines changed

9 files changed

+37
-11
lines changed

controller/api/destination/endpoint_profile_translator.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func newEndpointProfileTranslator(
5454
stream pb.Destination_GetProfileServer,
5555
endStream chan struct{},
5656
log *logging.Entry,
57+
queueCapacity int,
5758
) *endpointProfileTranslator {
5859
return &endpointProfileTranslator{
5960
forceOpaqueTransport: forceOpaqueTransport,
@@ -66,7 +67,7 @@ func newEndpointProfileTranslator(
6667

6768
stream: stream,
6869
endStream: endStream,
69-
updates: make(chan *watcher.Address, updateQueueCapacity),
70+
updates: make(chan *watcher.Address, queueCapacity),
7071
stop: make(chan struct{}),
7172

7273
log: log.WithField("component", "endpoint-profile-translator"),

controller/api/destination/endpoint_profile_translator_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func TestEndpointProfileTranslator(t *testing.T) {
4444
mockGetProfileServer,
4545
nil,
4646
log,
47+
DefaultStreamQueueCapacity,
4748
)
4849
translator.Start()
4950
defer translator.Stop()
@@ -83,17 +84,19 @@ func TestEndpointProfileTranslator(t *testing.T) {
8384
}
8485
log := logging.WithField("test", t.Name())
8586
endStream := make(chan struct{})
87+
queueCapacity := DefaultStreamQueueCapacity
8688
translator := newEndpointProfileTranslator(
8789
true, true, "cluster", "identity", make(map[uint32]struct{}), nil,
8890
mockGetProfileServer,
8991
endStream,
9092
log,
93+
queueCapacity,
9194
)
9295

9396
// We avoid starting the translator so that it doesn't drain its update
9497
// queue and we can test the overflow behavior.
9598

96-
for i := 0; i < updateQueueCapacity/2; i++ {
99+
for i := 0; i < queueCapacity/2; i++ {
97100
if err := translator.Update(podAddr); err != nil {
98101
t.Fatal("Expected update")
99102
}
@@ -114,10 +117,10 @@ func TestEndpointProfileTranslator(t *testing.T) {
114117
}
115118

116119
// The queue should be full and the next update should fail.
117-
t.Logf("Queue length=%d capacity=%d", translator.queueLen(), updateQueueCapacity)
120+
t.Logf("Queue length=%d capacity=%d", translator.queueLen(), queueCapacity)
118121
if err := translator.Update(podAddr); err == nil {
119122
if !errors.Is(err, http.ErrServerClosed) {
120-
t.Fatalf("Expected update to fail; queue=%d; capacity=%d", translator.queueLen(), updateQueueCapacity)
123+
t.Fatalf("Expected update to fail; queue=%d; capacity=%d", translator.queueLen(), queueCapacity)
121124
}
122125
}
123126

controller/api/destination/endpoint_translator.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ const (
2727
envAdminListenAddr = "LINKERD2_PROXY_ADMIN_LISTEN_ADDR"
2828
envControlListenAddr = "LINKERD2_PROXY_CONTROL_LISTEN_ADDR"
2929

30-
updateQueueCapacity = 100
31-
3230
defaultProxyInboundPort = 4143
3331
)
3432

@@ -101,6 +99,7 @@ func newEndpointTranslator(
10199
stream pb.Destination_GetServer,
102100
endStream chan struct{},
103101
log *logging.Entry,
102+
queueCapacity int,
104103
) (*endpointTranslator, error) {
105104
log = log.WithFields(logging.Fields{
106105
"component": "endpoint-translator",
@@ -139,7 +138,7 @@ func newEndpointTranslator(
139138
endStream,
140139
log,
141140
counter,
142-
make(chan interface{}, updateQueueCapacity),
141+
make(chan interface{}, queueCapacity),
143142
make(chan struct{}),
144143
}, nil
145144
}

controller/api/destination/federated_service_watcher.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ func (fs *federatedService) remoteDiscoverySubscribe(
364364
subscriber.stream,
365365
subscriber.endStream,
366366
fs.log,
367+
fs.config.StreamQueueCapacity,
367368
)
368369
if err != nil {
369370
fs.log.Errorf("Failed to create endpoint translator for remote discovery service %q in cluster %s: %s", id.service.Name, id.cluster, err)
@@ -418,6 +419,7 @@ func (fs *federatedService) localDiscoverySubscribe(
418419
subscriber.stream,
419420
subscriber.endStream,
420421
fs.log,
422+
fs.config.StreamQueueCapacity,
421423
)
422424
if err != nil {
423425
fs.log.Errorf("Failed to create endpoint translator for %s: %s", localDiscovery, err)

controller/api/destination/federated_service_watcher_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func mockFederatedServiceWatcher(t *testing.T) (*federatedServiceWatcher, error)
151151
if err != nil {
152152
return nil, fmt.Errorf("NewClusterStoreWithDecoder returned an error: %w", err)
153153
}
154-
fsw, err := newFederatedServiceWatcher(k8sAPI, metadataAPI, &Config{}, clusterStore, localEndpoints, logging.WithField("test", t.Name()))
154+
fsw, err := newFederatedServiceWatcher(k8sAPI, metadataAPI, &Config{StreamQueueCapacity: DefaultStreamQueueCapacity}, clusterStore, localEndpoints, logging.WithField("test", t.Name()))
155155
if err != nil {
156156
return nil, fmt.Errorf("newFederatedServiceWatcher returned an error: %w", err)
157157
}

controller/api/destination/profile_translator.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ var profileUpdatesQueueOverflowCounter = promauto.NewCounterVec(
4747
)
4848

4949
func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) (*profileTranslator, error) {
50+
return newProfileTranslatorWithCapacity(serviceID, stream, log, fqn, port, endStream, DefaultStreamQueueCapacity)
51+
}
52+
53+
func newProfileTranslatorWithCapacity(serviceID watcher.ServiceID, stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}, queueCapacity int) (*profileTranslator, error) {
5054
parentRef := &meta.Metadata{
5155
Kind: &meta.Metadata_Resource{
5256
Resource: &meta.Resource{
@@ -73,7 +77,7 @@ func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_Get
7377
endStream: endStream,
7478
log: log.WithField("component", "profile-translator"),
7579
overflowCounter: overflowCounter,
76-
updates: make(chan *sp.ServiceProfile, updateQueueCapacity),
80+
updates: make(chan *sp.ServiceProfile, queueCapacity),
7781
stop: make(chan struct{}),
7882
}, nil
7983
}

controller/api/destination/server.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ import (
2424
kerrors "k8s.io/apimachinery/pkg/api/errors"
2525
)
2626

27+
// DefaultStreamQueueCapacity defines the default maximum number of pending
28+
// updates buffered per stream before the stream is closed.
29+
const DefaultStreamQueueCapacity = 100
30+
2731
type (
2832
Config struct {
2933
ControllerNS,
@@ -39,6 +43,8 @@ type (
3943
MeshedHttp2ClientParams *pb.Http2ClientParams
4044

4145
DefaultOpaquePorts map[uint32]struct{}
46+
47+
StreamQueueCapacity int
4248
}
4349

4450
server struct {
@@ -219,6 +225,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
219225
stream,
220226
streamEnd,
221227
log,
228+
s.config.StreamQueueCapacity,
222229
)
223230
if err != nil {
224231
return status.Errorf(codes.Internal, "Failed to create endpoint translator: %s", err)
@@ -257,6 +264,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
257264
stream,
258265
streamEnd,
259266
log,
267+
s.config.StreamQueueCapacity,
260268
)
261269
if err != nil {
262270
return status.Errorf(codes.Internal, "Failed to create endpoint translator: %s", err)
@@ -403,7 +411,7 @@ func (s *server) subscribeToServiceProfile(
403411
// We build up the pipeline of profile updaters backwards, starting from
404412
// the translator which takes profile updates, translates them to protobuf
405413
// and pushes them onto the gRPC stream.
406-
translator, err := newProfileTranslator(service, stream, log, fqn, port, streamEnd)
414+
translator, err := newProfileTranslatorWithCapacity(service, stream, log, fqn, port, streamEnd, s.config.StreamQueueCapacity)
407415
if err != nil {
408416
return status.Errorf(codes.Internal, "Failed to create profile translator: %s", err)
409417
}
@@ -552,6 +560,7 @@ func (s *server) subscribeToEndpointProfile(
552560
stream,
553561
streamEnd,
554562
log,
563+
s.config.StreamQueueCapacity,
555564
)
556565
translator.Start()
557566
defer translator.Stop()

controller/api/destination/test_util.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1062,7 +1062,7 @@ spec:
10621062
t.Fatalf("can't create cluster store: %s", err)
10631063
}
10641064

1065-
federatedServices, err := newFederatedServiceWatcher(k8sAPI, metadataAPI, &Config{}, clusterStore, endpoints, log)
1065+
federatedServices, err := newFederatedServiceWatcher(k8sAPI, metadataAPI, &Config{StreamQueueCapacity: DefaultStreamQueueCapacity}, clusterStore, endpoints, log)
10661066
if err != nil {
10671067
t.Fatalf("can't create federated service watcher: %s", err)
10681068
}
@@ -1082,6 +1082,7 @@ spec:
10821082
ClusterDomain: "mycluster.local",
10831083
IdentityTrustDomain: "trust.domain",
10841084
DefaultOpaquePorts: defaultOpaquePorts,
1085+
StreamQueueCapacity: DefaultStreamQueueCapacity,
10851086
},
10861087
workloads,
10871088
endpoints,
@@ -1191,6 +1192,7 @@ metadata:
11911192
mockGetServer,
11921193
nil,
11931194
logging.WithField("test", t.Name()),
1195+
DefaultStreamQueueCapacity,
11941196
)
11951197
if err != nil {
11961198
t.Fatalf("failed to create endpoint translator: %s", err)

controller/cmd/destination/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func Main(args []string) {
4747
// This will default to true. It can be overridden with experimental CLI
4848
// flags. Currently not exposed as a configuration value through Helm.
4949
exportControllerQueueMetrics := cmd.Bool("export-queue-metrics", true, "Exports queue metrics for the external workload controller")
50+
streamQueueCapacity := cmd.Int("stream-queue-capacity", destination.DefaultStreamQueueCapacity, "Maximum number of updates buffered per stream before the stream is closed")
5051

5152
traceCollector := flags.AddTraceFlags(cmd)
5253

@@ -66,6 +67,10 @@ func Main(args []string) {
6667

6768
flags.ConfigureAndParse(cmd, args)
6869

70+
if *streamQueueCapacity <= 0 {
71+
log.Fatalf("--stream-queue-capacity must be greater than 0")
72+
}
73+
6974
if *enableIPv6 && !*enableEndpointSlices {
7075
log.Fatal("If --enable-ipv6=true then --enable-endpoint-slices needs to be true")
7176
}
@@ -190,6 +195,7 @@ func Main(args []string) {
190195
EnableIPv6: *enableIPv6,
191196
ExtEndpointZoneWeights: *extEndpointZoneWeights,
192197
MeshedHttp2ClientParams: meshedHTTP2ClientParams,
198+
StreamQueueCapacity: *streamQueueCapacity,
193199
}
194200
server, err := destination.NewServer(
195201
*addr,

0 commit comments

Comments
 (0)