91 changes: 91 additions & 0 deletions examples/spark-pi-pod-templates.yaml
@@ -0,0 +1,91 @@
#
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v3.1.1"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never
  # volumes:
  #   - name: "test-volume"
  #     hostPath:
  #       path: "/tmp"
  #       type: Directory
  driver:
    templateContainerName: spark-driver
    template:
      metadata:
        labels:
          version: 3.1.1
      spec:
        containers:
          - name: spark-driver
            resources:
              limits:
                cpu: "1200m"
            volumeMounts:
              - name: "test-volume"
                mountPath: "/tmp"
        serviceAccountName: spark
        volumes:
          - name: "test-volume"
            hostPath:
              path: "/tmp"
              type: Directory

    cores: 1 # ???
    #coreLimit: "1200m"
    memory: "512m"
    # labels:
    #   version: 3.1.1
    # serviceAccount: spark
    # volumeMounts:
    #   - name: "test-volume"
    #     mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    # labels:
    #   version: 3.1.1
    # volumeMounts:
    #   - name: "test-volume"
    #     mountPath: "/tmp"
    templateContainerName: spark-executor
    template:
      metadata:
        labels:
          version: 3.1.1
      spec:
        containers:
          - name: spark-executor
            volumeMounts:
              - name: "test-volume"
                mountPath: "/tmp"
        volumes:
          - name: "test-volume"
            hostPath:
              path: "/tmp"
              type: Directory
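As the rest of this PR wires up, the operator serializes each template block to a temporary JSON file at submission time and hands it to spark-submit via spark.kubernetes.driver.podTemplateFile and spark.kubernetes.executor.podTemplateFile, with templateContainerName mapped to the matching podTemplateContainerName confs. Settings such as cores, instances, and memory still travel through the regular Spark configuration properties, which is why they sit outside the template in this example.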
6 changes: 6 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/types.go
@@ -529,6 +529,12 @@ type SparkPodSpec struct {
	// ShareProcessNamespace settings for the pod, following the Kubernetes specifications.
	// +optional
	ShareProcessNamespace *bool `json:"shareProcessNamespace,omitempty"`
	// Template is a pod template that is used as the base for the driver or executor pod.
	// +optional
	Template *apiv1.PodTemplateSpec `json:"template,omitempty"`
	// TemplateContainerName is the name of the container in the pod template that should be used
	// as the basis for the driver or executor.
	// +optional
	TemplateContainerName *string `json:"templateContainerName,omitempty"`
}

// DriverSpec is specification of the driver.
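Because Template is a full Kubernetes PodTemplateSpec, it opens up pod-level and container-level settings that the operator does not model explicitly; the example YAML above uses it for hostPath volumes, volume mounts, and a CPU limit on the driver container.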
17 changes: 17 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default.

14 changes: 14 additions & 0 deletions pkg/config/constants.go
@@ -166,6 +166,20 @@ const (
	// SparkDynamicAllocationMaxExecutors is the Spark configuration key for specifying the
	// upper bound of the number of executors to request if dynamic allocation is enabled.
	SparkDynamicAllocationMaxExecutors = "spark.dynamicAllocation.maxExecutors"
	// SparkDriverPodTemplateFile is the Spark configuration key for specifying the pod template
	// file to be used for the driver pod.
	SparkDriverPodTemplateFile = "spark.kubernetes.driver.podTemplateFile"
	// SparkExecutorPodTemplateFile is the Spark configuration key for specifying the pod template
	// file to be used for the executor pod.
	SparkExecutorPodTemplateFile = "spark.kubernetes.executor.podTemplateFile"
	// SparkDriverPodTemplateContainerName is the Spark configuration key for specifying which
	// container in the pod template should be used as the basis for the driver. If not specified,
	// or if the container name is not valid, Spark assumes that the first container in the list
	// is the driver container.
	SparkDriverPodTemplateContainerName = "spark.kubernetes.driver.podTemplateContainerName"
	// SparkExecutorPodTemplateContainerName is the Spark configuration key for specifying which
	// container in the pod template should be used as the basis for the executor. If not specified,
	// or if the container name is not valid, Spark assumes that the first container in the list
	// is the executor container.
	SparkExecutorPodTemplateContainerName = "spark.kubernetes.executor.podTemplateContainerName"
)

const (
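These keys map onto the pod template support that Spark itself has shipped since Spark 3.0. Note that configuration properties the operator passes explicitly through spark-submit still take precedence: Spark treats the template file only as a base onto which it applies its own pod construction.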
54 changes: 37 additions & 17 deletions pkg/controller/sparkapplication/controller.go
@@ -635,6 +635,17 @@ func isNextRetryDue(retryInterval *int64, attemptsDone int32, lastEventTime meta
	return false
}

func newFailedSubmissionAttemptForError(app *v1beta2.SparkApplication, err error) v1beta2.SparkApplicationStatus {
	return v1beta2.SparkApplicationStatus{
		AppState: v1beta2.ApplicationState{
			State:        v1beta2.FailedSubmissionState,
			ErrorMessage: err.Error(),
		},
		SubmissionAttempts:        app.Status.SubmissionAttempts + 1,
		LastSubmissionAttemptTime: metav1.Now(),
	}
}

// submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1beta2.SparkApplication {
	if app.PrometheusMonitoringEnabled() {
@@ -654,29 +665,38 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be

	driverPodName := getDriverPodName(app)
	submissionID := uuid.New().String()
	submissionCmdArgs, err := buildSubmissionCommandArgs(app, driverPodName, submissionID)
	if err != nil {
		app.Status = v1beta2.SparkApplicationStatus{
			AppState: v1beta2.ApplicationState{
				State:        v1beta2.FailedSubmissionState,
				ErrorMessage: err.Error(),
			},
			SubmissionAttempts:        app.Status.SubmissionAttempts + 1,
			LastSubmissionAttemptTime: metav1.Now(),

	var driverPodTemplateFile *string
	if app.Spec.Driver.Template != nil {
		var err error
		driverPodTemplateFile, err = createPodTemplateFile(app.Spec.Driver.Template, "driver", submissionID)
		if err != nil {
			app.Status = newFailedSubmissionAttemptForError(app, err)
			return app
		}
		defer deletePodTemplateFile(*driverPodTemplateFile)
	}

	var executorPodTemplateFile *string
	if app.Spec.Executor.Template != nil {
		var err error
		executorPodTemplateFile, err = createPodTemplateFile(app.Spec.Executor.Template, "executor", submissionID)
		if err != nil {
			app.Status = newFailedSubmissionAttemptForError(app, err)
			return app
		}
		defer deletePodTemplateFile(*executorPodTemplateFile)
	}

	submissionCmdArgs, err := buildSubmissionCommandArgs(app, driverPodName, submissionID, driverPodTemplateFile, executorPodTemplateFile)
	if err != nil {
		app.Status = newFailedSubmissionAttemptForError(app, err)
		return app
	}
	// Try submitting the application by running spark-submit.
	submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, app))
	if err != nil {
		app.Status = v1beta2.SparkApplicationStatus{
			AppState: v1beta2.ApplicationState{
				State:        v1beta2.FailedSubmissionState,
				ErrorMessage: err.Error(),
			},
			SubmissionAttempts:        app.Status.SubmissionAttempts + 1,
			LastSubmissionAttemptTime: metav1.Now(),
		}
		app.Status = newFailedSubmissionAttemptForError(app, err)
		c.recordSparkApplicationEvent(app)
		glog.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
		return app
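Two details are easy to miss here. The deferred deletePodTemplateFile calls only fire when submitSparkApplication returns, i.e. after runSparkSubmit has consumed the files, so each temporary template file outlives the spark-submit invocation that reads it. The new newFailedSubmissionAttemptForError helper also deduplicates the status bookkeeping and is simple to unit-test; a minimal sketch (not part of this PR, and assuming the controller package's existing testing, errors, and v1beta2 imports) might look like:

func TestNewFailedSubmissionAttemptForError(t *testing.T) {
	app := &v1beta2.SparkApplication{}
	app.Status.SubmissionAttempts = 2

	status := newFailedSubmissionAttemptForError(app, errors.New("boom"))

	// The helper should record the failure state and message...
	if status.AppState.State != v1beta2.FailedSubmissionState {
		t.Errorf("expected FailedSubmissionState, got %v", status.AppState.State)
	}
	if status.AppState.ErrorMessage != "boom" {
		t.Errorf("unexpected error message: %q", status.AppState.ErrorMessage)
	}
	// ...and increment the attempt counter relative to the previous status.
	if status.SubmissionAttempts != 3 {
		t.Errorf("expected 3 submission attempts, got %d", status.SubmissionAttempts)
	}
}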
54 changes: 53 additions & 1 deletion pkg/controller/sparkapplication/submission.go
@@ -17,7 +17,9 @@ limitations under the License.
package sparkapplication

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"os/exec"
	"path/filepath"
@@ -63,6 +65,7 @@ func runSparkSubmit(submission *submission) (bool, error) {

	cmd := execCommand(command, submission.args...)
	glog.V(2).Infof("spark-submit arguments: %v", cmd.Args)
	cmd.Stderr = os.Stderr
	output, err := cmd.Output()
	glog.V(3).Infof("spark-submit output: %s", string(output))
	if err != nil {
@@ -84,7 +87,7 @@
	return true, nil
}

func buildSubmissionCommandArgs(app *v1beta2.SparkApplication, driverPodName string, submissionID string) ([]string, error) {
func buildSubmissionCommandArgs(app *v1beta2.SparkApplication, driverPodName string, submissionID string, driverPodTemplate *string, executorPodTemplate *string) ([]string, error) {
	var args []string
	if app.Spec.MainClass != nil {
		args = append(args, "--class", *app.Spec.MainClass)
@@ -185,6 +188,23 @@
		}
	}

	if driverPodTemplate != nil {
		args = append(args, "--conf", fmt.Sprintf("%s=%s", config.SparkDriverPodTemplateFile, *driverPodTemplate))
	}
	if executorPodTemplate != nil {
		args = append(args, "--conf", fmt.Sprintf("%s=%s", config.SparkExecutorPodTemplateFile, *executorPodTemplate))
	}

	if app.Spec.Driver.TemplateContainerName != nil {
		conf := fmt.Sprintf("%s=%s", config.SparkDriverPodTemplateContainerName, *app.Spec.Driver.TemplateContainerName)
		args = append(args, "--conf", conf)
	}

	if app.Spec.Executor.TemplateContainerName != nil {
		conf := fmt.Sprintf("%s=%s", config.SparkExecutorPodTemplateContainerName, *app.Spec.Executor.TemplateContainerName)
		args = append(args, "--conf", conf)
	}

	if app.Spec.MainApplicationFile != nil {
		// Add the main application file if it is present.
		args = append(args, *app.Spec.MainApplicationFile)
@@ -198,6 +218,38 @@ func buildSubmissionCommandArgs(app *v1beta2.SparkApplication, driverPodName str
	return args, nil
}

func createPodTemplateFile(template *v1.PodTemplateSpec, role string, submissionID string) (*string, error) {
	bytes, err := json.Marshal(template)
	if err != nil {
		return nil, err
	}
	tmpFile, err := ioutil.TempFile(os.TempDir(), fmt.Sprintf("%s-podtemplate-%s-*.json", submissionID, role))
	if err != nil {
		return nil, err
	}
	_, err = tmpFile.Write(bytes)
	if err != nil {
		deletePodTemplateFile(tmpFile.Name())
		return nil, err
	}
	err = tmpFile.Close()
	if err != nil {
		deletePodTemplateFile(tmpFile.Name())
		return nil, err
	}
	name := tmpFile.Name()
	return &name, nil
}

func deletePodTemplateFile(fileName string) error {
	err := os.Remove(fileName)
	if os.IsNotExist(err) {
		return nil
	}
	return err
}

func getMasterURL() (string, error) {
	kubernetesServiceHost := os.Getenv(kubernetesServiceHostEnvVar)
	if kubernetesServiceHost == "" {
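createPodTemplateFile and deletePodTemplateFile are self-contained enough for a simple round-trip unit test. A minimal sketch (illustrative, not part of this PR; it assumes the package's encoding/json, io/ioutil, and testing imports plus the core v1 and metav1 API packages):

func TestCreatePodTemplateFileRoundTrip(t *testing.T) {
	template := &v1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{"version": "3.1.1"},
		},
	}

	path, err := createPodTemplateFile(template, "driver", "test-submission-id")
	if err != nil {
		t.Fatal(err)
	}
	defer deletePodTemplateFile(*path)

	// The temp file should contain the JSON-serialized template.
	data, err := ioutil.ReadFile(*path)
	if err != nil {
		t.Fatal(err)
	}
	var roundTrip v1.PodTemplateSpec
	if err := json.Unmarshal(data, &roundTrip); err != nil {
		t.Fatal(err)
	}
	if roundTrip.Labels["version"] != "3.1.1" {
		t.Errorf("unexpected labels after round trip: %v", roundTrip.Labels)
	}
}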
4 changes: 3 additions & 1 deletion pkg/controller/sparkapplication/submission_test.go
@@ -571,7 +571,9 @@ func TestProxyUserArg(t *testing.T) {

	submissionID := uuid.New().String()
	driverPodName := getDriverPodName(app)
	args, err := buildSubmissionCommandArgs(app, driverPodName, submissionID)
	driverPodTemplate := "driver.json"
	executorPodTemplate := "executor.json"
	args, err := buildSubmissionCommandArgs(app, driverPodName, submissionID, &driverPodTemplate, &executorPodTemplate)
	if err != nil {
		t.Fatal(err)
	}
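With the template file names passed above, the test could additionally assert that the pod template confs are wired through to the generated arguments. An illustrative follow-on check (not part of this PR; it assumes the test file's fmt and config imports), placed inside the same test:

	// buildSubmissionCommandArgs appends "--conf" and "key=value" as separate
	// arguments, so the value can be matched directly.
	expected := fmt.Sprintf("%s=%s", config.SparkDriverPodTemplateFile, "driver.json")
	found := false
	for _, arg := range args {
		if arg == expected {
			found = true
			break
		}
	}
	if !found {
		t.Errorf("expected %q in the spark-submit arguments", expected)
	}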