Skip to content

Commit fd24f23

Browse files
mccheahash211
authored andcommitted
Allow custom annotations on the driver pod. (apache#163)
1 parent e7f78cb commit fd24f23

File tree

4 files changed

+69
-27
lines changed

4 files changed

+69
-27
lines changed

docs/running-on-kubernetes.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,14 @@ from the other deployment modes. See the [configuration page](configuration.html
187187
for bookkeeping purposes.
188188
</td>
189189
</tr>
190+
<tr>
191+
<td><code>spark.kubernetes.driver.annotations</code></td>
192+
<td>(none)</td>
193+
<td>
194+
Custom annotations that will be added to the driver pod. This should be a comma-separated list of label key-value
195+
pairs, where each annotation is in the format <code>key=value</code>.
196+
</td>
197+
</tr>
190198
<tr>
191199
<td><code>spark.kubernetes.driverSubmitTimeout</code></td>
192200
<td>60s</td>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package org.apache.spark.deploy.kubernetes
1818

1919
import java.io.File
2020
import java.security.SecureRandom
21-
import java.util
2221
import java.util.concurrent.{CountDownLatch, TimeUnit}
2322

2423
import com.google.common.io.Files
@@ -73,6 +72,7 @@ private[spark] class Client(
7372

7473
private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
7574
private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS)
75+
private val customAnnotations = sparkConf.get(KUBERNETES_DRIVER_ANNOTATIONS)
7676

7777
private val kubernetesResourceCleaner = new KubernetesResourceCleaner
7878

@@ -90,7 +90,18 @@ private[spark] class Client(
9090
throw new SparkException(s"Main app resource file $mainAppResource is not a file or" +
9191
s" is a directory.")
9292
}
93-
val parsedCustomLabels = parseCustomLabels(customLabels)
93+
val parsedCustomLabels = parseKeyValuePairs(customLabels, KUBERNETES_DRIVER_LABELS.key,
94+
"labels")
95+
parsedCustomLabels.keys.foreach { key =>
96+
require(key != SPARK_APP_ID_LABEL, "Label with key" +
97+
s" $SPARK_APP_ID_LABEL cannot be used in" +
98+
" spark.kubernetes.driver.labels, as it is reserved for Spark's" +
99+
" internal configuration.")
100+
}
101+
val parsedCustomAnnotations = parseKeyValuePairs(
102+
customAnnotations,
103+
KUBERNETES_DRIVER_ANNOTATIONS.key,
104+
"annotations")
94105
var k8ConfBuilder = new K8SConfigBuilder()
95106
.withApiVersion("v1")
96107
.withMasterUrl(master)
@@ -134,6 +145,7 @@ private[spark] class Client(
134145
val (driverPod, driverService) = launchDriverKubernetesComponents(
135146
kubernetesClient,
136147
parsedCustomLabels,
148+
parsedCustomAnnotations,
137149
submitServerSecret,
138150
sslConfiguration)
139151
configureOwnerReferences(
@@ -215,14 +227,15 @@ private[spark] class Client(
215227

216228
private def launchDriverKubernetesComponents(
217229
kubernetesClient: KubernetesClient,
218-
parsedCustomLabels: Map[String, String],
230+
customLabels: Map[String, String],
231+
customAnnotations: Map[String, String],
219232
submitServerSecret: Secret,
220233
sslConfiguration: SslConfiguration): (Pod, Service) = {
221234
val driverKubernetesSelectors = (Map(
222235
SPARK_DRIVER_LABEL -> kubernetesAppId,
223236
SPARK_APP_ID_LABEL -> kubernetesAppId,
224237
SPARK_APP_NAME_LABEL -> appName)
225-
++ parsedCustomLabels).asJava
238+
++ customLabels)
226239
val endpointsReadyFuture = SettableFuture.create[Endpoints]
227240
val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
228241
val serviceReadyFuture = SettableFuture.create[Service]
@@ -249,6 +262,7 @@ private[spark] class Client(
249262
val driverPod = createDriverPod(
250263
kubernetesClient,
251264
driverKubernetesSelectors,
265+
customAnnotations,
252266
submitServerSecret,
253267
sslConfiguration)
254268
kubernetesResourceCleaner.registerOrUpdateResource(driverPod)
@@ -342,7 +356,7 @@ private[spark] class Client(
342356

343357
private def createDriverService(
344358
kubernetesClient: KubernetesClient,
345-
driverKubernetesSelectors: java.util.Map[String, String],
359+
driverKubernetesSelectors: Map[String, String],
346360
submitServerSecret: Secret): Service = {
347361
val driverSubmissionServicePort = new ServicePortBuilder()
348362
.withName(SUBMISSION_SERVER_PORT_NAME)
@@ -352,19 +366,20 @@ private[spark] class Client(
352366
kubernetesClient.services().createNew()
353367
.withNewMetadata()
354368
.withName(kubernetesAppId)
355-
.withLabels(driverKubernetesSelectors)
369+
.withLabels(driverKubernetesSelectors.asJava)
356370
.endMetadata()
357371
.withNewSpec()
358372
.withType("NodePort")
359-
.withSelector(driverKubernetesSelectors)
373+
.withSelector(driverKubernetesSelectors.asJava)
360374
.withPorts(driverSubmissionServicePort)
361375
.endSpec()
362376
.done()
363377
}
364378

365379
private def createDriverPod(
366380
kubernetesClient: KubernetesClient,
367-
driverKubernetesSelectors: util.Map[String, String],
381+
driverKubernetesSelectors: Map[String, String],
382+
customAnnotations: Map[String, String],
368383
submitServerSecret: Secret,
369384
sslConfiguration: SslConfiguration): Pod = {
370385
val containerPorts = buildContainerPorts()
@@ -376,7 +391,8 @@ private[spark] class Client(
376391
kubernetesClient.pods().createNew()
377392
.withNewMetadata()
378393
.withName(kubernetesAppId)
379-
.withLabels(driverKubernetesSelectors)
394+
.withLabels(driverKubernetesSelectors.asJava)
395+
.withAnnotations(customAnnotations.asJava)
380396
.endMetadata()
381397
.withNewSpec()
382398
.withRestartPolicy("Never")
@@ -601,20 +617,19 @@ private[spark] class Client(
601617
connectTimeoutMillis = 5000)
602618
}
603619

604-
private def parseCustomLabels(maybeLabels: Option[String]): Map[String, String] = {
605-
maybeLabels.map(labels => {
606-
labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => {
607-
label.split("=", 2).toSeq match {
620+
private def parseKeyValuePairs(
621+
maybeKeyValues: Option[String],
622+
configKey: String,
623+
keyValueType: String): Map[String, String] = {
624+
maybeKeyValues.map(keyValues => {
625+
keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => {
626+
keyValue.split("=", 2).toSeq match {
608627
case Seq(k, v) =>
609-
require(k != SPARK_APP_ID_LABEL, "Label with key" +
610-
s" $SPARK_APP_ID_LABEL cannot be used in" +
611-
" spark.kubernetes.driver.labels, as it is reserved for Spark's" +
612-
" internal configuration.")
613628
(k, v)
614629
case _ =>
615-
throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" +
616-
" must be a comma-separated list of key-value pairs, with format <key>=<value>." +
617-
s" Got label: $label. All labels: $labels")
630+
throw new SparkException(s"Custom $keyValueType set by $configKey must be a" +
631+
s" comma-separated list of key-value pairs, with format <key>=<value>." +
632+
s" Got value: $keyValue. All values: $keyValues")
618633
}
619634
}).toMap
620635
}).getOrElse(Map.empty[String, String])

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,16 @@ package object config {
118118
.stringConf
119119
.createOptional
120120

121+
private[spark] val KUBERNETES_DRIVER_ANNOTATIONS =
122+
ConfigBuilder("spark.kubernetes.driver.annotations")
123+
.doc("""
124+
| Custom annotations that will be added to the driver pod.
125+
| This should be a comma-separated list of annotation key-value
126+
| pairs, where each annotation is in the format key=value.
127+
""".stripMargin)
128+
.stringConf
129+
.createOptional
130+
121131
private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT =
122132
ConfigBuilder("spark.kubernetes.driverSubmitTimeout")
123133
.doc("""

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
229229
expectationsForStaticAllocation(sparkMetricsService)
230230
}
231231

232-
test("Run with custom labels") {
232+
test("Run with custom labels and annotations") {
233233
val args = Array(
234234
"--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
235235
"--deploy-mode", "cluster",
@@ -246,26 +246,35 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
246246
"--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
247247
"--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
248248
"--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value",
249+
"--conf", "spark.kubernetes.driver.annotations=" +
250+
"annotation1=annotation1value," +
251+
"annotation2=annotation2value",
249252
"--conf", "spark.kubernetes.submit.waitAppCompletion=false",
250253
EXAMPLES_JAR_FILE.getAbsolutePath)
251254
SparkSubmit.main(args)
252-
val driverPodLabels = minikubeKubernetesClient
255+
val driverPodMetadata = minikubeKubernetesClient
253256
.pods
254257
.withLabel("spark-app-name", "spark-pi")
255258
.list()
256259
.getItems
257260
.get(0)
258261
.getMetadata
259-
.getLabels
262+
val driverPodLabels = driverPodMetadata.getLabels
260263
// We can't match all of the selectors directly since one of the selectors is based on the
261264
// launch time.
262-
assert(driverPodLabels.size == 5, "Unexpected number of pod labels.")
263-
assert(driverPodLabels.get("spark-app-name") == "spark-pi", "Unexpected value for" +
265+
assert(driverPodLabels.size === 5, "Unexpected number of pod labels.")
266+
assert(driverPodLabels.get("spark-app-name") === "spark-pi", "Unexpected value for" +
264267
" spark-app-name label.")
265268
assert(driverPodLabels.get("spark-app-id").startsWith("spark-pi"), "Unexpected value for" +
266269
" spark-app-id label (should be prefixed with the app name).")
267-
assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1")
268-
assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2")
270+
assert(driverPodLabels.get("label1") === "label1value", "Unexpected value for label1")
271+
assert(driverPodLabels.get("label2") === "label2value", "Unexpected value for label2")
272+
val driverPodAnnotations = driverPodMetadata.getAnnotations
273+
assert(driverPodAnnotations.size === 2, "Unexpected number of pod annotations.")
274+
assert(driverPodAnnotations.get("annotation1") === "annotation1value",
275+
"Unexpected value for annotation1")
276+
assert(driverPodAnnotations.get("annotation2") === "annotation2value",
277+
"Unexpected value for annotation2")
269278
}
270279

271280
test("Enable SSL on the driver submit server") {

0 commit comments

Comments
 (0)