Skip to content

Commit 600dffe

Browse files
committed
fix #1706
1 parent 69813fb commit 600dffe

File tree

12 files changed

+38
-30
lines changed

12 files changed

+38
-30
lines changed

cmd/training-operator.v1/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func main() {
6060
var gangSchedulerName string
6161
var namespace string
6262
var monitoringPort int
63+
var controllerThreads int
6364
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
6465
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
6566
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
@@ -74,6 +75,7 @@ func main() {
7475
"If set, it only monitors kubeflow jobs in the given namespace.")
7576
flag.IntVar(&monitoringPort, "monitoring-port", 9443, "Endpoint port for displaying monitoring metrics. "+
7677
"It can be set to \"0\" to disable the metrics serving.")
78+
flag.IntVar(&controllerThreads, "controller-threads", 1, "Number of worker threads used by the controller.")
7779

7880
// PyTorch related flags
7981
flag.StringVar(&config.Config.PyTorchInitContainerImage, "pytorch-init-container-image",
@@ -120,7 +122,7 @@ func main() {
120122
"scheme not supported", "scheme", s)
121123
os.Exit(1)
122124
}
123-
if err = setupFunc(mgr, enableGangScheduling); err != nil {
125+
if err = setupFunc(mgr, enableGangScheduling, controllerThreads); err != nil {
124126
setupLog.Error(err, "unable to create controller", "controller", s)
125127
os.Exit(1)
126128
}

pkg/controller.v1/mpi/mpijob_controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,10 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
176176
}
177177

178178
// SetupWithManager sets up the controller with the Manager.
179-
func (jc *MPIJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
179+
func (jc *MPIJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error {
180180
c, err := controller.New(jc.ControllerName(), mgr, controller.Options{
181-
Reconciler: jc,
181+
Reconciler: jc,
182+
MaxConcurrentReconciles: controllerThreads,
182183
})
183184

184185
if err != nil {

pkg/controller.v1/mpi/suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ var _ = BeforeSuite(func() {
8787
Expect(err).NotTo(HaveOccurred())
8888

8989
reconciler = NewReconciler(mgr, false)
90-
Expect(reconciler.SetupWithManager(mgr)).NotTo(HaveOccurred())
90+
Expect(reconciler.SetupWithManager(mgr, 1)).NotTo(HaveOccurred())
9191

9292
go func() {
9393
defer GinkgoRecover()

pkg/controller.v1/mxnet/mxjob_controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,10 @@ func (r *MXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
177177
}
178178

179179
// SetupWithManager sets up the controller with the Manager.
180-
func (r *MXJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
180+
func (r *MXJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error {
181181
c, err := controller.New(r.ControllerName(), mgr, controller.Options{
182-
Reconciler: r,
182+
Reconciler: r,
183+
MaxConcurrentReconciles: controllerThreads,
183184
})
184185

185186
if err != nil {

pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,10 @@ func (r *PaddleJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
172172
}
173173

174174
// SetupWithManager sets up the controller with the Manager.
175-
func (r *PaddleJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
175+
func (r *PaddleJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error {
176176
c, err := controller.New(r.ControllerName(), mgr, controller.Options{
177-
Reconciler: r,
177+
Reconciler: r,
178+
MaxConcurrentReconciles: controllerThreads,
178179
})
179180

180181
if err != nil {

pkg/controller.v1/paddlepaddle/paddlepaddle_controller_suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ var _ = BeforeSuite(func() {
8383

8484
r := NewReconciler(mgr, false)
8585

86-
Expect(r.SetupWithManager(mgr)).NotTo(gomega.HaveOccurred())
86+
Expect(r.SetupWithManager(mgr, 1)).NotTo(gomega.HaveOccurred())
8787

8888
go func() {
8989
defer GinkgoRecover()

pkg/controller.v1/pytorch/pytorchjob_controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,10 @@ func (r *PyTorchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
176176
}
177177

178178
// SetupWithManager sets up the controller with the Manager.
179-
func (r *PyTorchJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
179+
func (r *PyTorchJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error {
180180
c, err := controller.New(r.ControllerName(), mgr, controller.Options{
181-
Reconciler: r,
181+
Reconciler: r,
182+
MaxConcurrentReconciles: controllerThreads,
182183
})
183184

184185
if err != nil {

pkg/controller.v1/pytorch/pytorchjob_controller_suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ var _ = BeforeSuite(func() {
8888

8989
r := NewReconciler(mgr, false)
9090

91-
Expect(r.SetupWithManager(mgr)).NotTo(gomega.HaveOccurred())
91+
Expect(r.SetupWithManager(mgr, 1)).NotTo(gomega.HaveOccurred())
9292

9393
go func() {
9494
defer GinkgoRecover()

pkg/controller.v1/register_controller.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,26 +31,26 @@ import (
3131

3232
const ErrTemplateSchemeNotSupported = "scheme %s is not supported yet"
3333

34-
type ReconcilerSetupFunc func(manager manager.Manager, enableGangScheduling bool) error
34+
type ReconcilerSetupFunc func(manager manager.Manager, enableGangScheduling bool, controllerThreads int) error
3535

3636
var SupportedSchemeReconciler = map[string]ReconcilerSetupFunc{
37-
kubeflowv1.TFJobKind: func(mgr manager.Manager, enableGangScheduling bool) error {
38-
return tensorflowcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
37+
kubeflowv1.TFJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error {
38+
return tensorflowcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads)
3939
},
40-
kubeflowv1.PytorchJobKind: func(mgr manager.Manager, enableGangScheduling bool) error {
41-
return pytorchcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
40+
kubeflowv1.PytorchJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error {
41+
return pytorchcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads)
4242
},
43-
kubeflowv1.MXJobKind: func(mgr manager.Manager, enableGangScheduling bool) error {
44-
return mxnetcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
43+
kubeflowv1.MXJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error {
44+
return mxnetcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads)
4545
},
46-
kubeflowv1.XGBoostJobKind: func(mgr manager.Manager, enableGangScheduling bool) error {
47-
return xgboostcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
46+
kubeflowv1.XGBoostJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error {
47+
return xgboostcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads)
4848
},
49-
kubeflowv1.MPIJobKind: func(mgr manager.Manager, enableGangScheduling bool) error {
50-
return mpicontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
49+
kubeflowv1.MPIJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error {
50+
return mpicontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads)
5151
},
52-
kubeflowv1.PaddleJobKind: func(mgr manager.Manager, enableGangScheduling bool) error {
53-
return paddlecontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
52+
kubeflowv1.PaddleJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error {
53+
return paddlecontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads)
5454
},
5555
}
5656

pkg/controller.v1/tensorflow/suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ var _ = BeforeSuite(func() {
8989
Expect(err).NotTo(HaveOccurred())
9090

9191
reconciler = NewReconciler(mgr, false)
92-
Expect(reconciler.SetupWithManager(mgr)).NotTo(HaveOccurred())
92+
Expect(reconciler.SetupWithManager(mgr, 1)).NotTo(HaveOccurred())
9393

9494
go func() {
9595
defer GinkgoRecover()

0 commit comments

Comments
 (0)