
Commit 2d19f3c

Marcelo Vanzin committed: Review feedback.
1 parent 6706d3a · commit 2d19f3c

File tree: 4 files changed, +17 -24 lines


core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
(9 additions, 11 deletions)

@@ -117,7 +117,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
       replayBus.replay()
 
-      // Note that this does not have any effect due to SPARK-2169.
       ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
 
       val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
@@ -183,29 +182,28 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
         lastModifiedTime = newLastModifiedTime
 
+        // When there are new logs, merge the new list with the existing one, maintaining
+        // the expected ordering (descending end time). Maintaining the order is important
+        // to avoid having to sort the list every time there is a request for the log list.
         if (!logInfos.isEmpty) {
-          var newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-
-          def addToList(info: FsApplicationHistoryInfo) = {
+          val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+          def addIfAbsent(info: FsApplicationHistoryInfo) = {
            if (!newApps.contains(info.id)) {
              newApps += (info.id -> info)
            }
          }
 
-          // Merge the new apps with the existing ones, discarding any duplicates.
          val newIterator = logInfos.iterator.buffered
          val oldIterator = applications.values.iterator.buffered
-
          while (newIterator.hasNext && oldIterator.hasNext) {
            if (newIterator.head.endTime > oldIterator.head.endTime) {
-              addToList(newIterator.next)
+              addIfAbsent(newIterator.next)
            } else {
-              addToList(oldIterator.next)
+              addIfAbsent(oldIterator.next)
            }
          }
-
-          newIterator.foreach(addToList)
-          oldIterator.foreach(addToList)
+          newIterator.foreach(addIfAbsent)
+          oldIterator.foreach(addIfAbsent)
 
          applications = newApps
        }
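The second hunk is the substance of this review pass: `addToList` becomes `addIfAbsent` (a name that matches what it does), the `var` becomes a `val`, and the new comment states the invariant the loop relies on: both inputs are already sorted by descending end time, so a single merge pass yields a sorted, de-duplicated list without re-sorting. A self-contained sketch of that merge pattern, using a hypothetical simplified `AppInfo` in place of `FsApplicationHistoryInfo`:

```scala
// Minimal sketch of the merge above: two sequences, each already sorted by
// descending endTime, merged into a LinkedHashMap. The map keeps insertion
// order, so it serves as both the de-dup index and the sorted result.
// `AppInfo` is a hypothetical stand-in for FsApplicationHistoryInfo.
import scala.collection.mutable

case class AppInfo(id: String, endTime: Long)

def mergeSorted(newApps: Seq[AppInfo], oldApps: Seq[AppInfo]): Seq[AppInfo] = {
  val merged = new mutable.LinkedHashMap[String, AppInfo]()
  def addIfAbsent(info: AppInfo): Unit = {
    if (!merged.contains(info.id)) {
      merged += (info.id -> info)
    }
  }

  val newIt = newApps.iterator.buffered
  val oldIt = oldApps.iterator.buffered
  // Classic merge step: always take whichever head has the later end time.
  while (newIt.hasNext && oldIt.hasNext) {
    if (newIt.head.endTime > oldIt.head.endTime) {
      addIfAbsent(newIt.next())
    } else {
      addIfAbsent(oldIt.next())
    }
  }
  // At most one iterator still has entries; draining both is safe.
  newIt.foreach(addIfAbsent)
  oldIt.foreach(addIfAbsent)

  merged.values.toSeq
}
```

When an application id appears in both lists, the copy encountered first (the one with the later end time) wins, since `addIfAbsent` skips any id already present.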

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
(3 additions, 3 deletions)

@@ -42,7 +42,7 @@ import org.apache.spark.ui.JettyUtils
  * (spark.deploy.*).
  */
 private[spark]
-abstract class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
+class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
   extends SchedulerBackend with Logging
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
@@ -51,12 +51,12 @@ abstract class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actor
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered resources / total expected resources)
+  // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
   var minRegisteredRatio =
     math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
   // Submit tasks after maxRegisteredWaitingTime milliseconds
-  // if minRegisteredRatio has not yet been reached
+  // if minRegisteredRatio has not yet been reached
   val maxRegisteredWaitingTime =
     conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
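The first hunk reverts the class to concrete, apparently undoing an earlier change in this PR (the matching MiMa exclusion is removed in the last file below); the second hunk's changed comment lines read identically before and after, so they look like a whitespace-only fix. The two settings those comments describe work as a pair: `spark.scheduler.minRegisteredResourcesRatio` gates task submission on the fraction of expected resources that have registered, and `spark.scheduler.maxRegisteredResourcesWaitingTime` caps how long the backend waits for that ratio. A hedged sketch of the readiness check they feed (names follow the fields above; an illustration, not the backend's exact code):

```scala
// Illustrative readiness gate built from the two settings above: proceed
// once the registered-resource ratio is met, or once the waiting-time cap
// has elapsed, whichever happens first. A sketch only, not the exact logic
// in CoarseGrainedSchedulerBackend.
def readyToSubmit(
    registeredCores: Int,
    totalExpectedCores: Int,
    minRegisteredRatio: Double,      // spark.scheduler.minRegisteredResourcesRatio
    createTime: Long,
    maxRegisteredWaitingTime: Long   // spark.scheduler.maxRegisteredResourcesWaitingTime, in ms
): Boolean = {
  val ratioMet = registeredCores >= totalExpectedCores * minRegisteredRatio
  val waitedOut = System.currentTimeMillis() - createTime >= maxRegisteredWaitingTime
  ratioMet || waitedOut
}
```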

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
(5 additions, 5 deletions)

@@ -88,18 +88,18 @@ private[spark] class SparkDeploySchedulerBackend(
   override def connected(appId: String) {
     logInfo("Connected to Spark cluster with app ID " + appId)
     this.appId = appId
-    wakeUpContext()
+    notifyContext()
   }
 
   override def disconnected() {
-    wakeUpContext()
+    notifyContext()
     if (!stopping) {
       logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
     }
   }
 
   override def dead(reason: String) {
-    wakeUpContext()
+    notifyContext()
     if (!stopping) {
       logError("Application has been killed. Reason: " + reason)
       scheduler.error(reason)
@@ -127,7 +127,7 @@ private[spark] class SparkDeploySchedulerBackend(
     totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
   }
 
-  override def applicationId(): Option[String] = Some(appId)
+  override def applicationId(): Option[String] = Option(appId)
 
   private def waitForRegistration() = {
     registrationLock.synchronized {
@@ -137,7 +137,7 @@ private[spark] class SparkDeploySchedulerBackend(
     }
   }
 
-  private def wakeUpContext() = {
+  private def notifyContext() = {
     registrationLock.synchronized {
       registrationDone = true
       registrationLock.notifyAll()
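Two small fixes here. The rename to `notifyContext` matches the `notifyAll` in the method body better than `wakeUpContext` did, and switching `Some(appId)` to `Option(appId)` makes `applicationId()` null-safe: `Option(null)` evaluates to `None`, whereas `Some(null)` would hand callers a wrapped null (relevant because `appId` is unset until `connected` fires). The wait/notify handshake the renamed method participates in looks roughly like this standalone sketch (names follow the diff; simplified from the real class):

```scala
// Simplified version of the registration handshake above: the thread that
// creates the SparkContext blocks in waitForRegistration() until one of the
// client callbacks (connected/disconnected/dead) calls notifyContext().
class RegistrationBarrier {
  private val registrationLock = new Object()
  private var registrationDone = false

  // Blocks until registration has completed (or terminally failed).
  // The while loop guards against spurious wakeups.
  def waitForRegistration(): Unit = registrationLock.synchronized {
    while (!registrationDone) {
      registrationLock.wait()
    }
  }

  // Safe to call from any callback, any number of times.
  def notifyContext(): Unit = registrationLock.synchronized {
    registrationDone = true
    registrationLock.notifyAll()
  }
}
```

Setting `registrationDone` and calling `notifyAll` under the same lock that guards the flag avoids a lost-wakeup race between the context thread and the client callbacks.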

project/MimaExcludes.scala
(0 additions, 5 deletions)

@@ -108,11 +108,6 @@ object MimaExcludes {
             MimaBuild.excludeSparkClass("storage.MemoryStore$Entry") ++
             // Class was missing "@DeveloperApi" annotation in 1.0.
             MimaBuild.excludeSparkClass("scheduler.SparkListenerApplicationStart") ++
-            // Class is "private[spark]" but for some reason not being ignored?
-            Seq(
-              ProblemFilters.exclude[AbstractClassProblem](
-                "org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend")
-            ) ++
             Seq(
               ProblemFilters.exclude[IncompatibleMethTypeProblem](
                 "org.apache.spark.mllib.tree.impurity.Gini.calculate"),
