From a371d26ba770c781b86ed20d2922ab8fc043f52e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 15 May 2014 17:08:58 -0700 Subject: [PATCH 01/12] Route bin/pyspark through Spark submit The bin/pyspark script takes two pathways, depending on the application. If the application is a python file, bin/pyspark passes the python file directly to Spark submit, which launches the python application as a sub-process within the JVM. If the application is the pyspark shell, however, bin/pyspark starts the python REPL as the parent process, which launches the JVM as a sub-process. A significant benefit here is that all keyboard signals are propagated first to the Python interpreter properly. The existing code already provided a code path to do this; all we need to change is to use spark-submit instead of spark-class to launch the JVM. This requires modifications to Spark submit to handle the pyspark shell as a special case. This has been tested locally (OSX) for both cases, and using IPython. --- bin/pyspark | 14 +++-- .../apache/spark/deploy/PythonRunner.scala | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 60 ++++++++++++------- .../spark/deploy/SparkSubmitArguments.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 12 ++++ python/pyspark/java_gateway.py | 10 +++- python/pyspark/shell.py | 2 +- 7 files changed, 70 insertions(+), 32 deletions(-) diff --git a/bin/pyspark b/bin/pyspark index 10e35e0f1734e..2e754b50cd165 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -51,14 +51,20 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH # Load the PySpark shell.py script when ./pyspark is used interactively: export OLD_PYTHONSTARTUP=$PYTHONSTARTUP export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py +export PYSPARK_SUBMIT_ARGS="$@" if [ -n "$IPYTHON_OPTS" ]; then IPYTHON=1 fi -# Only use ipython if no command line arguments were provided [SPARK-1134] -if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then - exec ipython $IPYTHON_OPTS +# If a python file is provided, directly run spark-submit +if [[ "$1" =~ \.py$ ]]; then + exec $FWDIR/bin/spark-submit $PYSPARK_SUBMIT_ARGS else - exec "$PYSPARK_PYTHON" "$@" + # Only use ipython if no command line arguments were provided [SPARK-1134] + if [[ "$IPYTHON" = "1" ]]; then + exec ipython $IPYTHON_OPTS + else + exec "$PYSPARK_PYTHON" + fi fi diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index e20d4486c8f0c..2dfa02bd26f13 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -42,7 +42,7 @@ object PythonRunner { // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the // python directories in SPARK_HOME (if set), and any files in the pyFiles argument val pathElements = new ArrayBuffer[String] - pathElements ++= pyFiles.split(",") + pathElements ++= Option(pyFiles).getOrElse("").split(",") pathElements += PythonUtils.sparkPythonPath pathElements += sys.env.getOrElse("PYTHONPATH", "") val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e86182e4c56ce..e86d2725c8e74 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -41,10 +41,10 @@ object SparkSubmit { private var clusterManager: 
Int = LOCAL /** - * A special jar name that indicates the class being run is inside of Spark itself, - * and therefore no user jar is needed. + * Special primary resource names that represent shells rather than application jars. */ - private val RESERVED_JAR_NAME = "spark-internal" + private val SPARK_SHELL = "spark-shell" + private val PYSPARK_SHELL = "pyspark-shell" def main(args: Array[String]) { val appArgs = new SparkSubmitArguments(args) @@ -71,8 +71,8 @@ object SparkSubmit { * entries for the child, a list of system properties, a list of env vars * and the main class for the child */ - private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String], - ArrayBuffer[String], Map[String, String], String) = { + private[spark] def createLaunchEnv(args: SparkSubmitArguments) + : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { if (args.master.startsWith("local")) { clusterManager = LOCAL } else if (args.master.startsWith("yarn")) { @@ -121,24 +121,30 @@ object SparkSubmit { printErrorAndExit("Cannot currently run driver on the cluster in Mesos") } - // If we're running a Python app, set the Java class to run to be our PythonRunner, add - // Python files to deployment list, and pass the main file and Python path to PythonRunner + // If we're running a python app, set the main class to our specific python runner if (isPython) { if (deployOnCluster) { printErrorAndExit("Cannot currently run Python driver programs on cluster") } - args.mainClass = "org.apache.spark.deploy.PythonRunner" - args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource) + if (args.primaryResource == PYSPARK_SHELL) { + args.mainClass = "py4j.GatewayServer" + args.childArgs ++= ArrayBuffer("--die-on-broken-pipe", "0") + } else { + // If a python file is provided, add it to the child arguments and list of files to deploy. + // Usage: PythonAppRunner
<main python file> <extra python files>
[app arguments] + args.mainClass = "org.apache.spark.deploy.PythonRunner" + args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs + args.files = Utils.mergeFileLists(args.files, args.primaryResource) + } val pyFiles = Option(args.pyFiles).getOrElse("") - args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs - args.primaryResource = RESERVED_JAR_NAME + args.files = Utils.mergeFileLists(args.files, pyFiles) sysProps("spark.submit.pyFiles") = pyFiles } // If we're deploying into YARN, use yarn.Client as a wrapper around the user class if (!deployOnCluster) { childMainClass = args.mainClass - if (args.primaryResource != RESERVED_JAR_NAME) { + if (isUserJar(args.primaryResource)) { childClasspath += args.primaryResource } } else if (clusterManager == YARN) { @@ -219,7 +225,7 @@ object SparkSubmit { // For python files, the primary resource is already distributed as a regular file if (!isYarnCluster && !isPython) { var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq()) - if (args.primaryResource != RESERVED_JAR_NAME) { + if (isUserJar(args.primaryResource)) { jars = jars ++ Seq(args.primaryResource) } sysProps.put("spark.jars", jars.mkString(",")) @@ -293,8 +299,8 @@ object SparkSubmit { } private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) { - val localJarFile = new File(new URI(localJar).getPath()) - if (!localJarFile.exists()) { + val localJarFile = new File(new URI(localJar).getPath) + if (!localJarFile.exists) { printWarning(s"Jar $localJar does not exist, skipping.") } @@ -303,14 +309,24 @@ object SparkSubmit { } /** - * Merge a sequence of comma-separated file lists, some of which may be null to indicate - * no files, into a single comma-separated string. + * Return whether the given primary resource represents a user jar. + */ + private def isUserJar(primaryResource: String): Boolean = { + !isShell(primaryResource) && !isPython(primaryResource) + } + + /** + * Return whether the given primary resource represents a shell. + */ + private def isShell(primaryResource: String): Boolean = { + primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL + } + + /** + * Return whether the given primary resource requires running python. 
*/ - private[spark] def mergeFileLists(lists: String*): String = { - val merged = lists.filter(_ != null) - .flatMap(_.split(",")) - .mkString(",") - if (merged == "") null else merged + private[spark] def isPython(primaryResource: String): Boolean = { + primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL } } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 2d327aa3fb27f..256626e9ed6af 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -298,7 +298,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { case v => primaryResource = v inSparkOpts = false - isPython = v.endsWith(".py") + isPython = SparkSubmit.isPython(v) parse(tail) } } else { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 388f7222428db..6935fc1ddd72d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1166,4 +1166,16 @@ private[spark] object Utils extends Logging { true } } + + /** + * Merge a sequence of comma-separated file lists into a single comma-separated string. + * The provided strings may be null or empty to indicate no files. + */ + def mergeFileLists(lists: String*): String = { + lists + .filter(_ != null) + .filter(_ != "") + .flatMap(_.split(",")) + .mkString(",") + } } diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 3d0936fdca911..6af2b94d31a32 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -34,9 +34,13 @@ def launch_gateway(): # Launch the Py4j gateway using Spark's run command so that we pick up the # proper classpath and settings from spark-env.sh on_windows = platform.system() == "Windows" - script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class" - command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer", - "--die-on-broken-pipe", "0"] + script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit" + submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS") + if submit_args is not None: + submit_args = submit_args.split(" ") + else: + submit_args = [] + command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args if not on_windows: # Don't send ctrl-c / SIGINT to the Java gateway: def preexec_func(): diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index d172d588bfbd8..ebd714db7a918 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -40,7 +40,7 @@ if os.environ.get("SPARK_EXECUTOR_URI"): SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"]) -sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files) +sc = SparkContext(appName="PySparkShell", pyFiles=add_files) print("""Welcome to ____ __ From afe47bfd7406198453df3f38f0083b7182749011 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 15 May 2014 17:57:37 -0700 Subject: [PATCH 02/12] Fix spark shell --- bin/spark-shell | 4 ++-- bin/spark-shell.cmd | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/spark-shell b/bin/spark-shell index 7f03349c5e910..bc459166d0483 100755 --- a/bin/spark-shell +++ b/bin/spark-shell @@ -46,11 +46,11 @@ function main(){ # (see https://github.com/sbt/sbt/issues/562). 
stty -icanon min 1 -echo > /dev/null 2>&1 export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix" - $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main + $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main stty icanon echo > /dev/null 2>&1 else export SPARK_SUBMIT_OPTS - $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main + $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main fi } diff --git a/bin/spark-shell.cmd b/bin/spark-shell.cmd index ca0c722c926f5..4b9708a8c03f3 100755 --- a/bin/spark-shell.cmd +++ b/bin/spark-shell.cmd @@ -19,4 +19,4 @@ rem set SPARK_HOME=%~dp0.. -cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-internal %* --class org.apache.spark.repl.Main +cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main From fe4c8a74e21db08ec9a0111cf7f82e7cc62e3ccb Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 15 May 2014 18:22:04 -0700 Subject: [PATCH 03/12] Update --help for bin/pyspark --- bin/pyspark | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bin/pyspark b/bin/pyspark index 2e754b50cd165..6f0c2a8c4fe7c 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -25,6 +25,12 @@ export SPARK_HOME="$FWDIR" SCALA_VERSION=2.10 +if [[ "$@" == *--help* ]]; then + echo "Usage: ./bin/pyspark [python file] [options]" + ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 + exit 0 +fi + # Exit if the user hasn't compiled Spark if [ ! -f "$FWDIR/RELEASE" ]; then # Exit if the user hasn't compiled Spark From 6fba4123446d35b041ce4c958d27fda8e5ec59fd Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 16 May 2014 11:37:37 -0700 Subject: [PATCH 04/12] Deal with quotes + address various comments --- bin/pyspark | 24 +++++++++++--- .../org/apache/spark/deploy/SparkSubmit.scala | 18 ++++++++-- .../scala/org/apache/spark/util/Utils.scala | 12 ------- python/pyspark/java_gateway.py | 33 ++++++++++++++++--- 4 files changed, 63 insertions(+), 24 deletions(-) diff --git a/bin/pyspark b/bin/pyspark index 6f0c2a8c4fe7c..f147da22da2c7 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -26,7 +26,7 @@ export SPARK_HOME="$FWDIR" SCALA_VERSION=2.10 if [[ "$@" == *--help* ]]; then - echo "Usage: ./bin/pyspark [python file] [options]" + echo "Usage: ./bin/pyspark [options]" ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 exit 0 fi @@ -57,15 +57,31 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH # Load the PySpark shell.py script when ./pyspark is used interactively: export OLD_PYTHONSTARTUP=$PYTHONSTARTUP export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py -export PYSPARK_SUBMIT_ARGS="$@" +# If IPython options are specified, assume user wants to run IPython if [ -n "$IPYTHON_OPTS" ]; then IPYTHON=1 fi -# If a python file is provided, directly run spark-submit +# Build up arguments list manually to preserve quotes. We export Spark submit arguments as an +# environment variable because shell.py must run as a PYTHONSTARTUP script, which does not take +# in arguments. This is required mainly for IPython notebooks. + +PYSPARK_SUBMIT_ARGS="" +whitespace="[[:space:]]" +for i in "$@"; do + if [[ $i =~ $whitespace ]]; then + i=\"$i\" + fi + PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i" +done +export PYSPARK_SUBMIT_ARGS + +# If a python file is provided, directly run spark-submit. 
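# (Illustrative note, not part of the original patch; the file name my_app.py is hypothetical.)
# The suffix check on "$1" below selects between the two pathways described in PATCH 01:
#   ./bin/pyspark my_app.py arg1 arg2   # first argument ends in .py, so exec spark-submit directly
#   ./bin/pyspark --master local[2]     # no python file, so start the Python REPL as the parent
#                                       # process, which launches the JVM through spark-submit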
if [[ "$1" =~ \.py$ ]]; then - exec $FWDIR/bin/spark-submit $PYSPARK_SUBMIT_ARGS + echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." + echo -e "Use ./bin/spark-submit \n" + exec $FWDIR/bin/spark-submit "$@" else # Only use ipython if no command line arguments were provided [SPARK-1134] if [[ "$IPYTHON" = "1" ]]; then diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e86d2725c8e74..9424b65573726 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -134,10 +134,10 @@ object SparkSubmit { // Usage: PythonAppRunner
<main python file> <extra python files>
[app arguments] args.mainClass = "org.apache.spark.deploy.PythonRunner" args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs - args.files = Utils.mergeFileLists(args.files, args.primaryResource) + args.files = mergeFileLists(args.files, args.primaryResource) } val pyFiles = Option(args.pyFiles).getOrElse("") - args.files = Utils.mergeFileLists(args.files, pyFiles) + args.files = mergeFileLists(args.files, pyFiles) sysProps("spark.submit.pyFiles") = pyFiles } @@ -300,7 +300,7 @@ object SparkSubmit { private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) { val localJarFile = new File(new URI(localJar).getPath) - if (!localJarFile.exists) { + if (!localJarFile.exists()) { printWarning(s"Jar $localJar does not exist, skipping.") } @@ -328,6 +328,18 @@ object SparkSubmit { private[spark] def isPython(primaryResource: String): Boolean = { primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL } + + /** + * Merge a sequence of comma-separated file lists, some of which may be null to indicate + * no files, into a single comma-separated string. + */ + private[spark] def mergeFileLists(lists: String*): String = { + lists + .filter(_ != null) + .filter(_ != "") + .flatMap(_.split(",")) + .mkString(",") + } } /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 6935fc1ddd72d..388f7222428db 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1166,16 +1166,4 @@ private[spark] object Utils extends Logging { true } } - - /** - * Merge a sequence of comma-separated file lists into a single comma-separated string. - * The provided strings may be null or empty to indicate no files. - */ - def mergeFileLists(lists: String*): String = { - lists - .filter(_ != null) - .filter(_ != "") - .flatMap(_.split(",")) - .mkString(",") - } } diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 6af2b94d31a32..77a9c8dfa936d 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -23,7 +23,6 @@ from threading import Thread from py4j.java_gateway import java_import, JavaGateway, GatewayClient - def launch_gateway(): SPARK_HOME = os.environ["SPARK_HOME"] @@ -36,10 +35,7 @@ def launch_gateway(): on_windows = platform.system() == "Windows" script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit" submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS") - if submit_args is not None: - submit_args = submit_args.split(" ") - else: - submit_args = [] + submit_args = split_preserve_quotes(submit_args) command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args if not on_windows: # Don't send ctrl-c / SIGINT to the Java gateway: @@ -80,3 +76,30 @@ def run(self): java_import(gateway.jvm, "scala.Tuple2") return gateway + +def split_preserve_quotes(args): + """ + Given a string of space-delimited arguments with quotes, + split it into a list while preserving the quote boundaries. 
+ """ + if args is None: + return [] + split_list = [] + quoted_string = "" + wait_for_quote = False + for arg in args.split(" "): + if not wait_for_quote: + if arg.startswith("\""): + wait_for_quote = True + quoted_string = arg + else: + split_list.append(arg) + else: + quoted_string += " " + arg + if quoted_string.endswith("\""): + # Strip quotes + quoted_string = quoted_string[1:-1] + split_list.append(quoted_string) + quoted_string = "" + wait_for_quote = False + return split_list From a823661f8fc04577f9cc3b2a46e100468f56a963 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 16 May 2014 11:57:24 -0700 Subject: [PATCH 05/12] Fix --die-on-broken-pipe not propagated properly At the end of parsing options, we add an empty string to child arguments. This caused the arguments to py4j.JavaGateway to be interpreted as ["", "--die-on-broken-pipe", "0"]. --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 9424b65573726..04245223330e4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -128,7 +128,7 @@ object SparkSubmit { } if (args.primaryResource == PYSPARK_SHELL) { args.mainClass = "py4j.GatewayServer" - args.childArgs ++= ArrayBuffer("--die-on-broken-pipe", "0") + args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0") } else { // If a python file is provided, add it to the child arguments and list of files to deploy. // Usage: PythonAppRunner
<main python file> <extra python files>
[app arguments] diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 256626e9ed6af..264d4544cd31c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -302,7 +302,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { parse(tail) } } else { - childArgs += value + if (!value.isEmpty) { + childArgs += value + } parse(tail) } From 06eb1382f061673c26467bb2d2659ad1d2e9194c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 16 May 2014 14:50:08 -0700 Subject: [PATCH 06/12] Use shlex instead of writing our own parser --- python/pyspark/java_gateway.py | 30 ++---------------------------- 1 file changed, 2 insertions(+), 28 deletions(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 77a9c8dfa936d..3cacbb4991e49 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -18,6 +18,7 @@ import os import sys import signal +import shlex import platform from subprocess import Popen, PIPE from threading import Thread @@ -35,7 +36,7 @@ def launch_gateway(): on_windows = platform.system() == "Windows" script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit" submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS") - submit_args = split_preserve_quotes(submit_args) + submit_args = shlex.split(submit_args) command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args if not on_windows: # Don't send ctrl-c / SIGINT to the Java gateway: @@ -76,30 +77,3 @@ def run(self): java_import(gateway.jvm, "scala.Tuple2") return gateway - -def split_preserve_quotes(args): - """ - Given a string of space-delimited arguments with quotes, - split it into a list while preserving the quote boundaries. - """ - if args is None: - return [] - split_list = [] - quoted_string = "" - wait_for_quote = False - for arg in args.split(" "): - if not wait_for_quote: - if arg.startswith("\""): - wait_for_quote = True - quoted_string = arg - else: - split_list.append(arg) - else: - quoted_string += " " + arg - if quoted_string.endswith("\""): - # Strip quotes - quoted_string = quoted_string[1:-1] - split_list.append(quoted_string) - quoted_string = "" - wait_for_quote = False - return split_list From b7ba0d86b350de91628f5d3d28796e4cb41f79d0 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 16 May 2014 15:00:14 -0700 Subject: [PATCH 07/12] Address a few comments (minor) --- bin/pyspark | 6 +++--- bin/spark-shell | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bin/pyspark b/bin/pyspark index f147da22da2c7..0188e342a0f22 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -25,7 +25,7 @@ export SPARK_HOME="$FWDIR" SCALA_VERSION=2.10 -if [[ "$@" == *--help* ]]; then +if [[ "$@" = *--help ]] || [[ "$@" = *--h ]]; then echo "Usage: ./bin/pyspark [options]" ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 exit 0 @@ -79,8 +79,8 @@ export PYSPARK_SUBMIT_ARGS # If a python file is provided, directly run spark-submit. if [[ "$1" =~ \.py$ ]]; then - echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." - echo -e "Use ./bin/spark-submit \n" + echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 
1>&2 + echo -e "Use ./bin/spark-submit \n" 1>&2 exec $FWDIR/bin/spark-submit "$@" else # Only use ipython if no command line arguments were provided [SPARK-1134] diff --git a/bin/spark-shell b/bin/spark-shell index bc459166d0483..36129a552852f 100755 --- a/bin/spark-shell +++ b/bin/spark-shell @@ -28,7 +28,7 @@ esac # Enter posix mode for bash set -o posix -if [[ "$@" == *--help* ]]; then +if [[ "$@" = *--help ]] || [[ "$@" = *--h ]]; then echo "Usage: ./bin/spark-shell [options]" ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 exit 0 From 456d844c5de4489f1c7ba823338b49a42256db5d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 16 May 2014 15:24:08 -0700 Subject: [PATCH 08/12] Guard against shlex hanging if PYSPARK_SUBMIT_ARGS is not set --- python/pyspark/java_gateway.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 3cacbb4991e49..91ae8263f66b8 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -36,6 +36,7 @@ def launch_gateway(): on_windows = platform.system() == "Windows" script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit" submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS") + submit_args = submit_args if submit_args is not None else "" submit_args = shlex.split(submit_args) command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args if not on_windows: From 1866f8573620c49200c57b81761638602e1f916d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 16 May 2014 15:47:14 -0700 Subject: [PATCH 09/12] Windows is not cooperating Splitting a string on java.io.File.separator on Windows (i.e. "\") fails with a PatternSyntaxException. --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 388f7222428db..0c7cff019fce1 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1101,7 +1101,7 @@ private[spark] object Utils extends Logging { * Strip the directory from a path name */ def stripDirectory(path: String): String = { - path.split(File.separator).last + new File(path).getName } /** From c8cb3bfd3bf210edd1ef0e7601e839f7f21909eb Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 16 May 2014 18:01:03 -0700 Subject: [PATCH 10/12] Handle perverse app names (with escaped quotes) Now, the following works: bin/pyspark --master local --name "bat \" man \" is \" bat \" man" --- bin/pyspark | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/bin/pyspark b/bin/pyspark index 0188e342a0f22..10454375c6cdb 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -63,16 +63,15 @@ if [ -n "$IPYTHON_OPTS" ]; then IPYTHON=1 fi -# Build up arguments list manually to preserve quotes. We export Spark submit arguments as an -# environment variable because shell.py must run as a PYTHONSTARTUP script, which does not take -# in arguments. This is required mainly for IPython notebooks. +# Build up arguments list manually to preserve quotes and backslashes. +# We export Spark submit arguments as an environment variable because shell.py must run as a +# PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks. 
PYSPARK_SUBMIT_ARGS="" whitespace="[[:space:]]" for i in "$@"; do - if [[ $i =~ $whitespace ]]; then - i=\"$i\" - fi + if [[ $i =~ \" ]]; then i=$(echo $i | sed 's/\"/\\\"/g'); fi + if [[ $i =~ $whitespace ]]; then i=\"$i\"; fi PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i" done export PYSPARK_SUBMIT_ARGS From 01066fa6ce4a61097c75f3611c38a87d16db9060 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 16 May 2014 19:26:23 -0700 Subject: [PATCH 11/12] bin/pyspark for Windows This is tested on Windows 7. The only thing that does not work is having perverse app names with escaped quotes (e.g. "foo \" jam"). App names with spaces are fine. Other than that, the behavior on Windows is the same on OSX. --- bin/pyspark2.cmd | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd index d7cfd5eec501c..0ef9eea95342e 100644 --- a/bin/pyspark2.cmd +++ b/bin/pyspark2.cmd @@ -31,7 +31,7 @@ set FOUND_JAR=0 for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do ( set FOUND_JAR=1 ) -if "%FOUND_JAR%"=="0" ( +if [%FOUND_JAR%] == [0] ( echo Failed to find Spark assembly JAR. echo You need to build Spark with sbt\sbt assembly before running this program. goto exit @@ -42,15 +42,30 @@ rem Load environment variables from conf\spark-env.cmd, if it exists if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd" rem Figure out which Python to use. -if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python +if [%PYSPARK_PYTHON%] == [] set PYSPARK_PYTHON=python set PYTHONPATH=%FWDIR%python;%PYTHONPATH% set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH% set OLD_PYTHONSTARTUP=%PYTHONSTARTUP% set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py +set PYSPARK_SUBMIT_ARGS=%* echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH% -"%PYSPARK_PYTHON%" %* +rem Check whether the argument is a file +for /f %%i in ('echo %1^| findstr /R "\.py"') do ( + set PYTHON_FILE=%%i +) + +if [%PYTHON_FILE%] == [] ( + %PYSPARK_PYTHON% +) else ( + echo. + echo WARNING: Running python applications through ./bin/pyspark.cmd is deprecated as of Spark 1.0. + echo Use ./bin/spark-submit ^ + echo. 
+ "%FWDIR%\bin\spark-submit.cmd" %PYSPARK_SUBMIT_ARGS% +) + :exit From bf37e36d15d85f10f43152cb35b52358132e6887 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 16 May 2014 22:21:22 -0700 Subject: [PATCH 12/12] Minor changes --- bin/pyspark | 2 +- bin/spark-shell | 2 +- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 9 ++++----- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/bin/pyspark b/bin/pyspark index 10454375c6cdb..9e1364e44c8c4 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -25,7 +25,7 @@ export SPARK_HOME="$FWDIR" SCALA_VERSION=2.10 -if [[ "$@" = *--help ]] || [[ "$@" = *--h ]]; then +if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then echo "Usage: ./bin/pyspark [options]" ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 exit 0 diff --git a/bin/spark-shell b/bin/spark-shell index 36129a552852f..c158683ab3f99 100755 --- a/bin/spark-shell +++ b/bin/spark-shell @@ -28,7 +28,7 @@ esac # Enter posix mode for bash set -o posix -if [[ "$@" = *--help ]] || [[ "$@" = *--h ]]; then +if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then echo "Usage: ./bin/spark-shell [options]" ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 exit 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 04245223330e4..a99b2176e2b5e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -334,11 +334,10 @@ object SparkSubmit { * no files, into a single comma-separated string. */ private[spark] def mergeFileLists(lists: String*): String = { - lists - .filter(_ != null) - .filter(_ != "") - .flatMap(_.split(",")) - .mkString(",") + val merged = lists.filter(_ != null) + .flatMap(_.split(",")) + .mkString(",") + if (merged == "") null else merged } }
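Note (illustrative, not part of the patch series): the argument-quoting loop in bin/pyspark (added in PATCH 04, extended for escaped quotes in PATCH 10) and the shlex-based parsing in python/pyspark/java_gateway.py (PATCH 06 and PATCH 08) are meant to round-trip arguments such as quoted app names through the PYSPARK_SUBMIT_ARGS environment variable. Below is a minimal Python sketch of that round trip, using hypothetical helpers quote_args and parse_submit_args that only mimic the shell loop and the launch_gateway logic.

import shlex

def quote_args(args):
    # Mimic the bin/pyspark loop from PATCH 10: escape embedded double
    # quotes, then wrap any argument containing whitespace in quotes.
    out = []
    for a in args:
        a = a.replace('"', '\\"')
        if any(c.isspace() for c in a):
            a = '"%s"' % a
        out.append(a)
    return " ".join(out)

def parse_submit_args(env_value):
    # Mimic launch_gateway() after PATCH 08: tolerate an unset variable,
    # then split with shlex so quoted app names survive as single tokens.
    env_value = env_value if env_value is not None else ""
    return ["pyspark-shell"] + shlex.split(env_value)

# Round trip for an app name containing spaces and an escaped quote,
# e.g. bin/pyspark --master local --name 'bat " man'
packed = quote_args(["--master", "local", "--name", 'bat " man'])
print(parse_submit_args(packed))
# ['pyspark-shell', '--master', 'local', '--name', 'bat " man']

Running the sketch prints the list in the final comment, which matches the behavior the PATCH 10 commit message describes for app names containing spaces and escaped quotes.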