Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion controller/api/destination/destination_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ func FuzzProfileTranslatorUpdate(data []byte) int {

id := watcher.ServiceID{Namespace: "bar", Name: "foo"}
server := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := newProfileTranslator(id, server, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil)
translator, err := newProfileTranslator(id, server, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil)
if err != nil {
return 0
}
translator.Start()
defer translator.Stop()
translator.Update(profile)
Expand Down
11 changes: 8 additions & 3 deletions controller/api/destination/endpoint_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func newEndpointTranslator(
stream pb.Destination_GetServer,
endStream chan struct{},
log *logging.Entry,
) *endpointTranslator {
) (*endpointTranslator, error) {
log = log.WithFields(logging.Fields{
"component": "endpoint-translator",
"service": service,
Expand All @@ -115,6 +115,11 @@ func newEndpointTranslator(

filteredSnapshot := newEmptyAddressSet()

counter, err := updatesQueueOverflowCounter.GetMetricWith(prometheus.Labels{"service": service})
if err != nil {
return nil, fmt.Errorf("failed to create updates queue overflow counter: %w", err)
}

return &endpointTranslator{
controllerNS,
identityTrustDomain,
Expand All @@ -133,10 +138,10 @@ func newEndpointTranslator(
stream,
endStream,
log,
updatesQueueOverflowCounter.With(prometheus.Labels{"service": service}),
counter,
make(chan interface{}, updateQueueCapacity),
make(chan struct{}),
}
}, nil
}

func (et *endpointTranslator) Add(set watcher.AddressSet) {
Expand Down
19 changes: 14 additions & 5 deletions controller/api/destination/federated_service_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (fs *federatedService) remoteDiscoverySubscribe(
return
}

translator := newEndpointTranslator(
translator, err := newEndpointTranslator(
fs.config.ControllerNS,
remoteConfig.TrustDomain,
fs.config.ForceOpaqueTransport,
Expand All @@ -365,13 +365,18 @@ func (fs *federatedService) remoteDiscoverySubscribe(
subscriber.endStream,
fs.log,
)
if err != nil {
fs.log.Errorf("Failed to create endpoint translator for remote discovery service %q in cluster %s: %s", id.service.Name, id.cluster, err)
return
}

translator.Start()
subscriber.remoteTranslators[id] = translator

fs.log.Debugf("Subscribing to remote discovery service %s in cluster %s", id.service, id.cluster)
err := remoteWatcher.Subscribe(watcher.ServiceID{Namespace: id.service.Namespace, Name: id.service.Name}, subscriber.port, subscriber.instanceID, translator)
err = remoteWatcher.Subscribe(watcher.ServiceID{Namespace: id.service.Namespace, Name: id.service.Name}, subscriber.port, subscriber.instanceID, translator)
if err != nil {
fs.log.Errorf("Failed to subscribe to remote disocvery service %q in cluster %s: %s", id.service.Name, id.cluster, err)
fs.log.Errorf("Failed to subscribe to remote discovery service %q in cluster %s: %s", id.service.Name, id.cluster, err)
}
}

Expand All @@ -397,7 +402,7 @@ func (fs *federatedService) localDiscoverySubscribe(
subscriber *federatedServiceSubscriber,
localDiscovery string,
) {
translator := newEndpointTranslator(
translator, err := newEndpointTranslator(
fs.config.ControllerNS,
fs.config.IdentityTrustDomain,
fs.config.ForceOpaqueTransport,
Expand All @@ -414,11 +419,15 @@ func (fs *federatedService) localDiscoverySubscribe(
subscriber.endStream,
fs.log,
)
if err != nil {
fs.log.Errorf("Failed to create endpoint translator for %s: %s", localDiscovery, err)
return
}
translator.Start()
subscriber.localTranslators[localDiscovery] = translator

fs.log.Debugf("Subscribing to local discovery service %s", localDiscovery)
err := fs.localEndpoints.Subscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator)
err = fs.localEndpoints.Subscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator)
if err != nil {
fs.log.Errorf("Failed to subscribe to %s: %s", localDiscovery, err)
}
Expand Down
11 changes: 8 additions & 3 deletions controller/api/destination/profile_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var profileUpdatesQueueOverflowCounter = promauto.NewCounterVec(
},
)

func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) *profileTranslator {
func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) (*profileTranslator, error) {
parentRef := &meta.Metadata{
Kind: &meta.Metadata_Resource{
Resource: &meta.Resource{
Expand All @@ -59,6 +59,11 @@ func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_Get
},
}

overflowCounter, err := profileUpdatesQueueOverflowCounter.GetMetricWith(prometheus.Labels{"fqn": fqn, "port": fmt.Sprintf("%d", port)})
if err != nil {
return nil, fmt.Errorf("failed to create profile updates queue overflow counter: %w", err)
}

return &profileTranslator{
fullyQualifiedName: fqn,
port: port,
Expand All @@ -67,10 +72,10 @@ func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_Get
stream: stream,
endStream: endStream,
log: log.WithField("component", "profile-translator"),
overflowCounter: profileUpdatesQueueOverflowCounter.With(prometheus.Labels{"fqn": fqn, "port": fmt.Sprintf("%d", port)}),
overflowCounter: overflowCounter,
updates: make(chan *sp.ServiceProfile, updateQueueCapacity),
stop: make(chan struct{}),
}
}, nil
}

// Update is called from a client-go informer callback and therefore must not
Expand Down
10 changes: 8 additions & 2 deletions controller/api/destination/profile_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,10 @@ func newMockTranslator(t *testing.T) (*profileTranslator, chan *pb.DestinationPr
t.Helper()
id := watcher.ServiceID{Namespace: "bar", Name: "foo"}
server := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := newProfileTranslator(id, server, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil)
translator, err := newProfileTranslator(id, server, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil)
if err != nil {
t.Fatalf("failed to create profile translator: %s", err)
}
return translator, server.profilesReceived
}

Expand Down Expand Up @@ -593,7 +596,10 @@ func TestProfileTranslator(t *testing.T) {

t.Run("Sends empty update", func(t *testing.T) {
server := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := newProfileTranslator(watcher.ID{}, server, logging.WithField("test", t.Name()), "", 80, nil)
translator, err := newProfileTranslator(watcher.ID{}, server, logging.WithField("test", t.Name()), "", 80, nil)
if err != nil {
t.Fatalf("failed to create profile translator: %s", err)
}

translator.Start()
defer translator.Stop()
Expand Down
17 changes: 13 additions & 4 deletions controller/api/destination/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
log.Errorf("Failed to get remote cluster %s", cluster)
return status.Errorf(codes.NotFound, "Remote cluster not found: %s", cluster)
}
translator := newEndpointTranslator(
translator, err := newEndpointTranslator(
s.config.ControllerNS,
remoteConfig.TrustDomain,
s.config.ForceOpaqueTransport,
Expand All @@ -220,6 +220,9 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
streamEnd,
log,
)
if err != nil {
return status.Errorf(codes.Internal, "Failed to create endpoint translator: %s", err)
}
translator.Start()
defer translator.Stop()

Expand All @@ -238,7 +241,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
} else {
log.Debug("Local discovery service detected")
// Local discovery
translator := newEndpointTranslator(
translator, err := newEndpointTranslator(
s.config.ControllerNS,
s.config.IdentityTrustDomain,
s.config.ForceOpaqueTransport,
Expand All @@ -255,6 +258,9 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
streamEnd,
log,
)
if err != nil {
return status.Errorf(codes.Internal, "Failed to create endpoint translator: %s", err)
}
translator.Start()
defer translator.Stop()

Expand Down Expand Up @@ -397,7 +403,10 @@ func (s *server) subscribeToServiceProfile(
// We build up the pipeline of profile updaters backwards, starting from
// the translator which takes profile updates, translates them to protobuf
// and pushes them onto the gRPC stream.
translator := newProfileTranslator(service, stream, log, fqn, port, streamEnd)
translator, err := newProfileTranslator(service, stream, log, fqn, port, streamEnd)
if err != nil {
return status.Errorf(codes.Internal, "Failed to create profile translator: %s", err)
}
translator.Start()
defer translator.Stop()

Expand All @@ -408,7 +417,7 @@ func (s *server) subscribeToServiceProfile(

// Create an adaptor that merges service-level opaque port configurations
// onto profile updates.
err := s.opaquePorts.Subscribe(service, opaquePortsAdaptor)
err = s.opaquePorts.Subscribe(service, opaquePortsAdaptor)
if err != nil {
log.Warnf("Failed to subscribe to service updates for %s: %s", service, err)
return err
Expand Down
5 changes: 4 additions & 1 deletion controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ metadata:
metadataAPI.Sync(nil)

mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
translator := newEndpointTranslator(
translator, err := newEndpointTranslator(
"linkerd",
"trust.domain",
forceOpaqueTransport,
Expand All @@ -1136,5 +1136,8 @@ metadata:
nil,
logging.WithField("test", t.Name()),
)
if err != nil {
t.Fatalf("failed to create endpoint translator: %s", err)
}
return mockGetServer, translator
}
22 changes: 15 additions & 7 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,7 @@ func (ew *EndpointsWatcher) Subscribe(id ServiceID, port Port, hostname string,

sp := ew.getOrNewServicePublisher(id)

sp.subscribe(port, hostname, listener)
return nil
return sp.subscribe(port, hostname, listener)
}

// Unsubscribe removes a listener from the subscribers list for this authority.
Expand Down Expand Up @@ -659,7 +658,7 @@ func (sp *servicePublisher) updateService(newService *corev1.Service) {

}

func (sp *servicePublisher) subscribe(srcPort Port, hostname string, listener EndpointUpdateListener) {
func (sp *servicePublisher) subscribe(srcPort Port, hostname string, listener EndpointUpdateListener) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -669,10 +668,15 @@ func (sp *servicePublisher) subscribe(srcPort Port, hostname string, listener En
}
port, ok := sp.ports[key]
if !ok {
port = sp.newPortPublisher(srcPort, hostname)
var err error
port, err = sp.newPortPublisher(srcPort, hostname)
if err != nil {
return err
}
sp.ports[key] = port
}
port.subscribe(listener)
return nil
}

func (sp *servicePublisher) unsubscribe(srcPort Port, hostname string, listener EndpointUpdateListener) {
Expand All @@ -693,7 +697,7 @@ func (sp *servicePublisher) unsubscribe(srcPort Port, hostname string, listener
}
}

func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *portPublisher {
func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) (*portPublisher, error) {
targetPort := intstr.FromInt(int(srcPort))
svc, err := sp.k8sAPI.Svc().Lister().Services(sp.id.Namespace).Get(sp.id.Name)
if err != nil && !apierrors.IsNotFound(err) {
Expand All @@ -707,6 +711,10 @@ func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *por

log := sp.log.WithField("port", srcPort)

metrics, err := endpointsVecs.newEndpointsMetrics(sp.metricsLabels(srcPort, hostname))
if err != nil {
return nil, err
}
port := &portPublisher{
listeners: []EndpointUpdateListener{},
targetPort: targetPort,
Expand All @@ -716,7 +724,7 @@ func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *por
k8sAPI: sp.k8sAPI,
metadataAPI: sp.metadataAPI,
log: log,
metrics: endpointsVecs.newEndpointsMetrics(sp.metricsLabels(srcPort, hostname)),
metrics: metrics,
enableEndpointSlices: sp.enableEndpointSlices,
localTrafficPolicy: sp.localTrafficPolicy,
}
Expand Down Expand Up @@ -744,7 +752,7 @@ func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *por
}
}

return port
return port, nil
}

func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus.Labels {
Expand Down
14 changes: 12 additions & 2 deletions controller/api/destination/watcher/opaque_ports_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdat
numListeners = float64(len(ss.listeners))
}

opw.subscribersGauge.With(id.Labels()).Set(numListeners)
gauge, err := opw.subscribersGauge.GetMetricWith(id.Labels())
if err != nil {
opw.log.Errorf("failed to get service_subscribers metric: %q", err)
} else {
gauge.Set(numListeners)
}

return nil
}
Expand All @@ -125,7 +130,12 @@ func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpd

labels := id.Labels()
if len(ss.listeners) > 0 {
opw.subscribersGauge.With(labels).Set(float64(len(ss.listeners)))
gauge, err := opw.subscribersGauge.GetMetricWith(labels)
if err != nil {
opw.log.Errorf("failed to get service_subscribers metric: %q", err)
} else {
gauge.Set(float64(len(ss.listeners)))
}
} else {
if !opw.subscribersGauge.Delete(labels) {
opw.log.Warnf("unable to delete service_subscribers metric with labels %s", labels)
Expand Down
Loading