Commit e249846

Merge pull request #10 from apache/master

Update

2 parents d26d982 + c764d0a

220 files changed: +6998 additions, -1493 deletions


LICENSE

Lines changed: 20 additions & 1 deletion
@@ -754,7 +754,7 @@ SUCH DAMAGE.
 
 
 ========================================================================
-For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
+For Timsort (core/src/main/java/org/apache/spark/util/collection/TimSort.java):
 ========================================================================
 Copyright (C) 2008 The Android Open Source Project
 
@@ -771,6 +771,25 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 
+========================================================================
+For LimitedInputStream
+(network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java):
+========================================================================
+Copyright (C) 2007 The Guava Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
 ========================================================================
 BSD-style licenses
 ========================================================================

README.md

Lines changed: 2 additions & 1 deletion
@@ -13,7 +13,8 @@ and Spark Streaming for stream processing.
 ## Online Documentation
 
 You can find the latest Spark documentation, including a programming
-guide, on the [project web page](http://spark.apache.org/documentation.html).
+guide, on the [project web page](http://spark.apache.org/documentation.html)
+and [project wiki](https://cwiki.apache.org/confluence/display/SPARK).
 This README file only contains basic setup instructions.
 
 ## Building Spark

core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,8 @@ $(function() {
         var column = "table ." + $(this).attr("name");
         $(column).hide();
     });
+    // Stripe table rows after rows have been hidden to ensure correct striping.
+    stripeTables();
 
     $("input:checkbox").click(function() {
         var column = "table ." + $(this).attr("name");

core/src/main/resources/org/apache/spark/ui/static/table.js

Lines changed: 0 additions & 5 deletions
@@ -28,8 +28,3 @@ function stripeTables() {
         });
     });
 }
-
-/* Stripe all tables after pages finish loading. */
-$(function() {
-    stripeTables();
-});

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 14 additions & 0 deletions
@@ -120,6 +120,20 @@ pre {
   border: none;
 }
 
+.stacktrace-details {
+  max-height: 300px;
+  overflow-y: auto;
+  margin: 0;
+  transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stacktrace-details.collapsed {
+  max-height: 0;
+  padding-top: 0;
+  padding-bottom: 0;
+  border: none;
+}
+
 span.expand-additional-metrics {
   cursor: pointer;
 }

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 27 additions & 10 deletions
@@ -66,7 +66,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
   // Lower and upper bounds on the number of executors. These are required.
   private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
   private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
-  verifyBounds()
 
   // How long there must be backlogged tasks for before an addition is triggered
   private val schedulerBacklogTimeout = conf.getLong(
@@ -77,9 +76,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
     "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
 
   // How long an executor must be idle for before it is removed
-  private val removeThresholdSeconds = conf.getLong(
+  private val executorIdleTimeout = conf.getLong(
     "spark.dynamicAllocation.executorIdleTimeout", 600)
 
+  // During testing, the methods to actually kill and add executors are mocked out
+  private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
+
+  validateSettings()
+
   // Number of executors to add in the next round
   private var numExecutorsToAdd = 1
 
@@ -103,17 +107,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
   // Polling loop interval (ms)
   private val intervalMillis: Long = 100
 
-  // Whether we are testing this class. This should only be used internally.
-  private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
-
   // Clock used to schedule when executors should be added and removed
   private var clock: Clock = new RealClock
 
   /**
-   * Verify that the lower and upper bounds on the number of executors are valid.
+   * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
    */
-  private def verifyBounds(): Unit = {
+  private def validateSettings(): Unit = {
     if (minNumExecutors < 0 || maxNumExecutors < 0) {
       throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
     }
@@ -124,6 +125,22 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
       throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
         s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
     }
+    if (schedulerBacklogTimeout <= 0) {
+      throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
+    }
+    if (sustainedSchedulerBacklogTimeout <= 0) {
+      throw new SparkException(
+        "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
+    }
+    if (executorIdleTimeout <= 0) {
+      throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
+    }
+    // Require external shuffle service for dynamic allocation
+    // Otherwise, we may lose shuffle files when killing executors
+    if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
+      throw new SparkException("Dynamic allocation of executors requires the external " +
+        "shuffle service. You may enable this through spark.shuffle.service.enabled.")
+    }
   }
 
   /**
@@ -254,7 +271,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
     val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
     if (removeRequestAcknowledged) {
       logInfo(s"Removing executor $executorId because it has been idle for " +
-        s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
+        s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
       executorsPendingToRemove.add(executorId)
       true
     } else {
@@ -329,8 +346,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
   private def onExecutorIdle(executorId: String): Unit = synchronized {
     if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
       logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
-        s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
-      removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
+        s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
+      removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
     }
   }
 
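The new validateSettings() checks above imply what a working dynamic-allocation configuration has to look like: both executor bounds set, strictly positive timeouts, and the external shuffle service enabled. A minimal sketch with illustrative values, assuming the standard spark.dynamicAllocation.enabled switch (which is not part of this diff):

import org.apache.spark.{SparkConf, SparkContext}

// Values are illustrative; the keys are the ones validateSettings() inspects.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-example")
  .set("spark.dynamicAllocation.enabled", "true")              // assumed feature switch, not shown in this diff
  .set("spark.dynamicAllocation.minExecutors", "1")            // must be set and <= maxExecutors
  .set("spark.dynamicAllocation.maxExecutors", "20")           // must be set, otherwise SparkException
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "5") // must be > 0
  .set("spark.dynamicAllocation.executorIdleTimeout", "600")   // must be > 0
  .set("spark.shuffle.service.enabled", "true")                // required unless the testing flag is set

val sc = new SparkContext(conf)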

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 12 additions & 3 deletions
@@ -22,6 +22,7 @@ import java.net.{Authenticator, PasswordAuthentication}
 import org.apache.hadoop.io.Text
 
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.network.sasl.SecretKeyHolder
 
 /**
  * Spark class responsible for security.
@@ -84,7 +85,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * Authenticator installed in the SecurityManager to how it does the authentication
  * and in this case gets the user name and password from the request.
  *
- *  - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
+ *  - BlockTransferService -> The Spark BlockTransferServices uses java nio to asynchronously
  *            exchange messages. For this we use the Java SASL
  *            (Simple Authentication and Security Layer) API and again use DIGEST-MD5
  *            as the authentication mechanism. This means the shared secret is not passed
@@ -98,7 +99,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
  *            of protection they want. If we support those, the messages will also have to
  *            be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
  *
- *            Since the connectionManager does asynchronous messages passing, the SASL
+ *            Since the NioBlockTransferService does asynchronous messages passing, the SASL
  *            authentication is a bit more complex. A ConnectionManager can be both a client
 *            and a Server, so for a particular connection is has to determine what to do.
 *            A ConnectionId was added to be able to track connections and is used to
@@ -107,6 +108,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
 *            and waits for the response from the server and does the handshake before sending
 *            the real message.
 *
+ *            The NettyBlockTransferService ensures that SASL authentication is performed
+ *            synchronously prior to any other communication on a connection. This is done in
+ *            SaslClientBootstrap on the client side and SaslRpcHandler on the server side.
+ *
 *  - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
 *            can be used. Yarn requires a specific AmIpFilter be installed for security to work
 *            properly. For non-Yarn deployments, users can write a filter to go through a
@@ -139,7 +144,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 *            can take place.
 */
 
-private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
+private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
 
   // key used to store the spark secret in the Hadoop UGI
   private val sparkSecretLookupKey = "sparkCookie"
@@ -337,4 +342,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
   * @return the secret key as a String if authentication is enabled, otherwise returns null
   */
  def getSecretKey(): String = secretKey
+
+  // Default SecurityManager only has a single secret key, so ignore appId.
+  override def getSaslUser(appId: String): String = getSaslUser()
+  override def getSecretKey(appId: String): String = getSecretKey()
 }
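SecretKeyHolder itself is not included in this commit, so the following is only an inferred sketch of the contract the two overrides above satisfy: a holder maps an application id to a SASL user and secret, and the default SecurityManager ignores the id because it holds a single secret.

// Inferred shape of org.apache.spark.network.sasl.SecretKeyHolder; the real trait
// lives in the network/common module and may declare more than these two methods.
trait SecretKeyHolder {
  def getSaslUser(appId: String): String
  def getSecretKey(appId: String): String
}

// Hypothetical multi-application holder, e.g. for a shared shuffle service (illustrative only).
class MultiAppSecretKeyHolder(secrets: Map[String, String]) extends SecretKeyHolder {
  override def getSaslUser(appId: String): String = "sparkSaslUser" // illustrative constant
  override def getSecretKey(appId: String): String =
    secrets.getOrElse(appId, sys.error(s"Unknown application id: $appId"))
}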

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 6 additions & 0 deletions
@@ -217,6 +217,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     */
    getAll.filter { case (k, _) => isAkkaConf(k) }
 
+  /**
+   * Returns the Spark application id, valid in the Driver after TaskScheduler registration and
+   * from the start in the Executor.
+   */
+  def getAppId: String = get("spark.app.id")
+
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
 
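A short usage sketch for the new accessor; getConf returns a copy of the driver configuration, and per the Scaladoc above spark.app.id is populated once the TaskScheduler has registered, i.e. after SparkContext construction completes.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("app-id-demo").setMaster("local[*]"))

// Safe here: the SparkContext constructor has already run conf.set("spark.app.id", applicationId).
val appId: String = sc.getConf.getAppId
println(s"Running as application $appId")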

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 0 deletions
@@ -313,6 +313,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   val applicationId: String = taskScheduler.applicationId()
   conf.set("spark.app.id", applicationId)
 
+  env.blockManager.initialize(applicationId)
+
   val metricsSystem = env.metricsSystem
 
   // The metrics system for Driver need to be set spark.app.id to app ID.
@@ -558,6 +560,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
 
 
   /**
+   * :: Experimental ::
+   *
    * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
    * (useful for binary data)
    *
@@ -600,6 +604,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   }
 
   /**
+   * :: Experimental ::
+   *
    * Load data from a flat binary file, assuming the length of each record is constant.
    *
    * @param path Directory to the input data files
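The two Scaladoc blocks being tagged ":: Experimental ::" describe SparkContext's binary-input readers. A usage sketch, assuming these correspond to the binaryFiles and binaryRecords methods of this era of Spark (the method names themselves are not visible in the hunks above), with hypothetical HDFS paths:

// Whole files as (path, PortableDataStream) pairs -- useful for opaque binary blobs.
val streams = sc.binaryFiles("hdfs:///data/blobs")
streams.take(1).foreach { case (path, stream) =>
  println(s"$path starts with ${stream.toArray().take(16).mkString(" ")}")
}

// Fixed-length records: every element is one 128-byte Array[Byte].
val records = sc.binaryRecords("hdfs:///data/fixed.bin", recordLength = 128)
println(s"record count: ${records.count()}")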

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 3 additions & 2 deletions
@@ -276,7 +276,7 @@ object SparkEnv extends Logging {
     val blockTransferService =
       conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
-          new NettyBlockTransferService(conf)
+          new NettyBlockTransferService(conf, securityManager)
         case "nio" =>
           new NioBlockTransferService(conf, securityManager)
       }
@@ -285,8 +285,9 @@
         "BlockManagerMaster",
         new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
 
+    // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
+      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
 
     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
 
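As the match above shows, the transfer implementation is selected by a single configuration key, with "netty" as the default, and both branches now receive the SecurityManager so SASL can be wired in. A minimal sketch of opting into the NIO implementation instead (only the property value differs; setAppName is illustrative):

import org.apache.spark.SparkConf

// "netty" is the default (the fallback argument to conf.get above);
// "nio" selects NioBlockTransferService instead.
val conf = new SparkConf()
  .setAppName("transfer-service-demo")
  .set("spark.shuffle.blockTransferService", "nio")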
