Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 167 additions & 62 deletions pkg/controllers/hub/trafficmanagerbackend/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -379,7 +382,7 @@
if condition.IsConditionStatusTrue(programmedCondition, profile.GetGeneration()) {
return profile, nil // return directly if the trafficManagerProfile is programmed
} else if condition.IsConditionStatusFalse(programmedCondition, profile.GetGeneration()) {
setFalseCondition(backend, nil, fmt.Sprintf("Invalid trafficManagerProfile %q: %v", backend.Spec.Profile.Name, programmedCondition.Message))
setFalseCondition(backend, nil, fmt.Sprintf("Invalid trafficManagerProfile %q, please check the trafficManagerProfile status", backend.Spec.Profile.Name))
} else {
setUnknownCondition(backend, fmt.Sprintf("In the processing of trafficManagerProfile %q", backend.Spec.Profile.Name))
}
Expand Down Expand Up @@ -773,53 +776,160 @@
For(&fleetnetv1beta1.TrafficManagerBackend{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(
&fleetnetv1beta1.TrafficManagerProfile{},
handler.EnqueueRequestsFromMapFunc(r.trafficManagerProfileEventHandler()),
handler.Funcs{
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
klog.V(2).InfoS("Received trafficManagerProfile update event", "trafficManagerProfile", klog.KObj(e.ObjectNew))
if e.ObjectOld == nil || e.ObjectNew == nil {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("updateEvent %v received with no metadata", e)),
"Failed to process an update event for trafficManagerProfile object")
return
}

Check warning on line 786 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L783-L786

Added lines #L783 - L786 were not covered by tests
oldProfile, ok := e.ObjectOld.(*fleetnetv1beta1.TrafficManagerProfile)
if !ok {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received old object %v not a trafficManagerProfile object", e.ObjectOld)),
"Failed to process an update event for trafficManagerProfile object")
return
}

Check warning on line 792 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L789-L792

Added lines #L789 - L792 were not covered by tests
newProfile, ok := e.ObjectNew.(*fleetnetv1beta1.TrafficManagerProfile)
if !ok {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received new object %v not a trafficManagerProfile object", e.ObjectNew)),
"Failed to process an update event for trafficManagerProfile object")
return
}

Check warning on line 798 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L795-L798

Added lines #L795 - L798 were not covered by tests
if !shouldHandleTrafficManagerProfileUpdateEvent(oldProfile, newProfile) {
klog.V(2).InfoS("Skipping requeueing trafficManagerProfile update event", "trafficManagerProfile", klog.KObj(e.ObjectNew))
return // no need to requeue if the clusters haven't changed
}

Check warning on line 802 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L800-L802

Added lines #L800 - L802 were not covered by tests
r.handleTrafficManagerProfileEvent(ctx, e.ObjectNew, q)
},
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
klog.V(2).InfoS("Received trafficManagerProfile delete event", "trafficManagerProfile", klog.KObj(e.Object))
r.handleTrafficManagerProfileEvent(ctx, e.Object, q)
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
klog.V(2).InfoS("Received trafficManagerProfile generic event", "trafficManagerProfile", klog.KObj(e.Object))
r.handleTrafficManagerProfileEvent(ctx, e.Object, q)
},

Check warning on line 812 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L809-L812

Added lines #L809 - L812 were not covered by tests
},
).
Watches(
&fleetnetv1alpha1.ServiceImport{},
handler.EnqueueRequestsFromMapFunc(r.serviceImportEventHandler()),
handler.Funcs{
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
klog.V(2).InfoS("Received serviceImport update event", "serviceImport", klog.KObj(e.ObjectNew))
if e.ObjectOld == nil || e.ObjectNew == nil {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("updateEvent %v received with no metadata", e)),
"Failed to process an update event for serviceImport object")
return
}

Check warning on line 824 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L821-L824

Added lines #L821 - L824 were not covered by tests
oldServiceImport, ok := e.ObjectOld.(*fleetnetv1alpha1.ServiceImport)
if !ok {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received old object %v not a serviceImport object", e.ObjectOld)),
"Failed to process an update event for serviceImport object")
return
}

Check warning on line 830 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L827-L830

Added lines #L827 - L830 were not covered by tests
newServiceImport, ok := e.ObjectNew.(*fleetnetv1alpha1.ServiceImport)
if !ok {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received new object %v not a serviceImport object", e.ObjectNew)),
"Failed to process an update event for serviceImport object")
return
}

Check warning on line 836 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L833-L836

Added lines #L833 - L836 were not covered by tests
if !shouldHandleServiceImportUpateEvent(oldServiceImport, newServiceImport) {
klog.V(2).InfoS("Skipping requeueing serviceImport update event", "serviceImport", klog.KObj(e.ObjectNew))
return // no need to requeue if the clusters haven't changed
}

Check warning on line 840 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L838-L840

