Skip to content

[SPARK-2938] Support SASL authentication in NettyBlockTransferService #3087

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.net.{Authenticator, PasswordAuthentication}
import org.apache.hadoop.io.Text

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.network.sasl.SecretKeyHolder

/**
* Spark class responsible for security.
Expand Down Expand Up @@ -84,7 +85,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
* Authenticator installed in the SecurityManager to how it does the authentication
* and in this case gets the user name and password from the request.
*
* - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
* - BlockTransferService -> The Spark BlockTransferServices uses java nio to asynchronously
* exchange messages. For this we use the Java SASL
* (Simple Authentication and Security Layer) API and again use DIGEST-MD5
* as the authentication mechanism. This means the shared secret is not passed
Expand All @@ -98,7 +99,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
* of protection they want. If we support those, the messages will also have to
* be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
*
* Since the connectionManager does asynchronous messages passing, the SASL
* Since the NioBlockTransferService does asynchronous messages passing, the SASL
* authentication is a bit more complex. A ConnectionManager can be both a client
* and a Server, so for a particular connection is has to determine what to do.
* A ConnectionId was added to be able to track connections and is used to
Expand All @@ -107,6 +108,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
* and waits for the response from the server and does the handshake before sending
* the real message.
*
* The NettyBlockTransferService ensures that SASL authentication is performed
* synchronously prior to any other communication on a connection. This is done in
* SaslClientBootstrap on the client side and SaslRpcHandler on the server side.
*
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* can be used. Yarn requires a specific AmIpFilter be installed for security to work
* properly. For non-Yarn deployments, users can write a filter to go through a
Expand Down Expand Up @@ -139,7 +144,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
* can take place.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the documentation above. Also please update docs/security.md


private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {

// key used to store the spark secret in the Hadoop UGI
private val sparkSecretLookupKey = "sparkCookie"
Expand Down Expand Up @@ -337,4 +342,16 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
* @return the secret key as a String if authentication is enabled, otherwise returns null
*/
def getSecretKey(): String = secretKey

override def getSaslUser(appId: String): String = {
val myAppId = sparkConf.getAppId
require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
getSaslUser()
}

override def getSecretKey(appId: String): String = {
val myAppId = sparkConf.getAppId
require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
getSecretKey()
}
}
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
*/
getAll.filter { case (k, _) => isAkkaConf(k) }

/**
* Returns the Spark application id, valid in the Driver after TaskScheduler registration and
* from the start in the Executor.
*/
def getAppId: String = get("spark.app.id")

/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
val applicationId: String = taskScheduler.applicationId()
conf.set("spark.app.id", applicationId)

env.blockManager.initialize(applicationId)

val metricsSystem = env.metricsSystem

// The metrics system for Driver need to be set spark.app.id to app ID.
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ object SparkEnv extends Logging {
val blockTransferService =
conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
case "netty" =>
new NettyBlockTransferService(conf)
new NettyBlockTransferService(conf, securityManager)
case "nio" =>
new NioBlockTransferService(conf, securityManager)
}
Expand All @@ -285,6 +285,7 @@ object SparkEnv extends Logging {
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)

Expand Down
147 changes: 0 additions & 147 deletions core/src/main/scala/org/apache/spark/SparkSaslClient.scala

This file was deleted.

Loading