Commit 85f298f

Merge remote-tracking branch 'upstream/master'
2 parents: dc65374 + e06c7df

167 files changed: +5302, -1613 lines


LICENSE

Lines changed: 16 additions & 0 deletions
@@ -771,6 +771,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 
+========================================================================
+For TestTimSort (core/src/test/java/org/apache/spark/util/collection/TestTimSort.java):
+========================================================================
+Copyright (C) 2015 Stijn de Gouw
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 
 ========================================================================
 For LimitedInputStream

core/src/main/java/org/apache/spark/util/collection/TimSort.java

Lines changed: 4 additions & 5 deletions
@@ -425,15 +425,14 @@ private void pushRun(int runBase, int runLen) {
   private void mergeCollapse() {
     while (stackSize > 1) {
       int n = stackSize - 2;
-      if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
+      if ( (n >= 1 && runLen[n-1] <= runLen[n] + runLen[n+1])
+        || (n >= 2 && runLen[n-2] <= runLen[n] + runLen[n-1])) {
         if (runLen[n - 1] < runLen[n + 1])
           n--;
-        mergeAt(n);
-      } else if (runLen[n] <= runLen[n + 1]) {
-        mergeAt(n);
-      } else {
+      } else if (runLen[n] > runLen[n + 1]) {
         break; // Invariant is established
       }
+      mergeAt(n);
     }
   }
 
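
The condition change above is the fix for the TimSort run-stack invariant bug reported by Stijn de Gouw: checking only the top three runs (`n > 0`) can leave a violation buried one level deeper, so the patched code also compares `runLen[n-2]` against `runLen[n] + runLen[n-1]`. Below is a minimal, self-contained Scala sketch (hypothetical `RunLengthStack` name, run lengths only, no actual element sorting) that models the corrected collapse so the new condition can be exercised in isolation.

```scala
import scala.collection.mutable.ArrayBuffer

object RunLengthStack {
  // The invariant TimSort needs at every stack position i:
  //   runLen(i-2) > runLen(i-1) + runLen(i)  and  runLen(i-1) > runLen(i)
  def invariantHolds(runLen: Seq[Int]): Boolean =
    (2 until runLen.length).forall(i => runLen(i - 2) > runLen(i - 1) + runLen(i)) &&
      (1 until runLen.length).forall(i => runLen(i - 1) > runLen(i))

  // Mirrors the patched condition: also inspect runLen(n-2) before deciding not to merge.
  def mergeCollapse(runLen: ArrayBuffer[Int]): Unit = {
    var done = false
    while (!done && runLen.length > 1) {
      var n = runLen.length - 2
      if ((n >= 1 && runLen(n - 1) <= runLen(n) + runLen(n + 1)) ||
          (n >= 2 && runLen(n - 2) <= runLen(n) + runLen(n - 1))) {
        if (runLen(n - 1) < runLen(n + 1)) n -= 1
      } else if (runLen(n) <= runLen(n + 1)) {
        // keep n and merge the top two runs below
      } else {
        done = true // invariant is established, nothing to merge
      }
      if (!done) {
        // model mergeAt(n): replace runs n and n+1 with a single run of combined length
        runLen(n) += runLen(n + 1)
        runLen.remove(n + 1)
      }
    }
  }

  // Tiny demo with made-up run lengths: push runs one at a time and collapse after each push.
  def main(args: Array[String]): Unit = {
    val stack = ArrayBuffer[Int]()
    for (len <- Seq(120, 80, 25, 20, 30)) {
      stack += len
      mergeCollapse(stack)
      require(invariantHolds(stack), s"invariant violated: $stack")
    }
    println(s"final run stack: $stack")
  }
}
```

In this simplified model, collapsing after every push keeps `invariantHolds` true for the whole stack, which is the guarantee the pre-fix condition could silently break.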

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 23 additions & 17 deletions
@@ -280,15 +280,24 @@ object AccumulatorParam {
 
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
-private[spark] object Accumulators {
-  // Store a WeakReference instead of a StrongReference because this way accumulators can be
-  // appropriately garbage collected during long-running jobs and release memory
-  type WeakAcc = WeakReference[Accumulable[_, _]]
-  val originals = Map[Long, WeakAcc]()
-  val localAccums = new ThreadLocal[Map[Long, WeakAcc]]() {
-    override protected def initialValue() = Map[Long, WeakAcc]()
+private[spark] object Accumulators extends Logging {
+  /**
+   * This global map holds the original accumulator objects that are created on the driver.
+   * It keeps weak references to these objects so that accumulators can be garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   */
+  val originals = Map[Long, WeakReference[Accumulable[_, _]]]()
+
+  /**
+   * This thread-local map holds per-task copies of accumulators; it is used to collect the set
+   * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
+   * this map is cleared by `Accumulators.clear()` (see Executor.scala).
+   */
+  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+    override protected def initialValue() = Map[Long, Accumulable[_, _]]()
   }
-  var lastId: Long = 0
+
+  private var lastId: Long = 0
 
   def newId(): Long = synchronized {
     lastId += 1
@@ -297,16 +306,16 @@ private[spark] object Accumulators {
 
   def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
     if (original) {
-      originals(a.id) = new WeakAcc(a)
+      originals(a.id) = new WeakReference[Accumulable[_, _]](a)
     } else {
-      localAccums.get()(a.id) = new WeakAcc(a)
+      localAccums.get()(a.id) = a
     }
   }
 
   // Clear the local (non-original) accumulators for the current thread
  def clear() {
    synchronized {
-      localAccums.get.clear
+      localAccums.get.clear()
    }
  }
 
@@ -320,12 +329,7 @@ private[spark] object Accumulators {
  def values: Map[Long, Any] = synchronized {
    val ret = Map[Long, Any]()
    for ((id, accum) <- localAccums.get) {
-      // Since we are now storing weak references, we must check whether the underlying data
-      // is valid.
-      ret(id) = accum.get match {
-        case Some(values) => values.localValue
-        case None => None
-      }
+      ret(id) = accum.localValue
    }
    return ret
  }
@@ -341,6 +345,8 @@ private[spark] object Accumulators {
          case None =>
            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
        }
+      } else {
+        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
      }
    }
  }
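
The net effect of these hunks is an asymmetry: driver-side originals stay behind `WeakReference`s (so unreferenced accumulators can still be garbage-collected), while per-task copies in the thread-local map are now held strongly and cleared explicitly when the task finishes, and updates for collected or unknown ids are dropped with a warning instead of failing. A compact Scala sketch of that registry pattern, using hypothetical `Counter`/`CounterRegistry` names rather than the Spark API:

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

final class Counter(val id: Long) { var value: Long = 0L }

object CounterRegistry {
  // Driver-side originals: weak so an unreferenced Counter can be collected.
  private val originals = mutable.Map[Long, WeakReference[Counter]]()

  // Per-thread (per-task) copies: strong references, cleared when the task finishes.
  private val localCounters = new ThreadLocal[mutable.Map[Long, Counter]] {
    override protected def initialValue(): mutable.Map[Long, Counter] = mutable.Map.empty
  }

  def register(c: Counter, original: Boolean): Unit = synchronized {
    if (original) originals(c.id) = new WeakReference(c)
    else localCounters.get()(c.id) = c
  }

  // Merge task-local values back into the originals, skipping collected or unknown ids.
  def merge(updates: Map[Long, Long]): Unit = synchronized {
    for ((id, delta) <- updates) {
      originals.get(id).flatMap(ref => Option(ref.get())) match {
        case Some(orig) => orig.value += delta
        case None => println(s"Ignoring update for unknown or collected counter $id")
      }
    }
  }

  // Drop the current thread's task-local copies once their values have been merged.
  def clearLocal(): Unit = synchronized { localCounters.get().clear() }
}
```

Holding the per-task copies strongly avoids losing updates mid-task, while the weak driver-side references still let long-running applications reclaim accumulators that user code no longer references.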

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 29 additions & 15 deletions
@@ -105,9 +105,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.start()
   }
 
-  /** Stop the cleaner. */
+  /**
+   * Stop the cleaning thread and wait until the thread has finished running its current task.
+   */
   def stop() {
     stopped = true
+    // Interrupt the cleaning thread, but wait until the current task has finished before
+    // doing so. This guards against the race condition where a cleaning thread may
+    // potentially clean similarly named variables created by a different SparkContext,
+    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+    synchronized {
+      cleaningThread.interrupt()
+    }
+    cleaningThread.join()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
@@ -140,21 +150,25 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
           .map(_.asInstanceOf[CleanupTaskWeakReference])
-        reference.map(_.task).foreach { task =>
-          logDebug("Got cleaning task " + task)
-          referenceBuffer -= reference.get
-          task match {
-            case CleanRDD(rddId) =>
-              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-            case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-            case CleanBroadcast(broadcastId) =>
-              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
-            case CleanAccum(accId) =>
-              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+        // Synchronize here to avoid being interrupted on stop()
+        synchronized {
+          reference.map(_.task).foreach { task =>
+            logDebug("Got cleaning task " + task)
+            referenceBuffer -= reference.get
+            task match {
+              case CleanRDD(rddId) =>
+                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+              case CleanShuffle(shuffleId) =>
+                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+              case CleanBroadcast(broadcastId) =>
+                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+              case CleanAccum(accId) =>
+                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+            }
          }
        }
      } catch {
+        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
@@ -188,10 +202,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /** Perform broadcast cleanup. */
   def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
     try {
-      logDebug("Cleaning broadcast " + broadcastId)
+      logDebug(s"Cleaning broadcast $broadcastId")
       broadcastManager.unbroadcast(broadcastId, true, blocking)
       listeners.foreach(_.broadcastCleaned(broadcastId))
-      logInfo("Cleaned broadcast " + broadcastId)
+      logDebug(s"Cleaned broadcast $broadcastId")
     } catch {
       case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
     }
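
The stop() change makes interruption and cleanup mutually exclusive: the cleaning loop processes each task while holding the cleaner's lock, stop() interrupts the thread only under that same lock, and join() then waits for the loop to exit, so an in-flight cleanup is never cut off halfway (the SPARK-6132 race). Here is a stand-alone Scala sketch of that shutdown pattern with a hypothetical `StoppableWorker`, not Spark code:

```scala
class StoppableWorker(process: String => Unit) {
  @volatile private var stopped = false
  private val queue = new java.util.concurrent.LinkedBlockingQueue[String]()

  private val thread = new Thread("worker") {
    override def run(): Unit = {
      while (!stopped) {
        try {
          // Poll with a timeout so the loop can observe `stopped`.
          val item = Option(queue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS))
          // Hold the lock while working so stop() cannot interrupt mid-task.
          StoppableWorker.this.synchronized {
            item.foreach(process)
          }
        } catch {
          case _: InterruptedException if stopped => // shutting down, ignore
          case e: Exception => e.printStackTrace()
        }
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()

  def submit(item: String): Unit = queue.put(item)

  def stop(): Unit = {
    stopped = true
    // Interrupt only between tasks: the worker holds this same lock while processing.
    synchronized { thread.interrupt() }
    thread.join()
  }
}
```

The `InterruptedException if stopped` guard mirrors the new catch clause in the diff: an interrupt during shutdown is expected and silently ignored.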

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 59 additions & 6 deletions
@@ -17,33 +17,86 @@
 
 package org.apache.spark
 
-import akka.actor.Actor
+import scala.concurrent.duration._
+import scala.collection.mutable
+
+import akka.actor.{Actor, Cancellable}
+
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
 import org.apache.spark.util.ActorLogReceive
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
- * components to convey liveness or execution information for in-progress tasks.
+ * components to convey liveness or execution information for in-progress tasks. It will also
+ * expire the hosts that have not heartbeated for more than spark.network.timeout.
  */
 private[spark] case class Heartbeat(
     executorId: String,
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+private[spark] case object ExpireDeadHosts
+
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
   extends Actor with ActorLogReceive with Logging {
 
+  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  private val executorLastSeen = new mutable.HashMap[String, Long]
+
+  private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout",
+    sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000
+
+  private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
+    sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000
+
+  private var timeoutCheckingTask: Cancellable = null
+
+  override def preStart(): Unit = {
+    import context.dispatcher
+    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+      checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts)
+    super.preStart()
+  }
+
   override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val response = HeartbeatResponse(
-        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+      val unknownExecutor = !scheduler.executorHeartbeatReceived(
+        executorId, taskMetrics, blockManagerId)
+      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+      executorLastSeen(executorId) = System.currentTimeMillis()
       sender ! response
+    case ExpireDeadHosts =>
+      expireDeadHosts()
+  }
+
+  private def expireDeadHosts(): Unit = {
+    logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
+    val now = System.currentTimeMillis()
+    for ((executorId, lastSeenMs) <- executorLastSeen) {
+      if (now - lastSeenMs > executorTimeoutMs) {
+        logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+          s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
+        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
+          "timed out after ${now - lastSeenMs} ms"))
+        if (sc.supportDynamicAllocation) {
+          sc.killExecutor(executorId)
+        }
+        executorLastSeen.remove(executorId)
+      }
+    }
+  }
+
+  override def postStop(): Unit = {
+    if (timeoutCheckingTask != null) {
+      timeoutCheckingTask.cancel()
+    }
+    super.postStop()
   }
 }
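
Stripped of the Akka plumbing, the new behavior is plain bookkeeping: record a last-seen timestamp on every heartbeat and, on each ExpireDeadHosts tick, drop executors whose last heartbeat is older than the timeout. A self-contained Scala sketch of that logic (hypothetical `HeartbeatTracker`, no actor system):

```scala
import scala.collection.mutable

class HeartbeatTracker(timeoutMs: Long, onExpired: String => Unit) {
  // executor ID -> timestamp of the last heartbeat seen from that executor
  private val lastSeen = mutable.HashMap[String, Long]()

  def recordHeartbeat(executorId: String, nowMs: Long = System.currentTimeMillis()): Unit =
    synchronized { lastSeen(executorId) = nowMs }

  // Intended to be called periodically, like the ExpireDeadHosts tick above.
  def expireDeadHosts(nowMs: Long = System.currentTimeMillis()): Seq[String] = synchronized {
    val expired = lastSeen.collect {
      case (id, seen) if nowMs - seen > timeoutMs => id
    }.toSeq
    expired.foreach { id =>
      lastSeen.remove(id)
      onExpired(id)
    }
    expired
  }
}

// Example usage with made-up ids and a 120s timeout (mirroring the default fallback above):
// val tracker = new HeartbeatTracker(120000L, id => println(s"lost executor $id"))
// tracker.recordHeartbeat("exec-1")
// tracker.expireDeadHosts()   // returns Seq("exec-1") once 120s pass without a heartbeat
```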

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 19 additions & 6 deletions
@@ -68,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     if (value == null) {
       throw new NullPointerException("null value for " + key)
     }
-    settings.put(translateConfKey(key, warn = true), value)
+    settings.put(key, value)
     this
   }
 
@@ -140,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
-    settings.putIfAbsent(translateConfKey(key, warn = true), value)
+    settings.putIfAbsent(key, value)
     this
   }
 
@@ -176,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    Option(settings.get(translateConfKey(key)))
+    Option(settings.get(key))
   }
 
   /** Get all parameters as a list of pairs */
@@ -229,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def getAppId: String = get("spark.app.id")
 
   /** Does the configuration contain a given parameter? */
-  def contains(key: String): Boolean = settings.containsKey(translateConfKey(key))
+  def contains(key: String): Boolean = settings.containsKey(key)
 
   /** Copy this object */
   override def clone: SparkConf = {
@@ -343,6 +343,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
        }
      }
    }
+
+    // Warn against the use of deprecated configs
+    deprecatedConfigs.values.foreach { dc =>
+      if (contains(dc.oldName)) {
+        dc.warn()
+      }
+    }
  }
 
  /**
@@ -362,7 +369,13 @@ private[spark] object SparkConf extends Logging {
    DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
      "1.3"),
    DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
-      "Use spark.{driver,executor}.userClassPathFirst instead."))
+      "Use spark.{driver,executor}.userClassPathFirst instead."),
+    DeprecatedConfig("spark.history.fs.updateInterval",
+      "spark.history.fs.update.interval.seconds",
+      "1.3", "Use spark.history.fs.update.interval.seconds instead"),
+    DeprecatedConfig("spark.history.updateInterval",
+      "spark.history.fs.update.interval.seconds",
+      "1.3", "Use spark.history.fs.update.interval.seconds instead"))
    configs.map { x => (x.oldName, x) }.toMap
  }
 
@@ -401,7 +414,7 @@ private[spark] object SparkConf extends Logging {
    * @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
    * only once for each key.
    */
-  def translateConfKey(userKey: String, warn: Boolean = false): String = {
+  private def translateConfKey(userKey: String, warn: Boolean = false): String = {
     deprecatedConfigs.get(userKey)
       .map { deprecatedKey =>
         if (warn) {