Skip to content

Commit 3f56e78

Browse files
committed
[SPARK-26304][SS] Add default value to spark.kafka.sasl.kerberos.service.name parameter
1 parent 169d9ad commit 3f56e78

File tree

5 files changed

+4
-49
lines changed

5 files changed

+4
-49
lines changed

core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,11 @@ private[spark] object KafkaTokenUtil extends Logging {
143143
}
144144

145145
private[security] def getKeytabJaasParams(sparkConf: SparkConf): String = {
146-
val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
147-
require(serviceName.nonEmpty, "Kerberos service name must be defined")
148-
149146
val params =
150147
s"""
151148
|${getKrb5LoginModuleName} required
152149
| useKeyTab=true
153-
| serviceName="${serviceName.get}"
150+
| serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
154151
| keyTab="${sparkConf.get(KEYTAB).get}"
155152
| principal="${sparkConf.get(PRINCIPAL).get}";
156153
""".stripMargin.replace("\n", "")
@@ -166,7 +163,7 @@ private[spark] object KafkaTokenUtil extends Logging {
166163
s"""
167164
|${getKrb5LoginModuleName} required
168165
| useTicketCache=true
169-
| serviceName="${serviceName.get}";
166+
| serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}";
170167
""".stripMargin.replace("\n", "")
171168
logDebug(s"Krb ticket cache JAAS params: $params")
172169
params

core/src/main/scala/org/apache/spark/internal/config/Kafka.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ private[spark] object Kafka {
4040
"Kafka's JAAS config or in Kafka's config. For further details please see kafka " +
4141
"documentation. Only used to obtain delegation token.")
4242
.stringConf
43-
.createOptional
43+
.createWithDefault("kafka")
4444

4545
val TRUSTSTORE_LOCATION =
4646
ConfigBuilder("spark.kafka.ssl.truststore.location")

core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
3636
private val keyStorePassword = "keyStoreSecret"
3737
private val keyPassword = "keySecret"
3838
private val keytab = "/path/to/keytab"
39-
private val kerberosServiceName = "kafka"
4039
private val principal = "[email protected]"
4140

4241
private var sparkConf: SparkConf = null
@@ -96,7 +95,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
9695
sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
9796
sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
9897
sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
99-
sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
10098

10199
val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
102100

@@ -119,7 +117,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
119117
sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
120118
sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
121119
sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
122-
sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
123120

124121
val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
125122

@@ -143,7 +140,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
143140
sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
144141
sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
145142
sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
146-
sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
147143

148144
val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
149145

@@ -177,7 +173,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
177173
sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
178174
sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
179175
sparkConf.set(KEYTAB, keytab)
180-
sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
181176
sparkConf.set(PRINCIPAL, principal)
182177

183178
val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
@@ -195,7 +190,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
195190
test("createAdminClientProperties without keytab should set ticket cache dynamic jaas config") {
196191
sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
197192
sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
198-
sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
199193

200194
val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
201195

@@ -218,22 +212,4 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
218212

219213
assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided)
220214
}
221-
222-
test("getKeytabJaasParams with keytab no service should throw exception") {
223-
sparkConf.set(KEYTAB, keytab)
224-
225-
val thrown = intercept[IllegalArgumentException] {
226-
KafkaTokenUtil.getKeytabJaasParams(sparkConf)
227-
}
228-
229-
assert(thrown.getMessage contains "Kerberos service name must be defined")
230-
}
231-
232-
test("getTicketCacheJaasParams without service should throw exception") {
233-
val thrown = intercept[IllegalArgumentException] {
234-
KafkaTokenUtil.getTicketCacheJaasParams(sparkConf)
235-
}
236-
237-
assert(thrown.getMessage contains "Kerberos service name must be defined")
238-
}
239215
}

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.sql.kafka010
1919

2020
import org.apache.hadoop.security.UserGroupInformation
21-
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
2221
import org.apache.kafka.common.security.scram.ScramLoginModule
2322

2423
import org.apache.spark.SparkConf
@@ -35,8 +34,6 @@ private[kafka010] object KafkaSecurityHelper extends Logging {
3534
def getTokenJaasParams(sparkConf: SparkConf): String = {
3635
val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
3736
KafkaTokenUtil.TOKEN_SERVICE)
38-
val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
39-
require(serviceName.isDefined, "Kerberos service name must be defined")
4037
val username = new String(token.getIdentifier)
4138
val password = new String(token.getPassword)
4239

@@ -45,7 +42,7 @@ private[kafka010] object KafkaSecurityHelper extends Logging {
4542
s"""
4643
|$loginModuleName required
4744
| tokenauth=true
48-
| serviceName="${serviceName.get}"
45+
| serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
4946
| username="$username"
5047
| password="$password";
5148
""".stripMargin.replace("\n", "")

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,8 @@ import org.scalatest.BeforeAndAfterEach
2626
import org.apache.spark.{SparkConf, SparkFunSuite}
2727
import org.apache.spark.deploy.security.KafkaTokenUtil
2828
import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
29-
import org.apache.spark.internal.config._
3029

3130
class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
32-
private val keytab = "/path/to/keytab"
33-
private val kerberosServiceName = "kafka"
34-
private val principal = "[email protected]"
3531
private val tokenId = "tokenId" + UUID.randomUUID().toString
3632
private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
3733

@@ -76,19 +72,8 @@ class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
7672
assert(KafkaSecurityHelper.isTokenAvailable())
7773
}
7874

79-
test("getTokenJaasParams with token no service should throw exception") {
80-
addTokenToUGI()
81-
82-
val thrown = intercept[IllegalArgumentException] {
83-
KafkaSecurityHelper.getTokenJaasParams(sparkConf)
84-
}
85-
86-
assert(thrown.getMessage contains "Kerberos service name must be defined")
87-
}
88-
8975
test("getTokenJaasParams with token should return scram module") {
9076
addTokenToUGI()
91-
sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
9277

9378
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
9479

0 commit comments

Comments
 (0)