Skip to content

Commit 15a333c

Browse files
authored
Do not add the MountSmallLocalFilesStep when there's no submitter local files (apache#557)
1 parent 5fd1304 commit 15a333c

File tree

4 files changed

+51
-21
lines changed

4 files changed

+51
-21
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -131,21 +131,27 @@ private[spark] class DriverConfigurationStepsOrchestrator(
131131
submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI).map { _ =>
132132
(filesDownloadPath, sparkFiles, Option.empty[DriverConfigurationStep])
133133
}.getOrElse {
134-
// Else - use a small files bootstrap that submits the local files via a secret.
135-
// Then, indicate to the outer block that the init-container should not handle
136-
// those local files simply by filtering them out.
137-
val sparkFilesWithoutLocal = KubernetesFileUtils.getNonSubmitterLocalFiles(sparkFiles)
138-
val smallFilesSecretName = s"$kubernetesAppId-submitted-files"
139-
val mountSmallFilesBootstrap = new MountSmallFilesBootstrapImpl(
134+
// Otherwise, if there are any submitter local files, use a small files bootstrap that
135+
// submits the local files via a secret. If this is the case, indicate to the outer
136+
// block that the init-container should not handle those local files simply by filtering
137+
// them out.
138+
val submitterLocalFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles)
139+
if (submitterLocalFiles.nonEmpty) {
140+
val nonSubmitterLocalFiles = KubernetesFileUtils.getNonSubmitterLocalFiles(sparkFiles)
141+
val smallFilesSecretName = s"$kubernetesAppId-submitted-files"
142+
val mountSmallFilesBootstrap = new MountSmallFilesBootstrapImpl(
140143
smallFilesSecretName, MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH)
141-
val mountSmallLocalFilesStep = new MountSmallLocalFilesStep(
142-
sparkFiles,
143-
smallFilesSecretName,
144-
MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
145-
mountSmallFilesBootstrap)
146-
(MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
147-
sparkFilesWithoutLocal.toArray,
148-
Some(mountSmallLocalFilesStep))
144+
val mountSmallLocalFilesStep = new MountSmallLocalFilesStep(
145+
submitterLocalFiles.toSeq,
146+
smallFilesSecretName,
147+
MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
148+
mountSmallFilesBootstrap)
149+
(MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
150+
nonSubmitterLocalFiles.toArray,
151+
Some(mountSmallLocalFilesStep))
152+
} else {
153+
(filesDownloadPath, sparkFiles, Option.empty[DriverConfigurationStep])
154+
}
149155
}
150156

151157
val initContainerBootstrapStep =
@@ -169,6 +175,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
169175
initContainerConfigMapName,
170176
INIT_CONTAINER_CONFIG_MAP_KEY))
171177
} else Option.empty[DriverConfigurationStep]
178+
172179
(submittedLocalFilesDownloadPath,
173180
mountSmallFilesWithoutInitContainerStep.toSeq ++
174181
initContainerBootstrapStep.toSeq)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStep.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,16 @@ import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSmallFilesB
2727
import org.apache.spark.util.Utils
2828

2929
private[spark] class MountSmallLocalFilesStep(
30-
sparkFiles: Seq[String],
30+
submitterLocalFiles: Seq[String],
3131
smallFilesSecretName: String,
3232
smallFilesSecretMountPath: String,
3333
mountSmallFilesBootstrap: MountSmallFilesBootstrap) extends DriverConfigurationStep {
3434

3535
import MountSmallLocalFilesStep._
3636
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
37-
val localFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles)
38-
.map(localFileUri => new File(Utils.resolveURI(localFileUri).getPath))
37+
val localFiles = submitterLocalFiles.map { localFileUri =>
38+
new File(Utils.resolveURI(localFileUri).getPath)
39+
}
3940
val totalSizeBytes = localFiles.map(_.length()).sum
4041
val totalSizeBytesString = Utils.bytesToString(totalSizeBytes)
4142
require(totalSizeBytes < MAX_SECRET_BUNDLE_SIZE_BYTES,

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
128128
}
129129

130130

131-
test("Only local files without a resource staging server.") {
131+
test("Only submitter local files without a resource staging server.") {
132132
val sparkConf = new SparkConf(false).set("spark.files", "/var/spark/file1.txt")
133133
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
134134
val orchestrator = new DriverConfigurationStepsOrchestrator(
@@ -151,6 +151,30 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
151151
classOf[MountSmallLocalFilesStep])
152152
}
153153

154+
test("No submitter local files without a resource staging server") {
155+
val sparkConf = new SparkConf(false).set(
156+
"spark.files", "hdfs://localhost:9000/var/foo.txt,https://localhost:8080/var/bar.txt")
157+
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
158+
val orchestrator = new DriverConfigurationStepsOrchestrator(
159+
NAMESPACE,
160+
APP_ID,
161+
LAUNCH_TIME,
162+
mainAppResource,
163+
APP_NAME,
164+
MAIN_CLASS,
165+
APP_ARGS,
166+
Seq.empty[String],
167+
sparkConf)
168+
validateStepTypes(
169+
orchestrator,
170+
classOf[BaseDriverConfigurationStep],
171+
classOf[DriverServiceBootstrapStep],
172+
classOf[DriverKubernetesCredentialsStep],
173+
classOf[DependencyResolutionStep],
174+
classOf[LocalDirectoryMountConfigurationStep],
175+
classOf[InitContainerBootstrapStep])
176+
}
177+
154178
test("Submission steps with driver secrets to mount") {
155179
val sparkConf = new SparkConf(false)
156180
.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStepSuite.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ private[spark] class MountSmallLocalFilesStepSuite extends SparkFunSuite with Be
3838
private val SECOND_TEMP_FILE_NAME = "file2.txt"
3939
private val FIRST_TEMP_FILE_CONTENTS = "123"
4040
private val SECOND_TEMP_FILE_CONTENTS = "456"
41-
private val REMOTE_FILE_URI = "hdfs://localhost:9000/file3.txt"
4241
private val SECRET_NAME = "secret"
4342

4443
private var tempFolder: File = _
@@ -61,8 +60,7 @@ private[spark] class MountSmallLocalFilesStepSuite extends SparkFunSuite with Be
6160
tempFolder, SECOND_TEMP_FILE_NAME, SECOND_TEMP_FILE_CONTENTS)
6261
val sparkFiles = Seq(
6362
s"file://${firstTempFile.getAbsolutePath}",
64-
secondTempFile.getAbsolutePath,
65-
REMOTE_FILE_URI)
63+
secondTempFile.getAbsolutePath)
6664
val configurationStep = new MountSmallLocalFilesStep(
6765
sparkFiles,
6866
SECRET_NAME,

0 commit comments

Comments
 (0)