
Commit e3a50be

Author: Andrew Korzhuev (committed)
Add config option for passing through k8s Pod.spec.imagePullSecrets
This will allow users to access images from private registries.
1 parent 6ac4fba commit e3a50be
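
A minimal sketch of how a user would opt in from the application side (the secret name "my-registry-secret" is a placeholder; the secret itself must already exist in the namespace where the driver and executor pods run):

  import org.apache.spark.SparkConf

  // Point Spark at an existing Kubernetes image pull secret.
  val conf = new SparkConf()
    .set("spark.kubernetes.imagePullSecret", "my-registry-secret")

The same key can also be passed with --conf spark.kubernetes.imagePullSecret=my-registry-secret on spark-submit.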


5 files changed (+27, -7 lines)


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 6 additions & 0 deletions
@@ -54,6 +54,12 @@ private[spark] object Config extends Logging {
       .checkValues(Set("Always", "Never", "IfNotPresent"))
       .createWithDefault("IfNotPresent")
 
+  val IMAGE_PULL_SECRET =
+    ConfigBuilder("spark.kubernetes.imagePullSecret")
+      .doc("Specifies the Kubernetes image secret used to access private image registry.")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
     "spark.kubernetes.authenticate.driver"
   val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
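
Because the entry is declared with createOptional, reading it yields an Option[String]: None when spark.kubernetes.imagePullSecret is not set, Some(name) otherwise. A minimal sketch of that behavior as it would look inside Spark's k8s module (the typed get/set overloads for ConfigEntry are internal to org.apache.spark, so this is illustrative rather than user-facing code; "my-registry-secret" is a placeholder):

  import org.apache.spark.SparkConf
  import org.apache.spark.deploy.k8s.Config.IMAGE_PULL_SECRET

  val unset = new SparkConf(false)
  assert(unset.get(IMAGE_PULL_SECRET).isEmpty)   // createOptional => None when absent

  val set = new SparkConf(false).set(IMAGE_PULL_SECRET, "my-registry-secret")
  assert(set.get(IMAGE_PULL_SECRET).contains("my-registry-secret"))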

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala

Lines changed: 6 additions & 3 deletions
@@ -17,9 +17,7 @@
 package org.apache.spark.deploy.k8s.submit.steps
 
 import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
-
+import io.fabric8.kubernetes.api.model._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
@@ -51,6 +49,8 @@ private[spark] class BasicDriverConfigurationStep(
     .get(DRIVER_CONTAINER_IMAGE)
     .getOrElse(throw new SparkException("Must specify the driver container image"))
 
+  private val imagePullSecret = sparkConf.get(IMAGE_PULL_SECRET)
+
   // CPU settings
   private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
   private val driverLimitCores = sparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
@@ -132,6 +132,8 @@ private[spark] class BasicDriverConfigurationStep(
       case _ => driverContainerWithoutArgs.addToArgs(appArgs: _*).build()
     }
 
+    val imagePullSecrets = imagePullSecret.map(new LocalObjectReference(_)).toList
+
     val baseDriverPod = new PodBuilder(driverSpec.driverPod)
       .editOrNewMetadata()
         .withName(driverPodName)
@@ -141,6 +143,7 @@
       .withNewSpec()
         .withRestartPolicy("Never")
         .withNodeSelector(nodeSelector.asJava)
+        .withImagePullSecrets(imagePullSecrets.asJava)
       .endSpec()
       .build()
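
The driver pod only carries image pull secrets when the option is set; an unset Option simply maps to an empty list. A standalone sketch of the fabric8 builder pattern used above (not part of the commit; podWithPullSecret is a hypothetical helper name used for illustration):

  import scala.collection.JavaConverters._
  import io.fabric8.kubernetes.api.model.{LocalObjectReference, Pod, PodBuilder}

  def podWithPullSecret(name: String, secret: Option[String]): Pod = {
    // None => Nil, Some(s) => List(new LocalObjectReference(s))
    val refs = secret.map(new LocalObjectReference(_)).toList
    new PodBuilder()
      .editOrNewMetadata()
        .withName(name)
      .endMetadata()
      .withNewSpec()
        .withRestartPolicy("Never")
        .withImagePullSecrets(refs.asJava)   // empty list when secret is None
      .endSpec()
      .build()
  }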

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 4 additions & 0 deletions
@@ -68,6 +68,7 @@ private[spark] class ExecutorPodFactory(
     .get(EXECUTOR_CONTAINER_IMAGE)
     .getOrElse(throw new SparkException("Must specify the executor container image"))
   private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+  private val imagePullSecret = sparkConf.get(IMAGE_PULL_SECRET)
   private val blockManagerPort = sparkConf
     .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
 
@@ -98,6 +99,8 @@
       nodeToLocalTaskCount: Map[String, Int]): Pod = {
     val name = s"$executorPodNamePrefix-exec-$executorId"
 
+    val imagePullSecrets = imagePullSecret.map(new LocalObjectReference(_)).toList
+
     // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
     // name as the hostname. This preserves uniqueness since the end of name contains
     // executorId
@@ -193,6 +196,7 @@
        .withHostname(hostname)
        .withRestartPolicy("Never")
        .withNodeSelector(nodeSelector.asJava)
+        .withImagePullSecrets(imagePullSecrets.asJava)
      .endSpec()
      .build()

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala

Lines changed: 7 additions & 4 deletions
@@ -17,9 +17,7 @@
 package org.apache.spark.deploy.k8s.submit.steps
 
 import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}
-
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, LocalObjectReference, PodBuilder}
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
@@ -51,6 +49,7 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
       .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
       .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
       .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
+      .set(IMAGE_PULL_SECRET, "imagePullSecret")
 
     val submissionStep = new BasicDriverConfigurationStep(
       APP_ID,
@@ -103,7 +102,11 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
       CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
       SPARK_APP_NAME_ANNOTATION -> APP_NAME)
     assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
-    assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never")
+
+    val driverPodSpec = preparedDriverSpec.driverPod.getSpec
+    assert(driverPodSpec.getRestartPolicy === "Never")
+    assert(driverPodSpec.getImagePullSecrets.size() === 1)
+    assert(driverPodSpec.getImagePullSecrets.get(0).getName === "imagePullSecret")
 
     val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
     val expectedSparkConf = Map(

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala

Lines changed: 4 additions & 0 deletions
@@ -33,6 +33,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
   private val driverPodUid: String = "driver-uid"
   private val executorPrefix: String = "base"
   private val executorImage: String = "executor-image"
+  private val imagePullSecret: String = "imagePullSecret"
   private val driverPod = new PodBuilder()
     .withNewMetadata()
       .withName(driverPodName)
@@ -54,6 +55,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
       .set(CONTAINER_IMAGE, executorImage)
       .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
+      .set(IMAGE_PULL_SECRET, imagePullSecret)
   }
 
   test("basic executor pod has reasonable defaults") {
@@ -74,6 +76,8 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() === 1)
     assert(executor.getSpec.getContainers.get(0).getResources
       .getLimits.get("memory").getAmount === "1408Mi")
+    assert(executor.getSpec.getImagePullSecrets.size() === 1)
+    assert(executor.getSpec.getImagePullSecrets.get(0).getName === imagePullSecret)
 
     // The pod has no node selector, volumes.
     assert(executor.getSpec.getNodeSelector.isEmpty)
