
Commit ab731f1

mccheah authored and foxish committed
Allow adding arbitrary files (alteryx#71)
* Allow adding arbitrary files
* Address comments and add documentation
1 parent: 261a624 · commit: ab731f1

File tree

12 files changed: +243 lines, -32 lines


core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 2 additions & 0 deletions
@@ -485,6 +485,8 @@ object SparkSubmit extends CommandLineUtils {
       sysProp = "spark.kubernetes.namespace"),
     OptionAssigner(args.kubernetesUploadJars, KUBERNETES, CLUSTER,
       sysProp = "spark.kubernetes.driver.uploads.jars"),
+    OptionAssigner(args.kubernetesUploadFiles, KUBERNETES, CLUSTER,
+      sysProp = "spark.kubernetes.driver.uploads.files"),
 
     // Other options
     OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 7 additions & 0 deletions
@@ -74,6 +74,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   // Kubernetes only
   var kubernetesNamespace: String = null
   var kubernetesUploadJars: String = null
+  var kubernetesUploadFiles: String = null
 
   // Standalone cluster mode only
   var supervise: Boolean = false
@@ -203,6 +204,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     kubernetesUploadJars = Option(kubernetesUploadJars)
       .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.jars"))
       .orNull
+    kubernetesUploadFiles = Option(kubernetesUploadFiles)
+      .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.files"))
+      .orNull
 
     // Try to set main class from JAR if no --class argument is given
     if (mainClass == null && !isPython && !isR && primaryResource != null) {
@@ -447,6 +451,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case KUBERNETES_UPLOAD_JARS =>
         kubernetesUploadJars = value
 
+      case KUBERNETES_UPLOAD_FILES =>
+        kubernetesUploadFiles = value
+
       case HELP =>
         printUsageAndExit(0)

docs/running-on-kubernetes.md

Lines changed: 11 additions & 1 deletion
@@ -217,10 +217,20 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td><code>spark.kubernetes.driver.uploads.jars</code></td>
   <td>(none)</td>
   <td>
-    Comma-separated list of jars to sent to the driver and all executors when submitting the application in cluster
+    Comma-separated list of jars to send to the driver and all executors when submitting the application in cluster
     mode. Refer to <a href="running-on-kubernetes.html#adding-other-jars">adding other jars</a> for more information.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.uploads.files</code></td>
+  <td>(none)</td>
+  <td>
+    Comma-separated list of files to send to the driver and all executors when submitting the application in cluster
+    mode. The files are added in a flat hierarchy to the current working directory of the driver, having the same
+    names as the names of the original files. Note that two files with the same name cannot be added, even if they
+    were in different source directories on the client disk.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.executor.memoryOverhead</code></td>
   <td>executorMemory * 0.10, with minimum of 384 </td>
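
For context, a hypothetical sketch (not part of this commit) of submitting an application with the new upload property through SparkLauncher. Cluster and master options are omitted, and every path, class name, and file below is an illustrative assumption; passing the new --upload-files flag to spark-submit sets the same spark.kubernetes.driver.uploads.files property.

import org.apache.spark.launcher.SparkLauncher

object SubmitWithUploadedFiles {
  def main(args: Array[String]): Unit = {
    val launcher = new SparkLauncher()
      .setDeployMode("cluster")
      .setAppResource("/path/to/example-app.jar")    // hypothetical application jar
      .setMainClass("com.example.ExampleApp")        // hypothetical main class
      // Equivalent to --upload-files: comma-separated local files that the submission
      // client ships to the driver's current working directory.
      .setConf("spark.kubernetes.driver.uploads.files",
        "/etc/app/app.properties,/data/lookup.csv")  // hypothetical files
    launcher.launch().waitFor()                      // blocks until spark-submit exits
  }
}

Inside the driver, such uploaded files would then be readable by their base names (for example, new File("app.properties")), since they are placed flat in the driver's working directory as described above.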

launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java

Lines changed: 3 additions & 1 deletion
@@ -80,6 +80,7 @@ class SparkSubmitOptionParser {
   protected final String KUBERNETES_MASTER = "--kubernetes-master";
   protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace";
   protected final String KUBERNETES_UPLOAD_JARS = "--upload-jars";
+  protected final String KUBERNETES_UPLOAD_FILES = "--upload-files";
 
   /**
    * This is the canonical list of spark-submit options. Each entry in the array contains the
@@ -122,7 +123,8 @@ class SparkSubmitOptionParser {
     { TOTAL_EXECUTOR_CORES },
     { KUBERNETES_MASTER },
     { KUBERNETES_NAMESPACE },
-    { KUBERNETES_UPLOAD_JARS }
+    { KUBERNETES_UPLOAD_JARS },
+    { KUBERNETES_UPLOAD_FILES }
   };
 
   /**

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 29 additions & 5 deletions
@@ -61,7 +61,9 @@ private[spark] class Client(
   private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl"
   private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
   private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
-  private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS)
+  private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS).filter(_.nonEmpty)
+  private val uploadedFiles = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_FILES).filter(_.nonEmpty)
+  uploadedFiles.foreach(validateNoDuplicateUploadFileNames)
   private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
   private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)
 
@@ -513,18 +515,40 @@ private[spark] class Client(
       case "container" => ContainerAppResource(appResourceUri.getPath)
       case other => RemoteAppResource(other)
     }
-
-    val uploadJarsBase64Contents = compressJars(uploadedJars)
+    val uploadJarsBase64Contents = compressFiles(uploadedJars)
+    val uploadFilesBase64Contents = compressFiles(uploadedFiles)
     KubernetesCreateSubmissionRequest(
       appResource = resolvedAppResource,
       mainClass = mainClass,
       appArgs = appArgs,
       secret = secretBase64String,
       sparkProperties = sparkConf.getAll.toMap,
-      uploadedJarsBase64Contents = uploadJarsBase64Contents)
+      uploadedJarsBase64Contents = uploadJarsBase64Contents,
+      uploadedFilesBase64Contents = uploadFilesBase64Contents)
+  }
+
+  // Because uploaded files should be added to the working directory of the driver, they
+  // need to not have duplicate file names. They are added to the working directory so the
+  // user can reliably locate them in their application. This is similar in principle to how
+  // YARN handles its `spark.files` setting.
+  private def validateNoDuplicateUploadFileNames(uploadedFilesCommaSeparated: String): Unit = {
+    val pathsWithDuplicateNames = uploadedFilesCommaSeparated
+      .split(",")
+      .groupBy(new File(_).getName)
+      .filter(_._2.length > 1)
+    if (pathsWithDuplicateNames.nonEmpty) {
+      val pathsWithDuplicateNamesSorted = pathsWithDuplicateNames
+        .values
+        .flatten
+        .toList
+        .sortBy(new File(_).getName)
+      throw new SparkException("Cannot upload files with duplicate names via" +
+        s" ${KUBERNETES_DRIVER_UPLOAD_FILES.key}. The following paths have a duplicated" +
+        s" file name: ${pathsWithDuplicateNamesSorted.mkString(",")}")
+    }
   }
 
-  private def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
+  private def compressFiles(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
     maybeFilePaths
       .map(_.split(","))
       .map(CompressionUtils.createTarGzip(_))
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 15 additions & 1 deletion
@@ -94,13 +94,27 @@ package object config {
   private[spark] val KUBERNETES_DRIVER_UPLOAD_JARS =
     ConfigBuilder("spark.kubernetes.driver.uploads.jars")
       .doc("""
-        | Comma-separated list of jars to sent to the driver and
+        | Comma-separated list of jars to send to the driver and
         | all executors when submitting the application in cluster
         | mode.
         """.stripMargin)
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_UPLOAD_FILES =
+    ConfigBuilder("spark.kubernetes.driver.uploads.files")
+      .doc("""
+        | Comma-separated list of files to send to the driver and
+        | all executors when submitting the application in cluster
+        | mode. The files are added in a flat hierarchy to the
+        | current working directory of the driver, having the same
+        | names as the names of the original files. Note that two
+        | files with the same name cannot be added, even if they
+        | were in different source directories on the client disk.
+        """.stripMargin)
+      .stringConf
+      .createOptional
+
   // Note that while we set a default for this when we start up the
   // scheduler, the specific default value is dynamically determined
   // based on the executor memory.

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala

Lines changed: 2 additions & 1 deletion
@@ -26,7 +26,8 @@ case class KubernetesCreateSubmissionRequest(
     appArgs: Array[String],
     sparkProperties: Map[String, String],
     secret: String,
-    uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
+    uploadedJarsBase64Contents: Option[TarGzippedData],
+    uploadedFilesBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
   message = "create"
   clientSparkVersion = SPARK_VERSION
 }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -68,8 +68,8 @@ private[spark] object CompressionUtils extends Logging {
     while (usedFileNames.contains(resolvedFileName)) {
       val oldResolvedFileName = resolvedFileName
       resolvedFileName = s"$nameWithoutExtension-$deduplicationCounter.$extension"
-      logWarning(s"File with name $oldResolvedFileName already exists. Trying to add with" +
-        s" file name $resolvedFileName instead.")
+      logWarning(s"File with name $oldResolvedFileName already exists. Trying to add" +
+        s" with file name $resolvedFileName instead.")
       deduplicationCounter += 1
     }
     usedFileNames += resolvedFileName

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala

Lines changed: 37 additions & 16 deletions
@@ -18,6 +18,7 @@ package org.apache.spark.deploy.rest.kubernetes
 
 import java.io.File
 import java.net.URI
+import java.nio.file.Paths
 import java.util.concurrent.CountDownLatch
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
@@ -27,7 +28,7 @@ import org.apache.commons.codec.binary.Base64
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions}
+import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SSLOptions}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.rest._
 import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}
@@ -149,37 +150,42 @@ private[spark] class KubernetesSparkRestServer(
           appArgs,
           sparkProperties,
           secret,
-          uploadedJars) =>
+          uploadedJars,
+          uploadedFiles) =>
         val decodedSecret = Base64.decodeBase64(secret)
         if (!expectedApplicationSecret.sameElements(decodedSecret)) {
           responseServlet.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
           handleError("Unauthorized to submit application.")
         } else {
           val tempDir = Utils.createTempDir()
           val appResourcePath = resolvedAppResource(appResource, tempDir)
-          val jarsDirectory = new File(tempDir, "jars")
-          if (!jarsDirectory.mkdir) {
-            throw new IllegalStateException("Failed to create jars dir at" +
-              s"${jarsDirectory.getAbsolutePath}")
-          }
-          val writtenJars = writeBase64ContentsToFiles(uploadedJars, jarsDirectory)
-          val driverExtraClasspath = sparkProperties
-            .get("spark.driver.extraClassPath")
-            .map(_.split(","))
-            .getOrElse(Array.empty[String])
+          val writtenJars = writeUploadedJars(uploadedJars, tempDir)
+          val writtenFiles = writeUploadedFiles(uploadedFiles)
+          val resolvedSparkProperties = new mutable.HashMap[String, String]
+          resolvedSparkProperties ++= sparkProperties
+
+          // Resolve driver classpath and jars
           val originalJars = sparkProperties.get("spark.jars")
             .map(_.split(","))
             .getOrElse(Array.empty[String])
           val resolvedJars = writtenJars ++ originalJars ++ Array(appResourcePath)
           val sparkJars = new File(sparkHome, "jars").listFiles().map(_.getAbsolutePath)
+          val driverExtraClasspath = sparkProperties
+            .get("spark.driver.extraClassPath")
+            .map(_.split(","))
+            .getOrElse(Array.empty[String])
           val driverClasspath = driverExtraClasspath ++
             resolvedJars ++
-            sparkJars ++
-            Array(appResourcePath)
-          val resolvedSparkProperties = new mutable.HashMap[String, String]
-          resolvedSparkProperties ++= sparkProperties
+            sparkJars
           resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",")
 
+          // Resolve spark.files
+          val originalFiles = sparkProperties.get("spark.files")
+            .map(_.split(","))
+            .getOrElse(Array.empty[String])
+          val resolvedFiles = originalFiles ++ writtenFiles
+          resolvedSparkProperties("spark.files") = resolvedFiles.mkString(",")
+
           val command = new ArrayBuffer[String]
           command += javaExecutable
           command += "-cp"
@@ -229,6 +235,21 @@ private[spark] class KubernetesSparkRestServer(
       }
     }
 
+    private def writeUploadedJars(files: Option[TarGzippedData], rootTempDir: File):
+        Seq[String] = {
+      val resolvedDirectory = new File(rootTempDir, "jars")
+      if (!resolvedDirectory.mkdir()) {
+        throw new IllegalStateException(s"Failed to create jars dir at " +
+          resolvedDirectory.getAbsolutePath)
+      }
+      writeBase64ContentsToFiles(files, resolvedDirectory)
+    }
+
+    private def writeUploadedFiles(files: Option[TarGzippedData]): Seq[String] = {
+      val workingDir = Paths.get("").toFile.getAbsoluteFile
+      writeBase64ContentsToFiles(files, workingDir)
+    }
+
     def resolvedAppResource(appResource: AppResource, tempDir: File): String = {
       val appResourcePath = appResource match {
         case UploadedAppResource(resourceContentsBase64, resourceName) =>

org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala (new file)

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.integrationtest.jobs
+
+import java.nio.file.Paths
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.SparkSession
+
+private[spark] object FileExistenceTest {
+
+  def main(args: Array[String]): Unit = {
+    if (args.length < 2) {
+      throw new IllegalArgumentException("Usage: WordCount <source-file> <expected contents>")
+    }
+    // Can't use SparkContext.textFile since the file is local to the driver
+    val file = Paths.get(args(0)).toFile
+    if (!file.exists()) {
+      throw new SparkException(s"Failed to find file at ${file.getAbsolutePath}")
+    } else {
+      // scalastyle:off println
+      val contents = Files.toString(file, Charsets.UTF_8)
+      if (args(1) != contents) {
+        throw new SparkException(s"Contents do not match. Expected: ${args(1)}," +
+          s" actual, $contents")
+      } else {
+        println(s"File found at ${file.getAbsolutePath} with correct contents.")
+      }
+      // scalastyle:on println
+    }
+    val spark = SparkSession.builder()
+      .appName("Test")
+      .getOrCreate()
+    spark.stop()
+  }
+
+}