Added lines #L838 - L840 were not covered by tests
r.handleServiceImportEvent(ctx, e.ObjectNew, q)
},
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
klog.V(2).InfoS("Received serviceImport delete event", "serviceImport", klog.KObj(e.Object))
r.handleServiceImportEvent(ctx, e.Object, q)
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
klog.V(2).InfoS("Received serviceImport generic event", "serviceImport", klog.KObj(e.Object))
r.handleServiceImportEvent(ctx, e.Object, q)
},

Check warning on line 850 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L847-L850

Added lines #L847 - L850 were not covered by tests
},
).
Watches(
&fleetnetv1alpha1.InternalServiceExport{},
handler.EnqueueRequestsFromMapFunc(r.internalServiceExportEventHandler()),
handler.Funcs{
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
klog.V(2).InfoS("Received internalServiceExport update event", "internalServiceExport", klog.KObj(e.ObjectNew))
if e.ObjectOld == nil || e.ObjectNew == nil {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("updateEvent %v received with no metadata", e)),
"Failed to process an update event for internalServiceExport object")
return
}

Check warning on line 862 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L859-L862

Added lines #L859 - L862 were not covered by tests
oldInternalServiceExport, ok := e.ObjectOld.(*fleetnetv1alpha1.InternalServiceExport)
if !ok {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received old object %v not a internalServiceExport object", e.ObjectOld)),
"Failed to process an update event for internalServiceExport object")
return
}

Check warning on line 868 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L865-L868

Added lines #L865 - L868 were not covered by tests
newInternalServiceExport, ok := e.ObjectNew.(*fleetnetv1alpha1.InternalServiceExport)
if !ok {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received new object %v not a internalServiceExport object", e.ObjectNew)),
"Failed to process an update event for internalServiceExport object")
return
}

Check warning on line 874 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L871-L874

Added lines #L871 - L874 were not covered by tests
if !shouldHandleInternalServiceExportUpdateEvent(oldInternalServiceExport, newInternalServiceExport) {
klog.V(2).InfoS("Skipping requeueing internalServiceExport update event", "internalServiceExport", klog.KObj(e.ObjectNew))
return
}

Check warning on line 878 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L876-L878

Added lines #L876 - L878 were not covered by tests
r.handleInternalServiceExportEvent(ctx, e.ObjectNew, q)
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
klog.V(2).InfoS("Received internalServiceExport generic event", "internalServiceExport", klog.KObj(e.Object))
r.handleInternalServiceExportEvent(ctx, e.Object, q)
},

Check warning on line 884 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L881-L884

Added lines #L881 - L884 were not covered by tests
},
).
Complete(r)
}

func (r *Reconciler) trafficManagerProfileEventHandler() handler.MapFunc {
return func(ctx context.Context, object client.Object) []reconcile.Request {
trafficManagerBackendList := &fleetnetv1beta1.TrafficManagerBackendList{}
fieldMatcher := client.MatchingFields{
trafficManagerBackendProfileFieldKey: object.GetName(),
}
// For now, we only support the backend and profile in the same namespace.
if err := r.Client.List(ctx, trafficManagerBackendList, client.InNamespace(object.GetNamespace()), fieldMatcher); err != nil {
klog.ErrorS(err,
"Failed to list trafficManagerBackends for the profile",
"trafficManagerProfile", klog.KObj(object))
return []reconcile.Request{}
}
func shouldHandleTrafficManagerProfileUpdateEvent(old, new *fleetnetv1beta1.TrafficManagerProfile) bool {
oldCondition := meta.FindStatusCondition(old.Status.Conditions, string(fleetnetv1beta1.TrafficManagerProfileConditionProgrammed))
newCondition := meta.FindStatusCondition(new.Status.Conditions, string(fleetnetv1beta1.TrafficManagerProfileConditionProgrammed))
return !condition.EqualConditionIgnoreReason(oldCondition, newCondition)
}

res := make([]reconcile.Request, 0, len(trafficManagerBackendList.Items))
for _, backend := range trafficManagerBackendList.Items {
res = append(res, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: backend.Namespace,
Name: backend.Name,
},
})
}
return res
}
func shouldHandleServiceImportUpateEvent(old, new *fleetnetv1alpha1.ServiceImport) bool {
return !equality.Semantic.DeepEqual(old.Status.Clusters, new.Status.Clusters)
}

func (r *Reconciler) serviceImportEventHandler() handler.MapFunc {
return func(ctx context.Context, object client.Object) []reconcile.Request {
return r.enqueueTrafficManagerBackendByServiceImport(ctx, object)
func shouldHandleInternalServiceExportUpdateEvent(old, new *fleetnetv1alpha1.InternalServiceExport) bool {
// Most of the referenced service fields are immutable, so we only check the fields that can be changed.
return old.Spec.Type != new.Spec.Type ||
old.Spec.IsDNSLabelConfigured != new.Spec.IsDNSLabelConfigured ||
old.Spec.IsInternalLoadBalancer != new.Spec.IsInternalLoadBalancer ||
!equality.Semantic.DeepEqual(old.Spec.PublicIPResourceID, new.Spec.PublicIPResourceID) ||
!equality.Semantic.DeepEqual(old.Spec.Weight, new.Spec.Weight)
}

func (r *Reconciler) handleTrafficManagerProfileEvent(ctx context.Context, object client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
trafficManagerBackendList := &fleetnetv1beta1.TrafficManagerBackendList{}
fieldMatcher := client.MatchingFields{
trafficManagerBackendProfileFieldKey: object.GetName(),
}
// For now, we only support the backend and profile in the same namespace.
if err := r.Client.List(ctx, trafficManagerBackendList, client.InNamespace(object.GetNamespace()), fieldMatcher); err != nil {
klog.ErrorS(err,
"Failed to list trafficManagerBackends for the profile",
"trafficManagerProfile", klog.KObj(object))
return
}

Check warning on line 920 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L916-L920

Added lines #L916 - L920 were not covered by tests

for _, backend := range trafficManagerBackendList.Items {
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: backend.Namespace,
Name: backend.Name,
},
})
}
}

