
Commit 2364340

aarondav authored and pwendell committed
[SPARK-2938] Support SASL authentication in NettyBlockTransferService
Also lays the groundwork for supporting it inside the external shuffle service.

Author: Aaron Davidson <[email protected]>

Closes #3087 from aarondav/sasl and squashes the following commits:

3481718 [Aaron Davidson] Delete rogue println
44f8410 [Aaron Davidson] Delete documentation - muahaha!
eb9f065 [Aaron Davidson] Improve documentation and add end-to-end test at Spark-level
a6b95f1 [Aaron Davidson] Address comments
785bbde [Aaron Davidson] Cleanup
79973cb [Aaron Davidson] Remove unused file
151b3c5 [Aaron Davidson] Add docs, timeout config, better failure handling
f6177d7 [Aaron Davidson] Cleanup SASL state upon connection termination
7b42adb [Aaron Davidson] Add unit tests
8191bcb [Aaron Davidson] [SPARK-2938] Support SASL authentication in NettyBlockTransferService
1 parent 9cba88c · commit 2364340

37 files changed: +1257 −392 lines
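For context, a hedged sketch of how a user opts in to the authentication this commit wires into the Netty transport. `spark.authenticate`, `spark.authenticate.secret`, and `spark.shuffle.blockTransferService` are standard Spark settings; the app name and secret value below are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: enable Spark's shared-secret authentication so that the
// Netty block transfer service performs the SASL handshake added here.
// The secret value is a placeholder (standalone-mode style configuration).
val conf = new SparkConf()
  .setAppName("sasl-example")                         // placeholder app name
  .set("spark.authenticate", "true")                  // turn on authentication
  .set("spark.authenticate.secret", "my-secret")      // shared secret, placeholder value
  .set("spark.shuffle.blockTransferService", "netty") // the transport this commit secures

val sc = new SparkContext(conf)
```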

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 20 additions & 3 deletions

@@ -22,6 +22,7 @@ import java.net.{Authenticator, PasswordAuthentication}
 import org.apache.hadoop.io.Text
 
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.network.sasl.SecretKeyHolder
 
 /**
  * Spark class responsible for security.
@@ -84,7 +85,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * Authenticator installed in the SecurityManager to how it does the authentication
  * and in this case gets the user name and password from the request.
  *
- * - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
+ * - BlockTransferService -> The Spark BlockTransferServices uses java nio to asynchronously
  *   exchange messages. For this we use the Java SASL
  *   (Simple Authentication and Security Layer) API and again use DIGEST-MD5
  *   as the authentication mechanism. This means the shared secret is not passed
@@ -98,7 +99,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * of protection they want. If we support those, the messages will also have to
  * be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
  *
- * Since the connectionManager does asynchronous messages passing, the SASL
+ * Since the NioBlockTransferService does asynchronous messages passing, the SASL
  * authentication is a bit more complex. A ConnectionManager can be both a client
  * and a Server, so for a particular connection is has to determine what to do.
  * A ConnectionId was added to be able to track connections and is used to
@@ -107,6 +108,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * and waits for the response from the server and does the handshake before sending
  * the real message.
  *
+ * The NettyBlockTransferService ensures that SASL authentication is performed
+ * synchronously prior to any other communication on a connection. This is done in
+ * SaslClientBootstrap on the client side and SaslRpcHandler on the server side.
+ *
  * - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
  *   can be used. Yarn requires a specific AmIpFilter be installed for security to work
  *   properly. For non-Yarn deployments, users can write a filter to go through a
@@ -139,7 +144,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * can take place.
  */
 
-private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
+private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
 
   // key used to store the spark secret in the Hadoop UGI
   private val sparkSecretLookupKey = "sparkCookie"
@@ -337,4 +342,16 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
   * @return the secret key as a String if authentication is enabled, otherwise returns null
   */
  def getSecretKey(): String = secretKey
+
+  override def getSaslUser(appId: String): String = {
+    val myAppId = sparkConf.getAppId
+    require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
+    getSaslUser()
+  }
+
+  override def getSecretKey(appId: String): String = {
+    val myAppId = sparkConf.getAppId
+    require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
+    getSecretKey()
+  }
 }
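To make the `SecretKeyHolder` contract concrete, here is an illustrative, self-contained holder mirroring the two overrides above. It assumes the interface exposes exactly the two appId-keyed methods shown in this diff; the class name, constructor parameters, and fixed user name are stand-ins, not Spark's.

```scala
import org.apache.spark.network.sasl.SecretKeyHolder

// Toy SecretKeyHolder that keys the SASL user and secret off a single
// expected appId, as SecurityManager does above. Illustrative only.
class SingleAppSecretKeyHolder(expectedAppId: String, secret: String)
    extends SecretKeyHolder {

  override def getSaslUser(appId: String): String = {
    // Reject SASL negotiations for any application other than ours.
    require(appId == expectedAppId, s"SASL appId $appId did not match my appId $expectedAppId")
    "sparkSaslUser" // stand-in for a fixed SASL user name
  }

  override def getSecretKey(appId: String): String = {
    require(appId == expectedAppId, s"SASL appId $appId did not match my appId $expectedAppId")
    secret
  }
}
```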

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 6 additions & 0 deletions

@@ -217,6 +217,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
    */
   getAll.filter { case (k, _) => isAkkaConf(k) }
 
+  /**
+   * Returns the Spark application id, valid in the Driver after TaskScheduler registration and
+   * from the start in the Executor.
+   */
+  def getAppId: String = get("spark.app.id")
+
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
 
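A small usage sketch of the new accessor: since `getAppId` just reads `spark.app.id`, it fails fast with `SparkConf.get`'s usual missing-key error if called before the id is set. The id string below is hypothetical; normally SparkContext sets it.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf(loadDefaults = false)
conf.set("spark.app.id", "app-20141104120000-0001") // hypothetical id
assert(conf.getAppId == "app-20141104120000-0001")
```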

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 0 deletions

@@ -313,6 +313,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     val applicationId: String = taskScheduler.applicationId()
     conf.set("spark.app.id", applicationId)
 
+    env.blockManager.initialize(applicationId)
+
     val metricsSystem = env.metricsSystem
 
     // The metrics system for Driver need to be set spark.app.id to app ID.
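This call completes a two-phase construction: the BlockManager is built in SparkEnv before the application id exists, and only becomes usable once the TaskScheduler reports the id and `initialize(appId)` is invoked. A minimal sketch of the pattern, with hypothetical names:

```scala
// Hypothetical illustration of construct-now, initialize-later, as used for
// BlockManager: the app id is unknown at construction time on the driver.
class LateInitService {
  private var appId: Option[String] = None

  // Must be called exactly once, after the application id is known.
  def initialize(id: String): Unit = { appId = Some(id) }

  def doWork(): Unit = {
    val id = appId.getOrElse(sys.error("initialize() must be called before use"))
    println(s"serving blocks for $id")
  }
}
```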

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 2 additions & 1 deletion

@@ -276,7 +276,7 @@ object SparkEnv extends Logging {
     val blockTransferService =
       conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
-          new NettyBlockTransferService(conf)
+          new NettyBlockTransferService(conf, securityManager)
         case "nio" =>
           new NioBlockTransferService(conf, securityManager)
       }
@@ -285,6 +285,7 @@ object SparkEnv extends Logging {
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
 
+    // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
       serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
 
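The selection logic is unchanged; what changed is that `securityManager` is now threaded into both branches, so either transport can participate in SASL. A stand-alone sketch with stand-in types (all names here are illustrative, not Spark's):

```scala
// Stand-in types; both transports receive the auth state so either can
// negotiate SASL when authentication is enabled.
sealed trait Transfer
case class NettyTransfer(secure: Boolean) extends Transfer
case class NioTransfer(secure: Boolean) extends Transfer

def chooseTransfer(impl: String, authEnabled: Boolean): Transfer =
  impl.toLowerCase match {
    case "netty" => NettyTransfer(authEnabled)
    case "nio"   => NioTransfer(authEnabled)
    case other   => sys.error(s"unknown spark.shuffle.blockTransferService: $other")
  }
```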

core/src/main/scala/org/apache/spark/SparkSaslClient.scala

Lines changed: 0 additions & 147 deletions
This file was deleted.
