Skip to content

Commit 667c5c5

Browse files
authored
feat: reduce tmb reconcile (#320)
1 parent b6c8364 commit 667c5c5

File tree

3 files changed

+705
-192
lines changed

3 files changed

+705
-192
lines changed

pkg/controllers/hub/trafficmanagerbackend/controller.go

Lines changed: 167 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,20 @@ import (
2020
"github.com/prometheus/client_golang/prometheus"
2121
"golang.org/x/sync/errgroup"
2222
corev1 "k8s.io/api/core/v1"
23+
"k8s.io/apimachinery/pkg/api/equality"
2324
apierrors "k8s.io/apimachinery/pkg/api/errors"
2425
"k8s.io/apimachinery/pkg/api/meta"
2526
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2627
"k8s.io/apimachinery/pkg/types"
2728
"k8s.io/client-go/tools/record"
29+
"k8s.io/client-go/util/workqueue"
2830
"k8s.io/klog/v2"
2931
"k8s.io/utils/ptr"
3032
ctrl "sigs.k8s.io/controller-runtime"
3133
"sigs.k8s.io/controller-runtime/pkg/builder"
3234
"sigs.k8s.io/controller-runtime/pkg/client"
3335
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
36+
"sigs.k8s.io/controller-runtime/pkg/event"
3437
"sigs.k8s.io/controller-runtime/pkg/handler"
3538
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
3639
"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -379,7 +382,7 @@ func (r *Reconciler) validateTrafficManagerProfile(ctx context.Context, backend
379382
if condition.IsConditionStatusTrue(programmedCondition, profile.GetGeneration()) {
380383
return profile, nil // return directly if the trafficManagerProfile is programmed
381384
} else if condition.IsConditionStatusFalse(programmedCondition, profile.GetGeneration()) {
382-
setFalseCondition(backend, nil, fmt.Sprintf("Invalid trafficManagerProfile %q: %v", backend.Spec.Profile.Name, programmedCondition.Message))
385+
setFalseCondition(backend, nil, fmt.Sprintf("Invalid trafficManagerProfile %q, please check the trafficManagerProfile status", backend.Spec.Profile.Name))
383386
} else {
384387
setUnknownCondition(backend, fmt.Sprintf("In the processing of trafficManagerProfile %q", backend.Spec.Profile.Name))
385388
}
@@ -773,53 +776,160 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, dis
773776
For(&fleetnetv1beta1.TrafficManagerBackend{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
774777
Watches(
775778
&fleetnetv1beta1.TrafficManagerProfile{},
776-
handler.EnqueueRequestsFromMapFunc(r.trafficManagerProfileEventHandler()),
779+
handler.Funcs{
780+
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
781+
klog.V(2).InfoS("Received trafficManagerProfile update event", "trafficManagerProfile", klog.KObj(e.ObjectNew))
782+
if e.ObjectOld == nil || e.ObjectNew == nil {
783+
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("updateEvent %v received with no metadata", e)),
784+
"Failed to process an update event for trafficManagerProfile object")
785+
return
786+
}
787+
oldProfile, ok := e.ObjectOld.(*fleetnetv1beta1.TrafficManagerProfile)
788+
if !ok {
789+
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received old object %v not a trafficManagerProfile object", e.ObjectOld)),
790+
"Failed to process an update event for trafficManagerProfile object")
791+
return
792+
}
793+
newProfile, ok := e.ObjectNew.(*fleetnetv1beta1.TrafficManagerProfile)
794+
if !ok {
795+
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received new object %v not a trafficManagerProfile object", e.ObjectNew)),
796+
"Failed to process an update event for trafficManagerProfile object")
797+
return
798+
}
799+
if !shouldHandleTrafficManagerProfileUpdateEvent(oldProfile, newProfile) {
800+
klog.V(2).InfoS("Skipping requeueing trafficManagerProfile update event", "trafficManagerProfile", klog.KObj(e.ObjectNew))
801+
return // no need to requeue if the clusters haven't changed
802+
}
803+
r.handleTrafficManagerProfileEvent(ctx, e.ObjectNew, q)
804+
},
805+
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
806+
klog.V(2).InfoS("Received trafficManagerProfile delete event", "trafficManagerProfile", klog.KObj(e.Object))
807+
r.handleTrafficManagerProfileEvent(ctx, e.Object, q)
808+
},
809+
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
810+
klog.V(2).InfoS("Received trafficManagerProfile generic event", "trafficManagerProfile", klog.KObj(e.Object))
811+
r.handleTrafficManagerProfileEvent(ctx, e.Object, q)
812+
},
813+
},
777814
).
778815
Watches(
779816
&fleetnetv1alpha1.ServiceImport{},
780-
handler.EnqueueRequestsFromMapFunc(r.serviceImportEventHandler()),
817+
handler.Funcs{
818+
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
819+
klog.V(2).InfoS("Received serviceImport update event", "serviceImport", klog.KObj(e.ObjectNew))
820+
if e.ObjectOld == nil || e.ObjectNew == nil {
821+
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("updateEvent %v received with no metadata", e)),
822+
"Failed to process an update event for serviceImport object")
823+
return
824+
}
825+
oldServiceImport, ok := e.ObjectOld.(*fleetnetv1alpha1.ServiceImport)
826+
if !ok {
827+
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received old object %v not a serviceImport object", e.ObjectOld)),
828+
"Failed to process an update event for serviceImport object")
829+
return
830+
}
831+
newServiceImport, ok := e.ObjectNew.(*fleetnetv1alpha1.ServiceImport)
832+
if !ok {
833+
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received new object %v not a serviceImport object", e.ObjectNew)),
834+
"Failed to process an update event for serviceImport object")
835+
return
836+
}
837+
if !shouldHandleServiceImportUpateEvent(oldServiceImport, newServiceImport) {
838+
klog.V(2).InfoS("Skipping requeueing serviceImport update event", "serviceImport", klog.KObj(e.ObjectNew))
839+
return // no need to requeue if the clusters haven't changed
840+
}
841+
r.handleServiceImportEvent(ctx, e.ObjectNew, q)
842+
},
843+
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
844+
klog.V(2).InfoS("Received serviceImport delete event", "serviceImport", klog.KObj(e.Object))
845+
r.handleServiceImportEvent(ctx, e.Object, q)
846+
},
847+
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
848+
klog.V(2).InfoS("Received serviceImport generic event", "serviceImport", klog.KObj(e.Object))
849+
r.handleServiceImportEvent(ctx, e.Object, q)
850+
},
851+
},
781852
).
782853
Watches(
783854
&fleetnetv1alpha1.InternalServiceExport{},
784-
handler.EnqueueRequestsFromMapFunc(r.internalServiceExportEventHandler()),
855+
handler.Funcs{
856+
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
857+
klog.V(2).InfoS("Received internalServiceExport update event", "internalServiceExport", klog.KObj(e.ObjectNew))
858+
if e.ObjectOld == nil || e.ObjectNew == nil {
859+
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("updateEvent %v received with no metadata", e)),
860+
"Failed to process an update event for internalServiceExport object")
861+
return
862+
}
863+
oldInternalServiceExport, ok := e.ObjectOld.(*fleetnetv1alpha1.InternalServiceExport)
864+
if !ok {
865+
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received old object %v not a internalServiceExport object", e.ObjectOld)),
866+
"Failed to process an update event for internalServiceExport object")
867+
return
868+
}
869+
newInternalServiceExport, ok := e.ObjectNew.(*fleetnetv1alpha1.InternalServiceExport)
870+
if !ok {
871+
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received new object %v not a internalServiceExport object", e.ObjectNew)),
872+
"Failed to process an update event for internalServiceExport object")
873+
return
874+
}
875+
if !shouldHandleInternalServiceExportUpdateEvent(oldInternalServiceExport, newInternalServiceExport) {
876+
klog.V(2).InfoS("Skipping requeueing internalServiceExport update event", "internalServiceExport", klog.KObj(e.ObjectNew))
877+
return
878+
}
879+
r.handleInternalServiceExportEvent(ctx, e.ObjectNew, q)
880+
},
881+
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
882+
klog.V(2).InfoS("Received internalServiceExport generic event", "internalServiceExport", klog.KObj(e.Object))
883+
r.handleInternalServiceExportEvent(ctx, e.Object, q)
884+
},
885+
},
785886
).
786887
Complete(r)
787888
}
788889

