
Commit befcf0a

ifilonenko authored and foxish committed
Python Bindings for launching PySpark Jobs from the JVM (#364)
* Adding PySpark Submit functionality. Launching Python from JVM
* Addressing scala idioms related to PR351
* Removing extends Logging which was necessary for LogInfo
* Refactored code to leverage the ContainerLocalizedFileResolver
* Modified Unit tests so that they would pass
* Modified Unit Test input to pass Unit Tests
* Setup working environent for integration tests for PySpark
* Comment out Python thread logic until Jenkins has python in Python
* Modifying PythonExec to pass on Jenkins
* Modifying python exec
* Added unit tests to ClientV2 and refactored to include pyspark submission resources
* Modified unit test check
* Scalastyle
* PR 348 file conflicts
* Refactored unit tests and styles
* further scala stylzing and logic
* Modified unit tests to be more specific towards Class in question
* Removed space delimiting for methods
* Submission client redesign to use a step-based builder pattern. This change overhauls the underlying architecture of the submission client, but it is intended to entirely preserve existing behavior of Spark applications. Therefore users will find this to be an invisible change. The philosophy behind this design is to reconsider the breakdown of the submission process. It operates off the abstraction of "submission steps", which are transformation functions that take the previous state of the driver and return the new state of the driver. The driver's state includes its Spark configurations and the Kubernetes resources that will be used to deploy it. Such a refactor moves away from a features-first API design, which considers different containers to serve a set of features. The previous design, for example, had a container files resolver API object that returned different resolutions of the dependencies added by the user. However, it was up to the main Client to know how to intelligently invoke all of those APIs. Therefore the API surface area of the file resolver became untenably large and it was not intuitive of how it was to be used or extended. This design changes the encapsulation layout; every module is now responsible for changing the driver specification directly. An orchestrator builds the correct chain of steps and hands it to the client, which then calls it verbatim. The main client then makes any final modifications that put the different pieces of the driver together, particularly to attach the driver container itself to the pod and to apply the Spark configuration as command-line arguments.
* Don't add the init-container step if all URIs are local.
* Python arguments patch + tests + docs
* Revert "Python arguments patch + tests + docs" This reverts commit 4533df2.
* Revert "Don't add the init-container step if all URIs are local." This reverts commit e103225.
* Revert "Submission client redesign to use a step-based builder pattern." This reverts commit 5499f6d.
* style changes
* space for styling
1 parent 6f6cfd6 commit befcf0a
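
The step-based submission design described in the commit message (introduced and then reverted within this squash) can be sketched roughly as follows. This is a minimal illustration of the idea only; the names `DriverSpec`, `SubmissionStep`, and `applySteps` are assumptions, not code that landed in this commit.

```scala
// Minimal sketch of the "submission step" idea from the commit message:
// each step is a transformation that takes the previous state of the driver
// and returns the new state. All names here are illustrative assumptions.
import org.apache.spark.SparkConf

object SubmissionStepsSketch {
  // The driver's state: its Spark configuration plus (simplified to names
  // here) the Kubernetes resources that will be used to deploy it.
  case class DriverSpec(sparkConf: SparkConf, kubernetesResourceNames: Seq[String])

  // A submission step transforms the previous driver spec into a new one.
  trait SubmissionStep {
    def configureDriver(previous: DriverSpec): DriverSpec
  }

  // An orchestrator builds the chain of steps; the client applies it verbatim.
  def applySteps(initial: DriverSpec, steps: Seq[SubmissionStep]): DriverSpec =
    steps.foldLeft(initial)((spec, step) => step.configureDriver(spec))
}
```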

File tree

21 files changed: +831 -73 lines


README.md

Lines changed: 1 addition & 0 deletions
@@ -24,6 +24,7 @@ We've been asked by an Apache Spark Committer to work outside of the Apache infr
 
 This is a collaborative effort by several folks from different companies who are interested in seeing this feature be successful. Companies active in this project include (alphabetically):
 
+- Bloomberg
 - Google
 - Haiwen
 - Hyperpilot

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 10 additions & 4 deletions
@@ -335,8 +335,8 @@ object SparkSubmit {
     (clusterManager, deployMode) match {
       case (KUBERNETES, CLIENT) =>
         printErrorAndExit("Client mode is currently not supported for Kubernetes.")
-      case (KUBERNETES, CLUSTER) if args.isPython || args.isR =>
-        printErrorAndExit("Kubernetes does not currently support python or R applications.")
+      case (KUBERNETES, CLUSTER) if args.isR =>
+        printErrorAndExit("Kubernetes does not currently support R applications.")
       case (STANDALONE, CLUSTER) if args.isPython =>
         printErrorAndExit("Cluster deploy mode is currently not supported for python " +
           "applications on standalone clusters.")
@@ -620,8 +620,14 @@ object SparkSubmit {
 
     if (isKubernetesCluster) {
       childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
-      childArgs += args.primaryResource
-      childArgs += args.mainClass
+      if (args.isPython) {
+        childArgs += args.primaryResource
+        childArgs += "org.apache.spark.deploy.PythonRunner"
+        childArgs += args.pyFiles
+      } else {
+        childArgs += args.primaryResource
+        childArgs += args.mainClass
+      }
       childArgs ++= args.childArgs
     }

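For context, the `SparkSubmit` change above alters the positional arguments handed to `org.apache.spark.deploy.kubernetes.submit.Client`: for Python applications the "main class" slot carries `org.apache.spark.deploy.PythonRunner` and the third argument is the `--py-files` value. A standalone sketch of that contract, using made-up example paths, is below.

```scala
// Sketch of the positional child-argument contract between SparkSubmit and
// the Kubernetes submission Client after this change. Example paths are
// illustrative only, not taken from a real run.
object ChildArgsSketch {
  def childArgsFor(isPython: Boolean): Seq[String] =
    if (isPython) {
      Seq(
        "local:///opt/spark/examples/src/main/python/pi.py",   // primary resource
        "org.apache.spark.deploy.PythonRunner",                 // stands in for the main class
        "local:///opt/spark/examples/src/main/python/sort.py")  // --py-files value
    } else {
      Seq(
        "local:///opt/spark/examples/jars/spark-examples.jar",  // primary resource
        "org.apache.spark.examples.SparkPi")                    // user main class
    }

  def main(args: Array[String]): Unit = {
    // Client.main reads the arguments positionally (see the Client.scala diff below):
    val childArgs = childArgsFor(isPython = true)
    val mainAppResource = childArgs(0)
    val mainClass = childArgs(1)
    val appArgs = childArgs.drop(2)
    println(s"resource=$mainAppResource mainClass=$mainClass appArgs=${appArgs.mkString(",")}")
  }
}
```
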
docs/running-on-kubernetes.md

Lines changed: 26 additions & 0 deletions
@@ -180,6 +180,32 @@ The above mechanism using `kubectl proxy` can be used when we have authenticatio
 kubernetes-client library does not support. Authentication using X509 Client Certs and OAuth tokens
 is currently supported.
 
+### Running PySpark
+
+Running PySpark on Kubernetes leverages the same spark-submit logic when launching on Yarn and Mesos.
+Python files can be distributed by including, in the conf, `--py-files`
+
+Below is an example submission:
+
+
+```
+bin/spark-submit \
+  --deploy-mode cluster \
+  --master k8s://http://127.0.0.1:8001 \
+  --kubernetes-namespace default \
+  --conf spark.executor.memory=500m \
+  --conf spark.driver.memory=1G \
+  --conf spark.driver.cores=1 \
+  --conf spark.executor.cores=1 \
+  --conf spark.executor.instances=1 \
+  --conf spark.app.name=spark-pi \
+  --conf spark.kubernetes.driver.docker.image=spark-driver-py:latest \
+  --conf spark.kubernetes.executor.docker.image=spark-executor-py:latest \
+  --conf spark.kubernetes.initcontainer.docker.image=spark-init:latest \
+  --py-files local:///opt/spark/examples/src/main/python/sort.py \
+  local:///opt/spark/examples/src/main/python/pi.py 100
+```
+
 ## Dynamic Executor Scaling
 
 Spark on Kubernetes supports Dynamic Allocation with cluster mode. This mode requires running

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 2 additions & 0 deletions
@@ -67,6 +67,8 @@ package object constants {
   private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
   private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
   private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
+  private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
+  private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
 
   // Bootstrapping dependencies with the init-container
   private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala

Lines changed: 49 additions & 28 deletions
@@ -47,11 +47,11 @@ private[spark] class Client(
     appName: String,
     kubernetesResourceNamePrefix: String,
     kubernetesAppId: String,
+    mainAppResource: String,
+    pythonResource: Option[PythonSubmissionResourcesImpl],
     mainClass: String,
     sparkConf: SparkConf,
     appArgs: Array[String],
-    sparkJars: Seq[String],
-    sparkFiles: Seq[String],
     waitForAppCompletion: Boolean,
     kubernetesClient: KubernetesClient,
     initContainerComponentsProvider: DriverInitContainerComponentsProvider,
@@ -82,9 +82,7 @@
       org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
 
   def run(): Unit = {
-    validateNoDuplicateFileNames(sparkJars)
-    validateNoDuplicateFileNames(sparkFiles)
-
+    val arguments = (pythonResource map {p => p.arguments}).getOrElse(appArgs)
     val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
       sparkConf,
       KUBERNETES_DRIVER_LABEL_PREFIX,
@@ -136,7 +134,7 @@
         .endEnv()
         .addNewEnv()
           .withName(ENV_DRIVER_ARGS)
-          .withValue(appArgs.mkString(" "))
+          .withValue(arguments.mkString(" "))
         .endEnv()
         .withNewResources()
           .addToRequests("cpu", driverCpuQuantity)
@@ -182,10 +180,13 @@
       .map(_.build())
 
     val containerLocalizedFilesResolver = initContainerComponentsProvider
-      .provideContainerLocalizedFilesResolver()
+      .provideContainerLocalizedFilesResolver(mainAppResource)
     val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars()
     val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles()
-
+    val resolvedPySparkFiles = containerLocalizedFilesResolver.resolveSubmittedPySparkFiles()
+    val resolvedPrimaryPySparkResource = pythonResource.map {
+      p => p.primaryPySparkResource(containerLocalizedFilesResolver)
+    }.getOrElse("")
     val initContainerBundler = initContainerComponentsProvider
       .provideInitContainerBundle(maybeSubmittedResourceIdentifiers.map(_.ids()),
         resolvedSparkJars ++ resolvedSparkFiles)
@@ -221,7 +222,7 @@
     val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map {
       case (confKey, confValue) => s"-D$confKey=$confValue"
     }.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
-    val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec()
+    val resolvedDriverPodBuilder = podWithInitContainerAndMountedCreds.editSpec()
      .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName))
        .addNewEnv()
          .withName(ENV_MOUNTED_CLASSPATH)
@@ -233,7 +234,15 @@
        .endEnv()
      .endContainer()
      .endSpec()
-      .build()
+    val driverPodFileMounter = initContainerComponentsProvider.provideDriverPodFileMounter()
+    val resolvedDriverPod = pythonResource.map {
+      p => p.driverPodWithPySparkEnvs(
+        driverPodFileMounter,
+        resolvedPrimaryPySparkResource,
+        resolvedPySparkFiles.mkString(","),
+        driverContainer.getName,
+        resolvedDriverPodBuilder
+      )}.getOrElse(resolvedDriverPodBuilder.build())
     Utils.tryWithResource(
       kubernetesClient
         .pods()
@@ -271,17 +280,6 @@
       }
     }
   }
-
-  private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = {
-    val fileNamesToUris = allFiles.map { file =>
-      (new File(Utils.resolveURI(file).getPath).getName, file)
-    }
-    fileNamesToUris.groupBy(_._1).foreach {
-      case (fileName, urisWithFileName) =>
-        require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" +
-          s" file name $fileName is shared by all of these URIs: $urisWithFileName")
-    }
-  }
 }
 
 private[spark] object Client {
@@ -292,22 +290,34 @@
     val appArgs = args.drop(2)
     run(sparkConf, mainAppResource, mainClass, appArgs)
   }
-
   def run(
       sparkConf: SparkConf,
       mainAppResource: String,
      mainClass: String,
      appArgs: Array[String]): Unit = {
+    val isPython = mainAppResource.endsWith(".py")
+    val pythonResource: Option[PythonSubmissionResourcesImpl] =
+      if (isPython) {
+        Option(new PythonSubmissionResourcesImpl(mainAppResource, appArgs))
+      } else None
+    // Since you might need jars for SQL UDFs in PySpark
+    def sparkJarFilter(): Seq[String] =
+      pythonResource.map {p => p.sparkJars}.getOrElse(
+        Option(mainAppResource)
+          .filterNot(_ == SparkLauncher.NO_RESOURCE)
+          .toSeq)
     val sparkJars = sparkConf.getOption("spark.jars")
       .map(_.split(","))
-      .getOrElse(Array.empty[String]) ++
-        Option(mainAppResource)
-          .filterNot(_ == SparkLauncher.NO_RESOURCE)
-          .toSeq
+      .getOrElse(Array.empty[String]) ++ sparkJarFilter()
     val launchTime = System.currentTimeMillis
     val sparkFiles = sparkConf.getOption("spark.files")
      .map(_.split(","))
      .getOrElse(Array.empty[String])
+    val pySparkFilesOption = pythonResource.map {p => p.pySparkFiles}
+    validateNoDuplicateFileNames(sparkJars)
+    validateNoDuplicateFileNames(sparkFiles)
+    pySparkFilesOption.foreach {b => validateNoDuplicateFileNames(b)}
+    val pySparkFiles = pySparkFilesOption.getOrElse(Array.empty[String])
     val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
     // The resource name prefix is derived from the application name, making it easy to connect the
     // names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the
@@ -326,6 +336,7 @@ private[spark] object Client {
       namespace,
       sparkJars,
       sparkFiles,
+      pySparkFiles,
      sslOptionsProvider.getSslOptions)
     Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
       master,
@@ -346,16 +357,26 @@
       appName,
       kubernetesResourceNamePrefix,
       kubernetesAppId,
+      mainAppResource,
+      pythonResource,
       mainClass,
       sparkConf,
       appArgs,
-      sparkJars,
-      sparkFiles,
       waitForAppCompletion,
       kubernetesClient,
       initContainerComponentsProvider,
       kubernetesCredentialsMounterProvider,
       loggingPodStatusWatcher).run()
   }
+  private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = {
+    val fileNamesToUris = allFiles.map { file =>
+      (new File(Utils.resolveURI(file).getPath).getName, file)
+    }
+    fileNamesToUris.groupBy(_._1).foreach {
+      case (fileName, urisWithFileName) =>
+        require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" +
+          s" file name $fileName is shared by all of these URIs: $urisWithFileName")
+    }
+  }
 }

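The duplicate-file-name validation that moved into the companion object above now also runs over `--py-files`. A standalone sketch of its behavior (the same logic, with `java.net.URI` standing in for Spark's `Utils.resolveURI`, and made-up URIs) is shown here:

```scala
// Standalone sketch of validateNoDuplicateFileNames from the diff above.
// java.net.URI stands in for Utils.resolveURI; the example URIs are made up.
import java.io.File
import java.net.URI

object DuplicateFileNameCheckSketch {
  private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = {
    val fileNamesToUris = allFiles.map { file =>
      (new File(new URI(file).getPath).getName, file)
    }
    fileNamesToUris.groupBy(_._1).foreach {
      case (fileName, urisWithFileName) =>
        require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" +
          s" file name $fileName is shared by all of these URIs: $urisWithFileName")
    }
  }

  def main(args: Array[String]): Unit = {
    // Passes: every basename is unique.
    validateNoDuplicateFileNames(Seq(
      "local:///opt/spark/examples/src/main/python/pi.py",
      "local:///opt/spark/examples/src/main/python/sort.py"))
    // Throws IllegalArgumentException: both URIs share the basename "dep.py".
    validateNoDuplicateFileNames(Seq(
      "file:///tmp/a/dep.py",
      "hdfs://namenode/jobs/dep.py"))
  }
}
```
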
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala

Lines changed: 29 additions & 10 deletions
@@ -24,14 +24,19 @@ private[spark] trait ContainerLocalizedFilesResolver {
   def resolveSubmittedAndRemoteSparkJars(): Seq[String]
   def resolveSubmittedSparkJars(): Seq[String]
   def resolveSubmittedSparkFiles(): Seq[String]
+  def resolveSubmittedPySparkFiles(): Seq[String]
+  def resolvePrimaryResourceFile(): String
 }
 
 private[spark] class ContainerLocalizedFilesResolverImpl(
     sparkJars: Seq[String],
     sparkFiles: Seq[String],
+    pySparkFiles: Seq[String],
+    primaryPyFile: String,
     jarsDownloadPath: String,
     filesDownloadPath: String) extends ContainerLocalizedFilesResolver {
 
+
   override def resolveSubmittedAndRemoteSparkJars(): Seq[String] = {
     sparkJars.map { jar =>
       val jarUri = Utils.resolveURI(jar)
@@ -53,16 +58,30 @@ private[spark] class ContainerLocalizedFilesResolverImpl(
     resolveSubmittedFiles(sparkFiles, filesDownloadPath)
   }
 
-  private def resolveSubmittedFiles(files: Seq[String], downloadPath: String): Seq[String] = {
-    files.map { file =>
-      val fileUri = Utils.resolveURI(file)
-      Option(fileUri.getScheme).getOrElse("file") match {
-        case "file" =>
-          val fileName = new File(fileUri.getPath).getName
-          s"$downloadPath/$fileName"
-        case _ =>
-          file
-      }
+  override def resolveSubmittedPySparkFiles(): Seq[String] = {
+    def filterMainResource(x: String) = x match {
+      case `primaryPyFile` => None
+      case _ => Some(resolveFile(x, filesDownloadPath))
+    }
+    pySparkFiles.flatMap(x => filterMainResource(x))
+  }
+
+  override def resolvePrimaryResourceFile(): String = {
+    Option(primaryPyFile).map(p => resolveFile(p, filesDownloadPath)).getOrElse("")
+  }
+
+  private def resolveFile(file: String, downloadPath: String) = {
+    val fileUri = Utils.resolveURI(file)
+    Option(fileUri.getScheme).getOrElse("file") match {
+      case "file" =>
+        val fileName = new File(fileUri.getPath).getName
+        s"$downloadPath/$fileName"
+      case _ =>
+        file
     }
   }
+
+  private def resolveSubmittedFiles(files: Seq[String], downloadPath: String): Seq[String] = {
+    files.map { file => resolveFile(file, downloadPath) }
+  }
 }

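The resolver change above factors the per-file resolution into `resolveFile`: submitter-local files (a `file` or missing scheme) are rewritten to the in-container files download path, any other scheme (including `local://`) is left untouched, and the primary `.py` resource is filtered out of the resolved `--py-files` list. A small standalone sketch of that resolution rule, with an assumed download directory and `java.net.URI` in place of `Utils.resolveURI`:

```scala
// Standalone sketch of the resolveFile rule added above. The download
// directory is an assumed example value, not necessarily the configured default.
import java.io.File
import java.net.URI

object PySparkFileResolutionSketch {
  def resolveFile(file: String, downloadPath: String): String = {
    val fileUri = new URI(file) // the real code uses Utils.resolveURI
    Option(fileUri.getScheme).getOrElse("file") match {
      case "file" =>
        // Submitter-local: rewritten to where the init-container downloads it.
        val fileName = new File(fileUri.getPath).getName
        s"$downloadPath/$fileName"
      case _ =>
        // Anything else (local://, http://, hdfs://, ...) passes through unchanged.
        file
    }
  }

  def main(args: Array[String]): Unit = {
    val downloadPath = "/var/spark-data/spark-files" // assumed example directory
    println(resolveFile("file:///home/user/deps.py", downloadPath))
    println(resolveFile("local:///opt/spark/examples/src/main/python/sort.py", downloadPath))
  }
}
```
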
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala

Lines changed: 16 additions & 9 deletions
@@ -32,13 +32,15 @@ import org.apache.spark.util.Utils
  */
 private[spark] trait DriverInitContainerComponentsProvider {
 
-  def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver
+  def provideContainerLocalizedFilesResolver(
+    mainAppResource: String): ContainerLocalizedFilesResolver
   def provideInitContainerSubmittedDependencyUploader(
       driverPodLabels: Map[String, String]): Option[SubmittedDependencyUploader]
   def provideSubmittedDependenciesSecretBuilder(
       maybeSubmittedResourceSecrets: Option[SubmittedResourceSecrets])
       : Option[SubmittedDependencySecretBuilder]
   def provideInitContainerBootstrap(): SparkPodInitContainerBootstrap
+  def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter
   def provideInitContainerBundle(maybeSubmittedResourceIds: Option[SubmittedResourceIds],
       uris: Iterable[String]): Option[InitContainerBundle]
 }
@@ -49,6 +51,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
     namespace: String,
     sparkJars: Seq[String],
     sparkFiles: Seq[String],
+    pySparkFiles: Seq[String],
     resourceStagingServerExternalSslOptions: SSLOptions)
   extends DriverInitContainerComponentsProvider {
 
@@ -104,6 +107,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
   private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE)
   private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
   private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
+  private val pySparkSubmitted = KubernetesFileUtils.getOnlySubmitterLocalFiles(pySparkFiles)
 
   private def provideInitContainerConfigMap(
       maybeSubmittedResourceIds: Option[SubmittedResourceIds]): ConfigMap = {
@@ -130,17 +134,18 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
     }
     new SparkInitContainerConfigMapBuilderImpl(
       sparkJars,
-      sparkFiles,
+      sparkFiles ++ pySparkSubmitted,
       jarsDownloadPath,
       filesDownloadPath,
       configMapName,
       configMapKey,
       submittedDependencyConfigPlugin).build()
   }
 
-  override def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver = {
+  override def provideContainerLocalizedFilesResolver(mainAppResource: String)
+      : ContainerLocalizedFilesResolver = {
     new ContainerLocalizedFilesResolverImpl(
-      sparkJars, sparkFiles, jarsDownloadPath, filesDownloadPath)
+      sparkJars, sparkFiles, pySparkFiles, mainAppResource, jarsDownloadPath, filesDownloadPath)
   }
 
   private def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = {
@@ -159,7 +164,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
       namespace,
       stagingServerUri,
       sparkJars,
-      sparkFiles,
+      sparkFiles ++ pySparkSubmitted,
       resourceStagingServerExternalSslOptions,
       RetrofitClientFactoryImpl)
   }
@@ -201,13 +206,15 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
       configMapKey,
       resourceStagingServerSecretPlugin)
   }
-
+  override def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter = {
+    new DriverPodKubernetesFileMounterImpl()
+  }
   override def provideInitContainerBundle(
       maybeSubmittedResourceIds: Option[SubmittedResourceIds],
       uris: Iterable[String]): Option[InitContainerBundle] = {
-    val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
-    // Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
-    if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) {
+    // Bypass init-containers if `spark.jars` and `spark.files` and '--py-rilfes'
+    // is empty or only has `local://` URIs
+    if ((KubernetesFileUtils.getNonContainerLocalFiles(uris) ++ pySparkSubmitted).nonEmpty) {
       Some(InitContainerBundle(provideInitContainerConfigMap(maybeSubmittedResourceIds),
         provideInitContainerBootstrap(),
         provideExecutorInitContainerConfiguration()))

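The bundle provider above now folds submitter-local `--py-files` into the same init-container decision: the bundle is only built when at least one jar, file, or PySpark file actually needs to be localized. A rough sketch of that predicate, where `submitterLocal` and `nonContainerLocal` only approximate what `KubernetesFileUtils` does:

```scala
// Rough sketch of the init-container bypass condition from the diff above.
// submitterLocal / nonContainerLocal are approximations of KubernetesFileUtils,
// based only on URI schemes; they are not the real implementations.
import java.net.URI

object InitContainerBypassSketch {
  private def scheme(uri: String): String =
    Option(new URI(uri).getScheme).getOrElse("file")

  // Files the submitter must upload (no scheme or file://).
  def submitterLocal(uris: Seq[String]): Seq[String] =
    uris.filter(uri => scheme(uri) == "file")

  // Files not already baked into the container image (anything but local://).
  def nonContainerLocal(uris: Seq[String]): Seq[String] =
    uris.filterNot(uri => scheme(uri) == "local")

  def needsInitContainer(jarsAndFiles: Seq[String], pySparkFiles: Seq[String]): Boolean =
    (nonContainerLocal(jarsAndFiles) ++ submitterLocal(pySparkFiles)).nonEmpty

  def main(args: Array[String]): Unit = {
    // false: everything is already on the image, so the bundle is skipped.
    println(needsInitContainer(
      Seq("local:///opt/spark/jars/app.jar"),
      Seq("local:///opt/spark/examples/src/main/python/sort.py")))
    // true: a submitter-local --py-files entry forces the init-container path.
    println(needsInitContainer(Nil, Seq("file:///home/user/deps.py")))
  }
}
```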