@@ -20,6 +20,8 @@ import (
2020 "context"
2121 "encoding/json"
2222 "fmt"
23+ cronUtil "github.com/devtron-labs/devtron/util/cron"
24+ "github.com/robfig/cron/v3"
2325 "log"
2426 "net/http"
2527 "net/url"
@@ -43,7 +45,7 @@ import (
4345 "github.com/devtron-labs/devtron/internal/constants"
4446 "github.com/devtron-labs/devtron/internal/util"
4547 "github.com/devtron-labs/devtron/pkg/cluster/repository"
46- util2 "github.com/devtron-labs/devtron/util"
48+ globalUtil "github.com/devtron-labs/devtron/util"
4749 "github.com/go-pg/pg"
4850 "go.uber.org/zap"
4951)
@@ -201,7 +203,9 @@ type ClusterServiceImpl struct {
201203func NewClusterServiceImpl (repository repository.ClusterRepository , logger * zap.SugaredLogger ,
202204 K8sUtil * k8s.K8sServiceImpl , K8sInformerFactory informer.K8sInformerFactory ,
203205 userAuthRepository repository3.UserAuthRepository , userRepository repository3.UserRepository ,
204- roleGroupRepository repository3.RoleGroupRepository ) * ClusterServiceImpl {
206+ roleGroupRepository repository3.RoleGroupRepository ,
207+ envVariables * globalUtil.EnvironmentVariables ,
208+ cronLogger * cronUtil.CronLoggerImpl ) (* ClusterServiceImpl , error ) {
205209 clusterService := & ClusterServiceImpl {
206210 clusterRepository : repository ,
207211 logger : logger ,
@@ -211,8 +215,19 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.
211215 userRepository : userRepository ,
212216 roleGroupRepository : roleGroupRepository ,
213217 }
218+ // initialise cron
219+ newCron := cron .New (cron .WithChain (cron .Recover (cronLogger )))
220+ newCron .Start ()
221+ cfg := envVariables .GlobalClusterConfig
222+ // add function into cron
223+ _ , err := newCron .AddFunc (fmt .Sprintf ("@every %dm" , cfg .ClusterStatusCronTime ), clusterService .getAndUpdateClusterConnectionStatus )
224+ if err != nil {
225+ fmt .Println ("error in adding cron function into cluster cron service" )
226+ return clusterService , err
227+ }
228+ logger .Infow ("cluster cron service started successfully!" , "cronTime" , cfg .ClusterStatusCronTime )
214229 go clusterService .buildInformer ()
215- return clusterService
230+ return clusterService , nil
216231}
217232
218233func (impl * ClusterServiceImpl ) ConvertClusterBeanToCluster (clusterBean * ClusterBean , userId int32 ) * repository.Cluster {
@@ -242,6 +257,20 @@ func (impl *ClusterServiceImpl) ConvertClusterBeanToCluster(clusterBean *Cluster
242257 return model
243258}
244259
260+ // getAndUpdateClusterConnectionStatus is a cron function to update the connection status of all clusters
261+ func (impl * ClusterServiceImpl ) getAndUpdateClusterConnectionStatus () {
262+ impl .logger .Debug ("starting cluster connection status fetch thread" )
263+ defer impl .logger .Debug ("stopped cluster connection status fetch thread" )
264+
265+ //getting all clusters
266+ clusters , err := impl .FindAllExceptVirtual ()
267+ if err != nil {
268+ impl .logger .Errorw ("error in getting all clusters" , "err" , err )
269+ return
270+ }
271+ impl .ConnectClustersInBatch (clusters , true )
272+ }
273+
245274func (impl * ClusterServiceImpl ) Save (parent context.Context , bean * ClusterBean , userId int32 ) (* ClusterBean , error ) {
246275 //validating config
247276
@@ -289,7 +318,7 @@ func (impl *ClusterServiceImpl) Save(parent context.Context, bean *ClusterBean,
289318
290319 //on successful creation of new cluster, update informer cache for namespace group by cluster
291320 //here sync for ea mode only
292- if util2 .IsBaseStack () {
321+ if globalUtil .IsBaseStack () {
293322 impl .SyncNsInformer (bean )
294323 }
295324 impl .logger .Info ("saving secret for cluster informer" )
@@ -530,7 +559,7 @@ func (impl *ClusterServiceImpl) Update(ctx context.Context, bean *ClusterBean, u
530559 bean .Id = model .Id
531560
532561 //here sync for ea mode only
533- if bean .HasConfigOrUrlChanged && util2 .IsBaseStack () {
562+ if bean .HasConfigOrUrlChanged && globalUtil .IsBaseStack () {
534563 impl .SyncNsInformer (bean )
535564 }
536565 impl .logger .Infow ("saving secret for cluster informer" )
@@ -643,7 +672,7 @@ func (impl *ClusterServiceImpl) buildInformer() {
643672 impl .K8sInformerFactory .BuildInformer (clusterInfo )
644673}
645674
646- func (impl ClusterServiceImpl ) DeleteFromDb (bean * ClusterBean , userId int32 ) error {
675+ func (impl * ClusterServiceImpl ) DeleteFromDb (bean * ClusterBean , userId int32 ) error {
647676 existingCluster , err := impl .clusterRepository .FindById (bean .Id )
648677 if err != nil {
649678 impl .logger .Errorw ("No matching entry found for delete." , "id" , bean .Id )
@@ -668,7 +697,7 @@ func (impl ClusterServiceImpl) DeleteFromDb(bean *ClusterBean, userId int32) err
668697 return nil
669698}
670699
671- func (impl ClusterServiceImpl ) CheckIfConfigIsValid (cluster * ClusterBean ) error {
700+ func (impl * ClusterServiceImpl ) CheckIfConfigIsValid (cluster * ClusterBean ) error {
672701 clusterConfig := cluster .GetClusterConfig ()
673702 response , err := impl .K8sUtil .DiscoveryClientGetLiveZCall (clusterConfig )
674703 if err != nil {
@@ -1068,7 +1097,7 @@ func (impl *ClusterServiceImpl) GetAndUpdateConnectionStatusForOneCluster(k8sCli
10681097 mutex .Unlock ()
10691098}
10701099
1071- func (impl ClusterServiceImpl ) ConvertClusterBeanObjectToCluster (bean * ClusterBean ) * v1alpha1.Cluster {
1100+ func (impl * ClusterServiceImpl ) ConvertClusterBeanObjectToCluster (bean * ClusterBean ) * v1alpha1.Cluster {
10721101 configMap := bean .Config
10731102 serverUrl := bean .ServerUrl
10741103 bearerToken := ""
@@ -1097,7 +1126,7 @@ func (impl ClusterServiceImpl) ConvertClusterBeanObjectToCluster(bean *ClusterBe
10971126 return cl
10981127}
10991128
1100- func (impl ClusterServiceImpl ) GetClusterConfigByClusterId (clusterId int ) (* k8s.ClusterConfig , error ) {
1129+ func (impl * ClusterServiceImpl ) GetClusterConfigByClusterId (clusterId int ) (* k8s.ClusterConfig , error ) {
11011130 clusterBean , err := impl .FindById (clusterId )
11021131 if err != nil {
11031132 impl .logger .Errorw ("error in getting clusterBean by cluster id" , "err" , err , "clusterId" , clusterId )
@@ -1108,7 +1137,7 @@ func (impl ClusterServiceImpl) GetClusterConfigByClusterId(clusterId int) (*k8s.
11081137 return clusterConfig , nil
11091138}
11101139
1111- func (impl ClusterServiceImpl ) IsClusterReachable (clusterId int ) (bool , error ) {
1140+ func (impl * ClusterServiceImpl ) IsClusterReachable (clusterId int ) (bool , error ) {
11121141 cluster , err := impl .clusterRepository .FindById (clusterId )
11131142 if err != nil {
11141143 impl .logger .Errorw ("error in finding cluster from clusterId" , "envId" , clusterId )
0 commit comments