
Commit c23865c
Merge branch 'master' into profiler
2 parents 15d6f18 + 885d162

File tree: 81 files changed (+1494, -684 lines)


bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala

Lines changed: 0 additions & 2 deletions
@@ -24,8 +24,6 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark._
 import org.apache.spark.storage.StorageLevel
 
-import scala.language.postfixOps
-
 class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
 class TestMessage(val targetId: String) extends Message[String] with Serializable
 

bin/compute-classpath.sh

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
   echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
     "classes ahead of assembly." >&2
   CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/classes"
+  CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*"
   CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes"

core/pom.xml

Lines changed: 27 additions & 0 deletions
@@ -351,6 +351,33 @@
         </execution>
       </executions>
     </plugin>
+    <!--
+      Copy guava to the build directory. This is needed to make the SPARK_PREPEND_CLASSES
+      option work in compute-classpath.sh, since it would put the non-shaded Spark classes in
+      the runtime classpath.
+    -->
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-dependency-plugin</artifactId>
+      <executions>
+        <execution>
+          <id>copy-dependencies</id>
+          <phase>package</phase>
+          <goals>
+            <goal>copy-dependencies</goal>
+          </goals>
+          <configuration>
+            <outputDirectory>${project.build.directory}</outputDirectory>
+            <overWriteReleases>false</overWriteReleases>
+            <overWriteSnapshots>false</overWriteSnapshots>
+            <overWriteIfNewer>true</overWriteIfNewer>
+            <useSubDirectoryPerType>true</useSubDirectoryPerType>
+            <includeArtifactIds>guava</includeArtifactIds>
+            <silent>true</silent>
+          </configuration>
+        </execution>
+      </executions>
+    </plugin>
   </plugins>
 
   <resources>

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 1 addition & 1 deletion
@@ -162,7 +162,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
 
   // always add the current user and SPARK_USER to the viewAcls
   private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
-    Option(System.getenv("SPARK_USER")).getOrElse(""))
+    Option(System.getenv("SPARK_USER")).getOrElse("")).filter(!_.isEmpty)
 
   setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
   setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))

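Filtering out the empty string keeps a blank entry out of the default ACL set when SPARK_USER is unset. A minimal standalone sketch of that behaviour (illustrative only, not Spark code):

object AclUsersSketch {
  def main(args: Array[String]): Unit = {
    // Assume SPARK_USER is not set, so the Option is empty and getOrElse yields "".
    val sparkUser: Option[String] = Option(System.getenv("SPARK_USER"))
    val users = Set(System.getProperty("user.name", ""), sparkUser.getOrElse(""))
      .filter(!_.isEmpty)                 // drop the empty-string placeholder
    println(users)                        // only real user names remain
  }
}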
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 9 additions & 3 deletions
@@ -220,8 +220,14 @@ class SparkContext(config: SparkConf) extends Logging {
     new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
 
   // Initialize the Spark UI, registering all associated listeners
-  private[spark] val ui = new SparkUI(this)
-  ui.bind()
+  private[spark] val ui: Option[SparkUI] =
+    if (conf.getBoolean("spark.ui.enabled", true)) {
+      Some(new SparkUI(this))
+    } else {
+      // For tests, do not enable the UI
+      None
+    }
+  ui.foreach(_.bind())
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
@@ -990,7 +996,7 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Shut down the SparkContext. */
   def stop() {
     postApplicationEnd()
-    ui.stop()
+    ui.foreach(_.stop())
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler

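The ui field is now an Option gated by spark.ui.enabled, so callers go through foreach instead of assuming a bound UI. A hedged usage sketch (not part of the commit) of how a test or batch job might switch the UI off:

import org.apache.spark.{SparkConf, SparkContext}

object NoUiExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("no-ui-example")
      .set("spark.ui.enabled", "false")   // SparkContext.ui becomes None, so no web port is bound
    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 10).sum())
    } finally {
      sc.stop()                           // ui.foreach(_.stop()) is a no-op when the UI is disabled
    }
  }
}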
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 4 additions & 2 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy
 
 import java.io.{File, PrintStream}
-import java.lang.reflect.InvocationTargetException
+import java.lang.reflect.{Modifier, InvocationTargetException}
 import java.net.URL
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -323,7 +323,9 @@ object SparkSubmit {
     }
 
     val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
-
+    if (!Modifier.isStatic(mainMethod.getModifiers)) {
+      throw new IllegalStateException("The main method in the given main class must be static")
+    }
     try {
      mainMethod.invoke(null, childArgs.toArray)
     } catch {

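SparkSubmit now fails fast when the supplied main method is not static, since mainMethod.invoke(null, ...) only works for static methods. A small illustrative check (assumed class name, not Spark code); a Scala object's main compiles to a static forwarder, so it passes, whereas a main defined only as an instance method on a plain class would not:

import java.lang.reflect.Modifier

object MainCheckDemo {
  def main(args: Array[String]): Unit = {
    // Reflect on this object's own compiled class; Scala emits a static forwarder for main.
    val m = Class.forName("MainCheckDemo").getMethod("main", classOf[Array[String]])
    println(s"main is static: ${Modifier.isStatic(m.getModifiers)}")  // prints true
  }
}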
core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 15 additions & 3 deletions
@@ -487,13 +487,25 @@ private[spark] class Master(
     if (state != RecoveryState.ALIVE) { return }
 
     // First schedule drivers, they take strict precedence over applications
-    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
-    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
-      for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
+    // Randomization helps balance drivers
+    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
+    val aliveWorkerNum = shuffledAliveWorkers.size
+    var curPos = 0
+    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
+      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
+      // start from the last worker that was assigned a driver, and continue onwards until we have
+      // explored all alive workers.
+      curPos = (curPos + 1) % aliveWorkerNum
+      val startPos = curPos
+      var launched = false
+      while (curPos != startPos && !launched) {
+        val worker = shuffledAliveWorkers(curPos)
         if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
+          launched = true
         }
+        curPos = (curPos + 1) % aliveWorkerNum
       }
     }
 

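The new logic spreads waiting drivers across the alive workers in round-robin order instead of handing each shuffled worker every pending driver in turn. A standalone sketch of the round-robin idea under simplified assumptions (toy Worker type, memory-only check, visited counter; not the Master code):

object RoundRobinSketch {
  final case class Worker(id: Int, var freeMem: Int)

  def main(args: Array[String]): Unit = {
    val workers = Vector(Worker(1, 4), Worker(2, 1), Worker(3, 4))
    val driverMemNeeds = Seq(2, 2, 2)
    var pos = 0                                      // where the previous search left off
    for (need <- driverMemNeeds) {
      var visited = 0
      var launched = false
      while (visited < workers.size && !launched) {  // examine each alive worker at most once
        val w = workers(pos)
        if (w.freeMem >= need) {
          w.freeMem -= need
          launched = true
          println(s"driver needing $need -> worker ${w.id}")
        }
        pos = (pos + 1) % workers.size
        visited += 1
      }
    }
  }
}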
core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 10 additions & 1 deletion
@@ -360,7 +360,16 @@ private[spark] class Executor(
       if (!taskRunner.attemptedTask.isEmpty) {
         Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
           metrics.updateShuffleReadMetrics
-          tasksMetrics += ((taskRunner.taskId, metrics))
+          if (isLocal) {
+            // JobProgressListener will hold an reference of it during
+            // onExecutorMetricsUpdate(), then JobProgressListener can not see
+            // the changes of metrics any more, so make a deep copy of it
+            val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
+            tasksMetrics += ((taskRunner.taskId, copiedMetrics))
+          } else {
+            // It will be copied by serialization
+            tasksMetrics += ((taskRunner.taskId, metrics))
+          }
         }
       }
     }

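In local mode the listener and the executor share the same TaskMetrics object, so the reported snapshot must be deep-copied; in cluster mode the message serialization already produces a copy. A minimal sketch of deep copy via Java serialization (the helpers and Metrics type below are local stand-ins, not Spark's Utils or TaskMetrics):

import java.io._

object DeepCopySketch {
  def serialize(o: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(o)
    oos.close()
    bos.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }

  // Toy stand-in for TaskMetrics: mutable and Serializable.
  final case class Metrics(var executorRunTime: Long) extends Serializable

  def main(args: Array[String]): Unit = {
    val live = Metrics(10L)
    val snapshot = deserialize[Metrics](serialize(live)) // independent deep copy
    live.executorRunTime = 99L                           // a later mutation of the live object...
    println(snapshot.executorRunTime)                    // ...is not visible in the copy: prints 10
  }
}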
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -292,7 +292,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
       conf.set("spark.ui.filters", filterName)
       conf.set(s"spark.$filterName.params", filterParams)
-      JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
+      scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
     }
   }
 }

core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala

Lines changed: 3 additions & 3 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
 import org.apache.spark.{Logging, SparkContext, SparkEnv}
@@ -47,16 +46,17 @@ private[spark] class SimrSchedulerBackend(
 
     val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
     val fs = FileSystem.get(conf)
+    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
 
     logInfo("Writing to HDFS file: " + driverFilePath)
     logInfo("Writing Akka address: " + driverUrl)
-    logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress)
+    logInfo("Writing Spark UI Address: " + appUIAddress)
 
     // Create temporary file to prevent race condition where executors get empty driverUrl file
     val temp = fs.create(tmpPath, true)
     temp.writeUTF(driverUrl)
     temp.writeInt(maxCores)
-    temp.writeUTF(sc.ui.appUIAddress)
+    temp.writeUTF(appUIAddress)
     temp.close()
 
     // "Atomic" rename

0 commit comments