789-
func (r *Reconciler) trafficManagerProfileEventHandler() handler.MapFunc {
790-
return func(ctx context.Context, object client.Object) []reconcile.Request {
791-
trafficManagerBackendList := &fleetnetv1beta1.TrafficManagerBackendList{}
792-
fieldMatcher := client.MatchingFields{
793-
trafficManagerBackendProfileFieldKey: object.GetName(),
794-
}
795-
// For now, we only support the backend and profile in the same namespace.
796-
if err := r.Client.List(ctx, trafficManagerBackendList, client.InNamespace(object.GetNamespace()), fieldMatcher); err != nil {
797-
klog.ErrorS(err,
798-
"Failed to list trafficManagerBackends for the profile",
799-
"trafficManagerProfile", klog.KObj(object))
800-
return []reconcile.Request{}
801-
}
890+
func shouldHandleTrafficManagerProfileUpdateEvent(old, new *fleetnetv1beta1.TrafficManagerProfile) bool {
891+
oldCondition := meta.FindStatusCondition(old.Status.Conditions, string(fleetnetv1beta1.TrafficManagerProfileConditionProgrammed))
892+
newCondition := meta.FindStatusCondition(new.Status.Conditions, string(fleetnetv1beta1.TrafficManagerProfileConditionProgrammed))
893+
return !condition.EqualConditionIgnoreReason(oldCondition, newCondition)
894+
}
802895

803-
res := make([]reconcile.Request, 0, len(trafficManagerBackendList.Items))
804-
for _, backend := range trafficManagerBackendList.Items {
805-
res = append(res, reconcile.Request{
806-
NamespacedName: types.NamespacedName{
807-
Namespace: backend.Namespace,
808-
Name: backend.Name,
809-
},
810-
})
811-
}
812-
return res
813-
}
896+
func shouldHandleServiceImportUpateEvent(old, new *fleetnetv1alpha1.ServiceImport) bool {
897+
return !equality.Semantic.DeepEqual(old.Status.Clusters, new.Status.Clusters)
814898
}
815899

816-
func (r *Reconciler) serviceImportEventHandler() handler.MapFunc {
817-
return func(ctx context.Context, object client.Object) []reconcile.Request {
818-
return r.enqueueTrafficManagerBackendByServiceImport(ctx, object)
900+
func shouldHandleInternalServiceExportUpdateEvent(old, new *fleetnetv1alpha1.InternalServiceExport) bool {
901+
// Most of the referenced service fields are immutable, so we only check the fields that can be changed.
902+
return old.Spec.Type != new.Spec.Type ||
903+
old.Spec.IsDNSLabelConfigured != new.Spec.IsDNSLabelConfigured ||
904+
old.Spec.IsInternalLoadBalancer != new.Spec.IsInternalLoadBalancer ||
905+
!equality.Semantic.DeepEqual(old.Spec.PublicIPResourceID, new.Spec.PublicIPResourceID) ||
906+
!equality.Semantic.DeepEqual(old.Spec.Weight, new.Spec.Weight)
907+
}
908+
909+
func (r *Reconciler) handleTrafficManagerProfileEvent(ctx context.Context, object client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
910+
trafficManagerBackendList := &fleetnetv1beta1.TrafficManagerBackendList{}
911+
fieldMatcher := client.MatchingFields{
912+
trafficManagerBackendProfileFieldKey: object.GetName(),
913+
}
914+
// For now, we only support the backend and profile in the same namespace.
915+
if err := r.Client.List(ctx, trafficManagerBackendList, client.InNamespace(object.GetNamespace()), fieldMatcher); err != nil {
916+
klog.ErrorS(err,
917+
"Failed to list trafficManagerBackends for the profile",
918+
"trafficManagerProfile", klog.KObj(object))
919+
return
920+
}
921+
922+
for _, backend := range trafficManagerBackendList.Items {
923+
q.Add(reconcile.Request{
924+
NamespacedName: types.NamespacedName{
925+
Namespace: backend.Namespace,
926+
Name: backend.Name,
927+
},
928+
})
819929
}
820930
}
821931

822-
func (r *Reconciler) enqueueTrafficManagerBackendByServiceImport(ctx context.Context, object client.Object) []reconcile.Request {
932+
func (r *Reconciler) handleServiceImportEvent(ctx context.Context, object client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
823933
trafficManagerBackendList := &fleetnetv1beta1.TrafficManagerBackendList{}
824934
fieldMatcher := client.MatchingFields{
825935
trafficManagerBackendBackendFieldKey: object.GetName(),
@@ -829,49 +939,44 @@ func (r *Reconciler) enqueueTrafficManagerBackendByServiceImport(ctx context.Con
829939
klog.ErrorS(err,
830940
"Failed to list trafficManagerBackends for the serviceImport",
831941
"serviceImport", klog.KObj(object))
832-
return []reconcile.Request{}
942+
return
833943
}
834944

835-
res := make([]reconcile.Request, 0, len(trafficManagerBackendList.Items))
836945
for _, backend := range trafficManagerBackendList.Items {
837-
res = append(res, reconcile.Request{
946+
q.Add(reconcile.Request{
838947
NamespacedName: types.NamespacedName{
839948
Namespace: backend.Namespace,
840949
Name: backend.Name,
841950
},
842951
})
843952
}
844-
return res
845953
}
846954

847-
func (r *Reconciler) internalServiceExportEventHandler() handler.MapFunc {
848-
return func(ctx context.Context, object client.Object) []reconcile.Request {
849-
internalServiceExport, ok := object.(*fleetnetv1alpha1.InternalServiceExport)
850-
if !ok {
851-
return []reconcile.Request{}
852-
}
955+
func (r *Reconciler) handleInternalServiceExportEvent(ctx context.Context, object client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
956+
internalServiceExport, ok := object.(*fleetnetv1alpha1.InternalServiceExport)
957+
if !ok {
958+
return
959+
}
853960

854-
serviceImport := &fleetnetv1alpha1.ServiceImport{}
855-
serviceImportName := types.NamespacedName{Namespace: internalServiceExport.Spec.ServiceReference.Namespace, Name: internalServiceExport.Spec.ServiceReference.Name}
856-
serviceImportKRef := klog.KRef(serviceImportName.Namespace, serviceImportName.Name)
857-
if err := r.Client.Get(ctx, serviceImportName, serviceImport); err != nil {
858-
klog.ErrorS(err, "Failed to get serviceImport", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(internalServiceExport))
859-
return []reconcile.Request{}
860-
}
861-
for _, cs := range serviceImport.Status.Clusters {
862-
// When the cluster exposes the service, first we will check whether the cluster can be exposed or not.
863-
// For example, whether the service spec conflicts with other existing services.
864-
// If the cluster is not in the serviceImport status, there are two possibilities:
865-
// * the controller is still in the processing of this cluster.
866-
// * the cluster cannot be exposed because of the conflicted spec, which will be clearly indicated in the
867-
// serviceExport status.
868-
// For the first case, when the processing is finished, serviceImport will be updated so that this controller
869-
// will be triggered again.
870-
if cs.Cluster == internalServiceExport.Spec.ServiceReference.ClusterID {
871-
return r.enqueueTrafficManagerBackendByServiceImport(ctx, serviceImport)
872-
}
961+
serviceImport := &fleetnetv1alpha1.ServiceImport{}
962+
serviceImportName := types.NamespacedName{Namespace: internalServiceExport.Spec.ServiceReference.Namespace, Name: internalServiceExport.Spec.ServiceReference.Name}
963+
serviceImportKRef := klog.KRef(serviceImportName.Namespace, serviceImportName.Name)
964+
if err := r.Client.Get(ctx, serviceImportName, serviceImport); err != nil {
965+
klog.ErrorS(err, "Failed to get serviceImport", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(internalServiceExport))
966+
return
967+
}
968+
for _, cs := range serviceImport.Status.Clusters {
969+
// When the cluster exposes the service, first we will check whether the cluster can be exposed or not.
970+
// For example, whether the service spec conflicts with other existing services.
971+
// If the cluster is not in the serviceImport status, there are two possibilities:
972+
// * the controller is still in the processing of this cluster.
973+
// * the cluster cannot be exposed because of the conflicted spec, which will be clearly indicated in the
974+
// serviceExport status.
975+
// For the first case, when the processing is finished, serviceImport will be updated so that this controller
976+
// will be triggered again.
977+
if cs.Cluster == internalServiceExport.Spec.ServiceReference.ClusterID {
978+
r.handleServiceImportEvent(ctx, serviceImport, q)
873979
}
874-
return []reconcile.Request{}
875980
}
876981
}
877982

0 commit comments

Comments
 (0)