@@ -28,6 +28,7 @@ import (
28
28
"k8s.io/apimachinery/pkg/api/errors"
29
29
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30
30
"k8s.io/apimachinery/pkg/util/intstr"
31
+ "sigs.k8s.io/controller-runtime/pkg/log"
31
32
32
33
"github.com/kubeflow/spark-operator/v2/api/v1beta2"
33
34
"github.com/kubeflow/spark-operator/v2/pkg/common"
@@ -83,18 +84,19 @@ func getDriverIngressURL(ingressURLFormat string, app *v1beta2.SparkApplication)
83
84
return parsedURL , nil
84
85
}
85
86
86
- func (r * Reconciler ) createDriverIngress (app * v1beta2.SparkApplication , driverIngressConfiguration * v1beta2.DriverIngressConfiguration , service SparkService , ingressURL * url.URL , ingressClassName string ) (* SparkIngress , error ) {
87
+ func (r * Reconciler ) createDriverIngress (ctx context. Context , app * v1beta2.SparkApplication , driverIngressConfiguration * v1beta2.DriverIngressConfiguration , service SparkService , ingressURL * url.URL , ingressClassName string ) (* SparkIngress , error ) {
87
88
if driverIngressConfiguration .ServicePort == nil {
88
89
return nil , fmt .Errorf ("cannot create Driver Ingress for application %s/%s due to empty ServicePort on driverIngressConfiguration" , app .Namespace , app .Name )
89
90
}
90
91
ingressName := fmt .Sprintf ("%s-ing-%d" , app .Name , * driverIngressConfiguration .ServicePort )
91
92
if util .IngressCapabilities .Has ("networking.k8s.io/v1" ) {
92
- return r .createDriverIngressV1 (app , service , ingressName , ingressURL , ingressClassName , []networkingv1.IngressTLS {}, map [string ]string {})
93
+ return r .createDriverIngressV1 (ctx , app , service , ingressName , ingressURL , ingressClassName , []networkingv1.IngressTLS {}, map [string ]string {})
93
94
}
94
- return r .createDriverIngressLegacy (app , service , ingressName , ingressURL )
95
+ return r .createDriverIngressLegacy (ctx , app , service , ingressName , ingressURL )
95
96
}
96
97
97
- func (r * Reconciler ) createDriverIngressV1 (app * v1beta2.SparkApplication , service SparkService , ingressName string , ingressURL * url.URL , ingressClassName string , defaultIngressTLS []networkingv1.IngressTLS , defaultIngressAnnotations map [string ]string ) (* SparkIngress , error ) {
98
+ func (r * Reconciler ) createDriverIngressV1 (ctx context.Context , app * v1beta2.SparkApplication , service SparkService , ingressName string , ingressURL * url.URL , ingressClassName string , defaultIngressTLS []networkingv1.IngressTLS , defaultIngressAnnotations map [string ]string ) (* SparkIngress , error ) {
99
+ logger := log .FromContext (ctx )
98
100
ingressResourceAnnotations := util .GetWebUIIngressAnnotations (app )
99
101
if len (ingressResourceAnnotations ) == 0 && len (defaultIngressAnnotations ) != 0 {
100
102
ingressResourceAnnotations = defaultIngressAnnotations
@@ -160,17 +162,17 @@ func (r *Reconciler) createDriverIngressV1(app *v1beta2.SparkApplication, servic
160
162
ingress .Spec .IngressClassName = & ingressClassName
161
163
}
162
164
163
- if err := r .client .Create (context . TODO () , ingress ); err != nil {
165
+ if err := r .client .Create (ctx , ingress ); err != nil {
164
166
if ! errors .IsAlreadyExists (err ) {
165
167
return nil , fmt .Errorf ("failed to create ingress %s/%s: %v" , ingress .Namespace , ingress .Name , err )
166
168
}
167
169
168
- if err := r .client .Update (context . TODO () , ingress ); err != nil {
170
+ if err := r .client .Update (ctx , ingress ); err != nil {
169
171
return nil , fmt .Errorf ("failed to update ingress %s/%s: %v" , ingress .Namespace , ingress .Name , err )
170
172
}
171
- logger .Info ("Updated networking.v1/Ingress for SparkApplication" , "name" , app . Name , "namespace" , app . Namespace , " ingressName" , ingress .Name )
173
+ logger .Info ("Updated networking.v1/Ingress for SparkApplication" , "ingressName" , ingress .Name )
172
174
} else {
173
- logger .Info ("Created networking.v1/Ingress for SparkApplication" , "name" , app . Name , "namespace" , app . Namespace , " ingressName" , ingress .Name )
175
+ logger .Info ("Created networking.v1/Ingress for SparkApplication" , "ingressName" , ingress .Name )
174
176
}
175
177
176
178
return & SparkIngress {
@@ -182,7 +184,8 @@ func (r *Reconciler) createDriverIngressV1(app *v1beta2.SparkApplication, servic
182
184
}, nil
183
185
}
184
186
185
- func (r * Reconciler ) createDriverIngressLegacy (app * v1beta2.SparkApplication , service SparkService , ingressName string , ingressURL * url.URL ) (* SparkIngress , error ) {
187
+ func (r * Reconciler ) createDriverIngressLegacy (ctx context.Context , app * v1beta2.SparkApplication , service SparkService , ingressName string , ingressURL * url.URL ) (* SparkIngress , error ) {
188
+ logger := log .FromContext (ctx )
186
189
ingressResourceAnnotations := util .GetWebUIIngressAnnotations (app )
187
190
// var ingressTLSHosts networkingv1.IngressTLS[]
188
191
// That we convert later for extensionsv1beta1, but return as is in SparkIngress.
@@ -236,17 +239,17 @@ func (r *Reconciler) createDriverIngressLegacy(app *v1beta2.SparkApplication, se
236
239
if len (ingressTLSHosts ) != 0 {
237
240
ingress .Spec .TLS = convertIngressTLSHostsToLegacy (ingressTLSHosts )
238
241
}
239
- if err := r .client .Create (context . TODO () , ingress ); err != nil {
242
+ if err := r .client .Create (ctx , ingress ); err != nil {
240
243
if ! errors .IsAlreadyExists (err ) {
241
244
return nil , fmt .Errorf ("failed to create ingress %s/%s: %v" , ingress .Namespace , ingress .Name , err )
242
245
}
243
246
244
- if err := r .client .Update (context . TODO () , ingress ); err != nil {
247
+ if err := r .client .Update (ctx , ingress ); err != nil {
245
248
return nil , fmt .Errorf ("failed to update ingress %s/%s: %v" , ingress .Namespace , ingress .Name , err )
246
249
}
247
- logger .Info ("Updated extensions.v1beta1/Ingress for SparkApplication" , "name" , app . Name , "namespace" , app . Namespace , " ingressName" , ingress .Name )
250
+ logger .Info ("Updated extensions.v1beta1/Ingress for SparkApplication" , "ingressName" , ingress .Name )
248
251
} else {
249
- logger .Info ("Created extensions.v1beta1/Ingress for SparkApplication" , "name" , app . Name , "namespace" , app . Namespace , " ingressName" , ingress .Name )
252
+ logger .Info ("Created extensions.v1beta1/Ingress for SparkApplication" , "ingressName" , ingress .Name )
250
253
}
251
254
252
255
return & SparkIngress {
@@ -269,6 +272,7 @@ func convertIngressTLSHostsToLegacy(ingressTLSHosts []networkingv1.IngressTLS) [
269
272
}
270
273
271
274
func (r * Reconciler ) createDriverIngressService (
275
+ ctx context.Context ,
272
276
app * v1beta2.SparkApplication ,
273
277
portName string ,
274
278
port int32 ,
@@ -278,6 +282,7 @@ func (r *Reconciler) createDriverIngressService(
278
282
serviceAnnotations map [string ]string ,
279
283
serviceLabels map [string ]string ,
280
284
) (* SparkService , error ) {
285
+ logger := log .FromContext (ctx )
281
286
service := & corev1.Service {
282
287
ObjectMeta : metav1.ObjectMeta {
283
288
Name : serviceName ,
@@ -312,18 +317,18 @@ func (r *Reconciler) createDriverIngressService(
312
317
service .ObjectMeta .Annotations = serviceAnnotations
313
318
}
314
319
315
- if err := r .client .Create (context . TODO () , service ); err != nil {
320
+ if err := r .client .Create (ctx , service ); err != nil {
316
321
if ! errors .IsAlreadyExists (err ) {
317
322
return nil , err
318
323
}
319
324
320
325
// Update the service if it already exists.
321
- if err := r .client .Update (context . TODO () , service ); err != nil {
326
+ if err := r .client .Update (ctx , service ); err != nil {
322
327
return nil , err
323
328
}
324
- logger .Info ("Updated service for SparkApplication" , "name" , app . Name , "namespace" , app . Namespace , "serviceName" , service .Name )
329
+ logger .Info ("Updated service for SparkApplication" , "name" , service .Name )
325
330
} else {
326
- logger .Info ("Created service for SparkApplication" , "name" , app . Name , "namespace" , app . Namespace , "serviceName" , service .Name )
331
+ logger .Info ("Created service for SparkApplication" , "name" , service .Name )
327
332
}
328
333
329
334
return & SparkService {
@@ -390,6 +395,7 @@ func getDriverIngressServiceLabels(driverIngressConfiguration *v1beta2.DriverIng
390
395
}
391
396
392
397
func (r * Reconciler ) createDriverIngressServiceFromConfiguration (
398
+ ctx context.Context ,
393
399
app * v1beta2.SparkApplication ,
394
400
driverIngressConfiguration * v1beta2.DriverIngressConfiguration ,
395
401
) (* SparkService , error ) {
@@ -402,5 +408,5 @@ func (r *Reconciler) createDriverIngressServiceFromConfiguration(
402
408
serviceType := getDriverIngressServiceType (driverIngressConfiguration )
403
409
serviceAnnotations := getDriverIngressServiceAnnotations (driverIngressConfiguration )
404
410
serviceLabels := getDriverIngressServiceLabels (driverIngressConfiguration )
405
- return r .createDriverIngressService (app , portName , port , port , serviceName , serviceType , serviceAnnotations , serviceLabels )
411
+ return r .createDriverIngressService (ctx , app , portName , port , port , serviceName , serviceType , serviceAnnotations , serviceLabels )
406
412
}
0 commit comments