func (r *Reconciler) enqueueTrafficManagerBackendByServiceImport(ctx context.Context, object client.Object) []reconcile.Request {
func (r *Reconciler) handleServiceImportEvent(ctx context.Context, object client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
trafficManagerBackendList := &fleetnetv1beta1.TrafficManagerBackendList{}
fieldMatcher := client.MatchingFields{
trafficManagerBackendBackendFieldKey: object.GetName(),
Expand All @@ -829,49 +939,44 @@
klog.ErrorS(err,
"Failed to list trafficManagerBackends for the serviceImport",
"serviceImport", klog.KObj(object))
return []reconcile.Request{}
return

Check warning on line 942 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L942

Added line #L942 was not covered by tests
}

res := make([]reconcile.Request, 0, len(trafficManagerBackendList.Items))
for _, backend := range trafficManagerBackendList.Items {
res = append(res, reconcile.Request{
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: backend.Namespace,
Name: backend.Name,
},
})
}
return res
}

func (r *Reconciler) internalServiceExportEventHandler() handler.MapFunc {
return func(ctx context.Context, object client.Object) []reconcile.Request {
internalServiceExport, ok := object.(*fleetnetv1alpha1.InternalServiceExport)
if !ok {
return []reconcile.Request{}
}
func (r *Reconciler) handleInternalServiceExportEvent(ctx context.Context, object client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
internalServiceExport, ok := object.(*fleetnetv1alpha1.InternalServiceExport)
if !ok {
return
}

Check warning on line 959 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L958-L959

Added lines #L958 - L959 were not covered by tests

serviceImport := &fleetnetv1alpha1.ServiceImport{}
serviceImportName := types.NamespacedName{Namespace: internalServiceExport.Spec.ServiceReference.Namespace, Name: internalServiceExport.Spec.ServiceReference.Name}
serviceImportKRef := klog.KRef(serviceImportName.Namespace, serviceImportName.Name)
if err := r.Client.Get(ctx, serviceImportName, serviceImport); err != nil {
klog.ErrorS(err, "Failed to get serviceImport", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(internalServiceExport))
return []reconcile.Request{}
}
for _, cs := range serviceImport.Status.Clusters {
// When the cluster exposes the service, first we will check whether the cluster can be exposed or not.
// For example, whether the service spec conflicts with other existing services.
// If the cluster is not in the serviceImport status, there are two possibilities:
// * the controller is still in the processing of this cluster.
// * the cluster cannot be exposed because of the conflicted spec, which will be clearly indicated in the
// serviceExport status.
// For the first case, when the processing is finished, serviceImport will be updated so that this controller
// will be triggered again.
if cs.Cluster == internalServiceExport.Spec.ServiceReference.ClusterID {
return r.enqueueTrafficManagerBackendByServiceImport(ctx, serviceImport)
}
serviceImport := &fleetnetv1alpha1.ServiceImport{}
serviceImportName := types.NamespacedName{Namespace: internalServiceExport.Spec.ServiceReference.Namespace, Name: internalServiceExport.Spec.ServiceReference.Name}
serviceImportKRef := klog.KRef(serviceImportName.Namespace, serviceImportName.Name)
if err := r.Client.Get(ctx, serviceImportName, serviceImport); err != nil {
klog.ErrorS(err, "Failed to get serviceImport", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(internalServiceExport))
return
}

Check warning on line 967 in pkg/controllers/hub/trafficmanagerbackend/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/hub/trafficmanagerbackend/controller.go#L965-L967

Added lines #L965 - L967 were not covered by tests
for _, cs := range serviceImport.Status.Clusters {
// When the cluster exposes the service, first we will check whether the cluster can be exposed or not.
// For example, whether the service spec conflicts with other existing services.
// If the cluster is not in the serviceImport status, there are two possibilities:
// * the controller is still in the processing of this cluster.
// * the cluster cannot be exposed because of the conflicted spec, which will be clearly indicated in the
// serviceExport status.
// For the first case, when the processing is finished, serviceImport will be updated so that this controller
// will be triggered again.
if cs.Cluster == internalServiceExport.Spec.ServiceReference.ClusterID {
r.handleServiceImportEvent(ctx, serviceImport, q)
}
return []reconcile.Request{}
}
}

Expand Down
Loading
Loading