Commit bfbab28

Merge branch 'master' of github.com:apache/spark into configure-ports

Conflicts:
    docs/spark-standalone.md

2 parents: de1b207 + 1c5555a

File tree

24 files changed: +1108 −193 lines

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 90 additions & 17 deletions
@@ -41,10 +41,19 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * secure the UI if it has data that other users should not be allowed to see. The javax
  * servlet filter specified by the user can authenticate the user and then once the user
  * is logged in, Spark can compare that user versus the view acls to make sure they are
- * authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
+ * authorized to view the UI. The configs 'spark.acls.enable' and 'spark.ui.view.acls'
  * control the behavior of the acls. Note that the person who started the application
  * always has view access to the UI.
  *
+ * Spark has a set of modify acls (`spark.modify.acls`) that controls which users have permission
+ * to modify a single application. This would include things like killing the application. By
+ * default the person who started the application has modify access. For modify access through
+ * the UI, you must have a filter that does authentication in place for the modify acls to work
+ * properly.
+ *
+ * Spark also has a set of admin acls (`spark.admin.acls`) which is a set of users/administrators
+ * who always have permission to view or modify the Spark application.
+ *
  * Spark does not currently support encryption after authentication.
  *
  * At this point spark has multiple communication protocols that need to be secured and
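
The properties named in the scaladoc above fit together as three lists: viewers, modifiers, and admins (who always get both). A minimal sketch of an application setting all of them — property names come from this diff, the user names are hypothetical:

    import org.apache.spark.SparkConf

    // Hypothetical users: alice/bob may view, alice may also e.g. kill the app,
    // and ops-admin can always view and modify regardless of the other lists.
    val conf = new SparkConf()
      .set("spark.acls.enable", "true")       // replaces spark.ui.acls.enable
      .set("spark.ui.view.acls", "alice,bob")
      .set("spark.modify.acls", "alice")
      .set("spark.admin.acls", "ops-admin")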
@@ -137,18 +146,32 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
   private val sparkSecretLookupKey = "sparkCookie"
 
   private val authOn = sparkConf.getBoolean("spark.authenticate", false)
-  private var uiAclsOn = sparkConf.getBoolean("spark.ui.acls.enable", false)
+  // keep spark.ui.acls.enable for backwards compatibility with 1.0
+  private var aclsOn = sparkConf.getOption("spark.acls.enable").getOrElse(
+    sparkConf.get("spark.ui.acls.enable", "false")).toBoolean
+
+  // admin acls should be set before view or modify acls
+  private var adminAcls: Set[String] =
+    stringToSet(sparkConf.get("spark.admin.acls", ""))
 
   private var viewAcls: Set[String] = _
+
+  // list of users who have permission to modify the application. This should
+  // apply to both UI and CLI for things like killing the application.
+  private var modifyAcls: Set[String] = _
+
   // always add the current user and SPARK_USER to the viewAcls
-  private val defaultAclUsers = Seq[String](System.getProperty("user.name", ""),
+  private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
     Option(System.getenv("SPARK_USER")).getOrElse(""))
+
   setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
+  setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))
 
   private val secretKey = generateSecretKey()
   logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
-    "; ui acls " + (if (uiAclsOn) "enabled" else "disabled") +
-    "; users with view permissions: " + viewAcls.toString())
+    "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
+    "; users with view permissions: " + viewAcls.toString() +
+    "; users with modify permissions: " + modifyAcls.toString())
 
   // Set our own authenticator to properly negotiate user/password for HTTP connections.
   // This is needed by the HTTP client fetching from the HttpServer. Put here so its
@@ -169,18 +192,51 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
     )
   }
 
-  private[spark] def setViewAcls(defaultUsers: Seq[String], allowedUsers: String) {
-    viewAcls = (defaultUsers ++ allowedUsers.split(',')).map(_.trim()).filter(!_.isEmpty).toSet
+  /**
+   * Split a comma separated String, filter out any empty items, and return a Set of strings
+   */
+  private def stringToSet(list: String): Set[String] = {
+    list.split(',').map(_.trim).filter(!_.isEmpty).toSet
+  }
+
+  /**
+   * Admin acls should be set before the view or modify acls. If you modify the admin
+   * acls you should also set the view and modify acls again to pick up the changes.
+   */
+  def setViewAcls(defaultUsers: Set[String], allowedUsers: String) {
+    viewAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
     logInfo("Changing view acls to: " + viewAcls.mkString(","))
   }
 
-  private[spark] def setViewAcls(defaultUser: String, allowedUsers: String) {
-    setViewAcls(Seq[String](defaultUser), allowedUsers)
+  def setViewAcls(defaultUser: String, allowedUsers: String) {
+    setViewAcls(Set[String](defaultUser), allowedUsers)
+  }
+
+  def getViewAcls: String = viewAcls.mkString(",")
+
+  /**
+   * Admin acls should be set before the view or modify acls. If you modify the admin
+   * acls you should also set the view and modify acls again to pick up the changes.
+   */
+  def setModifyAcls(defaultUsers: Set[String], allowedUsers: String) {
+    modifyAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
+    logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
+  }
+
+  def getModifyAcls: String = modifyAcls.mkString(",")
+
+  /**
+   * Admin acls should be set before the view or modify acls. If you modify the admin
+   * acls you should also set the view and modify acls again to pick up the changes.
+   */
+  def setAdminAcls(adminUsers: String) {
+    adminAcls = stringToSet(adminUsers)
+    logInfo("Changing admin acls to: " + adminAcls.mkString(","))
   }
 
-  private[spark] def setUIAcls(aclSetting: Boolean) {
-    uiAclsOn = aclSetting
-    logInfo("Changing acls enabled to: " + uiAclsOn)
+  def setAcls(aclSetting: Boolean) {
+    aclsOn = aclSetting
+    logInfo("Changing acls enabled to: " + aclsOn)
   }
 
   /**
@@ -224,22 +280,39 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
    * Check to see if Acls for the UI are enabled
    * @return true if UI authentication is enabled, otherwise false
    */
-  def uiAclsEnabled(): Boolean = uiAclsOn
+  def aclsEnabled(): Boolean = aclsOn
 
   /**
    * Checks the given user against the view acl list to see if they have
-   * authorization to view the UI. If the UI acls must are disabled
-   * via spark.ui.acls.enable, all users have view access.
+   * authorization to view the UI. If the UI acls are disabled
+   * via spark.acls.enable, all users have view access. If the user is null
+   * it is assumed authentication is off and all users have access.
    *
    * @param user to see if is authorized
    * @return true if the user has permission, otherwise false
    */
   def checkUIViewPermissions(user: String): Boolean = {
-    logDebug("user=" + user + " uiAclsEnabled=" + uiAclsEnabled() + " viewAcls=" +
+    logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " viewAcls=" +
       viewAcls.mkString(","))
-    if (uiAclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
+    if (aclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
   }
 
+  /**
+   * Checks the given user against the modify acl list to see if they have
+   * authorization to modify the application. If the UI acls are disabled
+   * via spark.acls.enable, all users have modify access. If the user is null
+   * it is assumed authentication isn't turned on and all users have access.
+   *
+   * @param user to see if is authorized
+   * @return true if the user has permission, otherwise false
+   */
+  def checkModifyPermissions(user: String): Boolean = {
+    logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " modifyAcls=" +
+      modifyAcls.mkString(","))
+    if (aclsEnabled() && (user != null) && (!modifyAcls.contains(user))) false else true
+  }
+
+
   /**
    * Check to see if authentication for the Spark communication protocols is enabled
    * @return true if authentication is enabled, otherwise false
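
Because viewAcls and modifyAcls are computed from adminAcls at set time, the ordering warning in the scaladoc matters in practice. A sketch of the contract, assuming the conf from the earlier example with spark.acls.enable=true (SecurityManager is private[spark], so code like this would live inside Spark itself):

    val securityMgr = new org.apache.spark.SecurityManager(conf)
    // Admin acls must be (re)applied before the dependent lists; a later
    // setViewAcls/setModifyAcls call won't pick up admin changes otherwise.
    securityMgr.setAdminAcls("ops-admin")
    securityMgr.setViewAcls(Set("alice"), "bob")
    securityMgr.setModifyAcls(Set("alice"), "")

    securityMgr.checkUIViewPermissions("bob")   // true: bob is in the view acls
    securityMgr.checkModifyPermissions("bob")   // false: not in modify or admin acls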

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 14 additions & 0 deletions
@@ -238,6 +238,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     }
   }
 
+  // Validate memory fractions
+  val memoryKeys = Seq(
+    "spark.storage.memoryFraction",
+    "spark.shuffle.memoryFraction",
+    "spark.shuffle.safetyFraction",
+    "spark.storage.unrollFraction",
+    "spark.storage.safetyFraction")
+  for (key <- memoryKeys) {
+    val value = getDouble(key, 0.5)
+    if (value > 1 || value < 0) {
+      throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
+    }
+  }
+
   // Check for legacy configs
   sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
     val warning =
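
With this check, an out-of-range fraction fails fast during SparkConf's validation pass rather than surfacing later as odd memory behavior. An illustrative trigger:

    val conf = new SparkConf()
      .set("spark.shuffle.memoryFraction", "1.5")  // > 1, so invalid
    // Validation now throws:
    // IllegalArgumentException: spark.shuffle.memoryFraction should be
    // between 0 and 1 (was '1.5').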

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 5 additions & 5 deletions
@@ -34,7 +34,7 @@ import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.ConnectionManager
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.ShuffleManager
+import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
 import org.apache.spark.storage._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -65,12 +65,9 @@ class SparkEnv (
     val httpFileServer: HttpFileServer,
     val sparkFilesDir: String,
     val metricsSystem: MetricsSystem,
+    val shuffleMemoryManager: ShuffleMemoryManager,
     val conf: SparkConf) extends Logging {
 
-  // A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
-  // All accesses should be manually synchronized
-  val shuffleMemoryMap = mutable.HashMap[Long, Long]()
-
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
 
@@ -252,6 +249,8 @@ object SparkEnv extends Logging {
     val shuffleManager = instantiateClass[ShuffleManager](
       "spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
 
+    val shuffleMemoryManager = new ShuffleMemoryManager(conf)
+
     // Warn about deprecated spark.cache.class property
     if (conf.contains("spark.cache.class")) {
       logWarning("The spark.cache.class property is no longer being used! Specify storage " +
 
@@ -273,6 +272,7 @@ object SparkEnv extends Logging {
       httpFileServer,
       sparkFilesDir,
       metricsSystem,
+      shuffleMemoryManager,
       conf)
   }
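
The ShuffleMemoryManager source is not part of this diff; conceptually it centralizes the per-thread bookkeeping the removed shuffleMemoryMap did by hand. A rough sketch of that contract with the simplest possible accounting — a hypothetical stand-in, not the real implementation:

    import scala.collection.mutable

    // Hypothetical minimal per-thread shuffle-memory ledger.
    class NaiveShuffleMemoryLedger(maxMemory: Long) {
      private val threadMemory = mutable.HashMap[Long, Long]()

      // Grant up to numBytes if the global budget allows; return the amount granted.
      def tryToAcquire(numBytes: Long): Long = synchronized {
        val tid = Thread.currentThread().getId
        val granted = math.min(numBytes, maxMemory - threadMemory.values.sum)
        if (granted > 0) threadMemory(tid) = threadMemory.getOrElse(tid, 0L) + granted
        math.max(granted, 0L)
      }

      // What Executor's finally block calls: drop all memory held by this thread.
      def releaseMemoryForThisThread(): Unit = synchronized {
        threadMemory.remove(Thread.currentThread().getId)
      }
    }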

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 3 additions & 1 deletion
@@ -189,7 +189,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
     if (ui != null) {
       val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-      ui.getSecurityManager.setUIAcls(uiAclsEnabled)
+      ui.getSecurityManager.setAcls(uiAclsEnabled)
+      // make sure to set admin acls before view acls so they are properly picked up
+      ui.getSecurityManager.setAdminAcls(appListener.adminAcls)
       ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
     }
     (appInfo, ui)

core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala

Lines changed: 2 additions & 2 deletions
@@ -38,8 +38,8 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
   if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
     webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
   }
-  if (conf.contains("master.ui.port")) {
-    webUiPort = conf.get("master.ui.port").toInt
+  if (conf.contains("spark.master.ui.port")) {
+    webUiPort = conf.get("spark.master.ui.port").toInt
   }
 
   parse(args.toList)

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala

Lines changed: 1 addition & 1 deletion
@@ -59,6 +59,6 @@ private[spark] object WorkerWebUI {
   val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
 
   def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = {
-    requestedPort.getOrElse(conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT))
+    requestedPort.getOrElse(conf.getInt("spark.worker.ui.port", WorkerWebUI.DEFAULT_PORT))
   }
 }
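
Together with the MasterArguments change above, both UI port settings now carry the standard spark. prefix. An illustrative use of the renamed keys (the SPARK_MASTER_WEBUI_PORT env var still takes effect as before):

    val conf = new SparkConf()
      .set("spark.master.ui.port", "8080")  // was master.ui.port
      .set("spark.worker.ui.port", "8081")  // was worker.ui.port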

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 4 deletions
@@ -277,10 +277,7 @@ private[spark] class Executor(
       }
     } finally {
       // Release memory used by this thread for shuffles
-      val shuffleMemoryMap = env.shuffleMemoryMap
-      shuffleMemoryMap.synchronized {
-        shuffleMemoryMap.remove(Thread.currentThread().getId)
-      }
+      env.shuffleMemoryManager.releaseMemoryForThisThread()
       // Release memory used by this thread for unrolling blocks
       env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
       runningTasks.remove(taskId)

core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala

Lines changed: 2 additions & 2 deletions
@@ -29,7 +29,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
   var startTime = -1L
   var endTime = -1L
   var viewAcls = ""
-  var enableViewAcls = false
+  var adminAcls = ""
 
   def applicationStarted = startTime != -1
 
@@ -55,7 +55,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
       val environmentDetails = environmentUpdate.environmentDetails
       val allProperties = environmentDetails("Spark Properties").toMap
       viewAcls = allProperties.getOrElse("spark.ui.view.acls", "")
-      enableViewAcls = allProperties.getOrElse("spark.ui.acls.enable", "false").toBoolean
+      adminAcls = allProperties.getOrElse("spark.admin.acls", "")
     }
   }
 }

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 3 additions & 1 deletion
@@ -47,7 +47,9 @@ class KryoSerializer(conf: SparkConf)
   with Logging
   with Serializable {
 
-  private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
+  private val bufferSize =
+    (conf.getDouble("spark.kryoserializer.buffer.mb", 0.064) * 1024 * 1024).toInt
+
   private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
   private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
   private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
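
Reading the buffer size with getDouble makes sub-megabyte buffers expressible; a quick check of the arithmetic for the new default:

    // Old: conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024 = 2097152 bytes (2 MB)
    // New default 0.064 MB:
    val defaultBuffer = (0.064 * 1024 * 1024).toInt  // = 67108 bytes, roughly 66 KB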
