Commit 8d57f11

sryza authored and pdeyhim committed
SPARK-1126. spark-app preliminary
This is a starting version of the spark-app script for running compiled binaries against Spark. It still needs tests and some polish. The only testing I've done so far has been using it to launch jobs in yarn-standalone mode against a pseudo-distributed cluster.

This leaves out the changes required for launching Python scripts. I think it might be best to save those for another JIRA/PR (while keeping to the design so that they won't require backwards-incompatible changes).

Author: Sandy Ryza <[email protected]>

Closes apache#86 from sryza/sandy-spark-1126 and squashes the following commits:

d428d85 [Sandy Ryza] Commenting, doc, and import fixes from Patrick's comments
e7315c6 [Sandy Ryza] Fix failing tests
34de899 [Sandy Ryza] Change --more-jars to --jars and fix docs
299ddca [Sandy Ryza] Fix scalastyle
a94c627 [Sandy Ryza] Add newline at end of SparkSubmit
04bc4e2 [Sandy Ryza] SPARK-1126. spark-submit script
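For orientation, a hypothetical invocation of the new script might look like the following. The --deploy-mode and --driver-memory flags are the ones the bash wrapper below intercepts; the other flag spellings are assumptions based on the field names in SparkSubmitArguments (added by this commit but not reproduced in this excerpt), and the jar, class, and arguments are made up:

    ./bin/spark-submit --master yarn-standalone --deploy-mode cluster \
      --driver-memory 2g --class org.example.MyApp my-app.jar arg1 arg2

In client mode the wrapper additionally exports SPARK_MEM from --driver-memory before delegating to org.apache.spark.deploy.SparkSubmit via bin/spark-class.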
1 parent 2d71427 commit 8d57f11

File tree

8 files changed: +630, -25 lines changed


bin/spark-submit

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
ORIG_ARGS=$@

while (($#)); do
  if [ $1 = "--deploy-mode" ]; then
    DEPLOY_MODE=$2
  elif [ $1 = "--driver-memory" ]; then
    DRIVER_MEMORY=$2
  fi

  shift
done

if [ ! -z $DRIVER_MEMORY ] && [ ! -z $DEPLOY_MODE ] && [ $DEPLOY_MODE = "client" ]; then
  export SPARK_MEM=$DRIVER_MEMORY
fi

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit $ORIG_ARGS
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 212 additions & 0 deletions
@@ -0,0 +1,212 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import java.io.File
import java.net.URL

import org.apache.spark.executor.ExecutorURLClassLoader

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.Map

/**
 * Scala code behind the spark-submit script. The script handles setting up the classpath with
 * relevant Spark dependencies and provides a layer over the different cluster managers and deploy
 * modes that Spark supports.
 */
object SparkSubmit {
  val YARN = 1
  val STANDALONE = 2
  val MESOS = 4
  val LOCAL = 8
  val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL

  var clusterManager: Int = LOCAL

  def main(args: Array[String]) {
    val appArgs = new SparkSubmitArguments(args)
    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
    launch(childArgs, classpath, sysProps, mainClass)
  }

  /**
   * @return a tuple containing the arguments for the child, a list of classpath
   *         entries for the child, a map of system properties, and the main
   *         class for the child
   */
  def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
      ArrayBuffer[String], Map[String, String], String) = {
    if (appArgs.master.startsWith("yarn")) {
      clusterManager = YARN
    } else if (appArgs.master.startsWith("spark")) {
      clusterManager = STANDALONE
    } else if (appArgs.master.startsWith("mesos")) {
      clusterManager = MESOS
    } else if (appArgs.master.startsWith("local")) {
      clusterManager = LOCAL
    } else {
      System.err.println("master must start with yarn, mesos, spark, or local")
      System.exit(1)
    }

    // Because "yarn-standalone" and "yarn-client" encapsulate both the master
    // and deploy mode, we have some logic to infer the master and deploy mode
    // from each other if only one is specified, or exit early if they are at odds.
    if (appArgs.deployMode == null && appArgs.master == "yarn-standalone") {
      appArgs.deployMode = "cluster"
    }
    if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
      System.err.println("Deploy mode \"cluster\" and master \"yarn-client\" are at odds")
      System.exit(1)
    }
    if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") {
      System.err.println("Deploy mode \"client\" and master \"yarn-standalone\" are at odds")
      System.exit(1)
    }
    if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
      appArgs.master = "yarn-standalone"
    }
    if (appArgs.deployMode != "cluster" && appArgs.master.startsWith("yarn")) {
      appArgs.master = "yarn-client"
    }

    val deployOnCluster = Option(appArgs.deployMode).getOrElse("client") == "cluster"

    val childClasspath = new ArrayBuffer[String]()
    val childArgs = new ArrayBuffer[String]()
    val sysProps = new HashMap[String, String]()
    var childMainClass = ""

    if (clusterManager == MESOS && deployOnCluster) {
      System.err.println("Mesos does not support running the driver on the cluster")
      System.exit(1)
    }

    if (!deployOnCluster) {
      childMainClass = appArgs.mainClass
      childClasspath += appArgs.primaryResource
    } else if (clusterManager == YARN) {
      childMainClass = "org.apache.spark.deploy.yarn.Client"
      childArgs += ("--jar", appArgs.primaryResource)
      childArgs += ("--class", appArgs.mainClass)
    }

    val options = List[OptionAssigner](
      new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
      new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
      new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
      new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
      new OptionAssigner(appArgs.queue, YARN, false, sysProp = "spark.yarn.queue"),
      new OptionAssigner(appArgs.numExecutors, YARN, true, clOption = "--num-executors"),
      new OptionAssigner(appArgs.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
      new OptionAssigner(appArgs.executorMemory, YARN, true, clOption = "--executor-memory"),
      new OptionAssigner(appArgs.executorMemory, STANDALONE | MESOS | YARN, false,
        sysProp = "spark.executor.memory"),
      new OptionAssigner(appArgs.driverMemory, STANDALONE, true, clOption = "--memory"),
      new OptionAssigner(appArgs.driverCores, STANDALONE, true, clOption = "--cores"),
      new OptionAssigner(appArgs.executorCores, YARN, true, clOption = "--executor-cores"),
      new OptionAssigner(appArgs.executorCores, YARN, false, sysProp = "spark.executor.cores"),
      new OptionAssigner(appArgs.totalExecutorCores, STANDALONE | MESOS, false,
        sysProp = "spark.cores.max"),
      new OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"),
      new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
      new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
      new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
      new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars")
    )

    // more jars
    if (appArgs.jars != null && !deployOnCluster) {
      for (jar <- appArgs.jars.split(",")) {
        childClasspath += jar
      }
    }

    for (opt <- options) {
      if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
          (clusterManager & opt.clusterManager) != 0) {
        if (opt.clOption != null) {
          childArgs += (opt.clOption, opt.value)
        } else if (opt.sysProp != null) {
          sysProps.put(opt.sysProp, opt.value)
        }
      }
    }

    if (deployOnCluster && clusterManager == STANDALONE) {
      if (appArgs.supervise) {
        childArgs += "--supervise"
      }

      childMainClass = "org.apache.spark.deploy.Client"
      childArgs += "launch"
      childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass)
    }

    // args
    if (appArgs.childArgs != null) {
      if (!deployOnCluster || clusterManager == STANDALONE) {
        childArgs ++= appArgs.childArgs
      } else if (clusterManager == YARN) {
        for (arg <- appArgs.childArgs) {
          childArgs += ("--args", arg)
        }
      }
    }

    (childArgs, childClasspath, sysProps, childMainClass)
  }

  def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
      sysProps: Map[String, String], childMainClass: String) {
    val loader = new ExecutorURLClassLoader(new Array[URL](0),
      Thread.currentThread.getContextClassLoader)
    Thread.currentThread.setContextClassLoader(loader)

    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    for ((key, value) <- sysProps) {
      System.setProperty(key, value)
    }

    val mainClass = Class.forName(childMainClass, true, loader)
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    mainMethod.invoke(null, childArgs.toArray)
  }

  def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
    val localJarFile = new File(localJar)
    if (!localJarFile.exists()) {
      System.err.println("Jar does not exist: " + localJar + ". Skipping.")
    }

    val url = localJarFile.getAbsoluteFile.toURI.toURL
    loader.addURL(url)
  }
}

private[spark] class OptionAssigner(val value: String,
    val clusterManager: Int,
    val deployOnCluster: Boolean,
    val clOption: String = null,
    val sysProp: String = null
) { }
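The OptionAssigner table above is the heart of the "layer over the different cluster managers and deploy modes" that the scaladoc promises: each user option is routed either to a command-line flag for the child main class or to a Spark system property, keyed on the deploy mode and a cluster-manager bitmask. Below is a minimal, self-contained Scala sketch of that dispatch rule, with illustrative values and two entries borrowed from createLaunchEnv; it is not the real SparkSubmit, just the routing logic extracted for clarity:

import scala.collection.mutable.{ArrayBuffer, HashMap}

object OptionAssignerDemo {
  // Cluster-manager bitmask values, copied from SparkSubmit above.
  val YARN = 1
  val STANDALONE = 2
  val MESOS = 4

  // Same shape as the OptionAssigner class at the end of the diff.
  class OptionAssigner(val value: String, val clusterManager: Int,
      val deployOnCluster: Boolean, val clOption: String = null, val sysProp: String = null)

  def main(args: Array[String]): Unit = {
    val deployOnCluster = true  // as if --deploy-mode cluster had been given
    val clusterManager = YARN   // as if the master were yarn-standalone

    val childArgs = new ArrayBuffer[String]()
    val sysProps = new HashMap[String, String]()

    // Two entries borrowed from the options table in createLaunchEnv.
    val options = List(
      new OptionAssigner("2g", YARN, true, clOption = "--executor-memory"),
      new OptionAssigner("2g", STANDALONE | MESOS | YARN, false,
        sysProp = "spark.executor.memory"))

    // The dispatch loop from createLaunchEnv: an option applies only when it has a
    // value, its deploy mode matches, and its bitmask covers the active cluster manager.
    for (opt <- options) {
      if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
          (clusterManager & opt.clusterManager) != 0) {
        if (opt.clOption != null) {
          childArgs += (opt.clOption, opt.value)
        } else if (opt.sysProp != null) {
          sysProps.put(opt.sysProp, opt.value)
        }
      }
    }

    println(childArgs)  // ArrayBuffer(--executor-memory, 2g): flags for yarn.Client
    println(sysProps)   // Map(): the client-mode entry was filtered out
  }
}

Under these assumptions the YARN cluster-mode entry becomes --executor-memory 2g on the command line of org.apache.spark.deploy.yarn.Client, while the client-mode system-property entry is filtered out; flipping deployOnCluster to false reverses the two.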
