Skip to content

Commit 6ca4a44

Browse files
author
yzhou2001
committed
Merge pull request #1 from apache/master
A merge of the latest changes since creation
2 parents 0c7b452 + a75bc7a commit 6ca4a44

File tree

27 files changed

+321
-87
lines changed

27 files changed

+321
-87
lines changed

bin/spark-shell.cmd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,4 @@ rem
1919

2020
set SPARK_HOME=%~dp0..
2121

22-
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell --class org.apache.spark.repl.Main %*
22+
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd --class org.apache.spark.repl.Main %* spark-shell

bin/spark-sql

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,30 +65,30 @@ while (($#)); do
6565
case $1 in
6666
-d | --define | --database | -f | -h | --hiveconf | --hivevar | -i | -p)
6767
ensure_arg_number $# 2
68-
CLI_ARGS+=($1); shift
69-
CLI_ARGS+=($1); shift
68+
CLI_ARGS+=("$1"); shift
69+
CLI_ARGS+=("$1"); shift
7070
;;
7171

7272
-e)
7373
ensure_arg_number $# 2
74-
CLI_ARGS+=($1); shift
75-
CLI_ARGS+=(\"$1\"); shift
74+
CLI_ARGS+=("$1"); shift
75+
CLI_ARGS+=("$1"); shift
7676
;;
7777

7878
-s | --silent)
79-
CLI_ARGS+=($1); shift
79+
CLI_ARGS+=("$1"); shift
8080
;;
8181

8282
-v | --verbose)
8383
# Both SparkSubmit and SparkSQLCLIDriver recognizes -v | --verbose
84-
CLI_ARGS+=($1)
85-
SUBMISSION_ARGS+=($1); shift
84+
CLI_ARGS+=("$1")
85+
SUBMISSION_ARGS+=("$1"); shift
8686
;;
8787

8888
*)
89-
SUBMISSION_ARGS+=($1); shift
89+
SUBMISSION_ARGS+=("$1"); shift
9090
;;
9191
esac
9292
done
9393

94-
eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${CLI_ARGS[*]}
94+
exec "$FWDIR"/bin/spark-submit --class $CLASS "${SUBMISSION_ARGS[@]}" spark-internal "${CLI_ARGS[@]}"

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ private[spark] class ApplicationInfo(
4646

4747
init()
4848

49+
private def readObject(in: java.io.ObjectInputStream): Unit = {
50+
in.defaultReadObject()
51+
init()
52+
}
53+
4954
private def init() {
5055
state = ApplicationState.WAITING
5156
executors = new mutable.HashMap[Int, ExecutorInfo]

core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
2222
import org.apache.spark.metrics.source.Source
2323

2424
class ApplicationSource(val application: ApplicationInfo) extends Source {
25-
val metricRegistry = new MetricRegistry()
26-
val sourceName = "%s.%s.%s".format("application", application.desc.name,
25+
override val metricRegistry = new MetricRegistry()
26+
override val sourceName = "%s.%s.%s".format("application", application.desc.name,
2727
System.currentTimeMillis())
2828

2929
metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {

core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
2222
import org.apache.spark.metrics.source.Source
2323

2424
private[spark] class MasterSource(val master: Master) extends Source {
25-
val metricRegistry = new MetricRegistry()
26-
val sourceName = "master"
25+
override val metricRegistry = new MetricRegistry()
26+
override val sourceName = "master"
2727

2828
// Gauge for worker numbers in cluster
2929
metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {

core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
2222
import org.apache.spark.metrics.source.Source
2323

2424
private[spark] class WorkerSource(val worker: Worker) extends Source {
25-
val sourceName = "worker"
26-
val metricRegistry = new MetricRegistry()
25+
override val sourceName = "worker"
26+
override val metricRegistry = new MetricRegistry()
2727

2828
metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
2929
override def getValue: Int = worker.executors.size

core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String)
3535
})
3636
}
3737

38-
val metricRegistry = new MetricRegistry()
38+
override val metricRegistry = new MetricRegistry()
39+
3940
// TODO: It would be nice to pass the application name here
40-
val sourceName = "executor.%s".format(executorId)
41+
override val sourceName = "executor.%s".format(executorId)
4142

4243
// Gauge for executor thread pool's actively executing task counts
4344
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {

core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,9 @@ import com.codahale.metrics.MetricRegistry
2121
import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
2222

2323
private[spark] class JvmSource extends Source {
24-
val sourceName = "jvm"
25-
val metricRegistry = new MetricRegistry()
24+
override val sourceName = "jvm"
25+
override val metricRegistry = new MetricRegistry()
2626

27-
val gcMetricSet = new GarbageCollectorMetricSet
28-
val memGaugeSet = new MemoryUsageGaugeSet
29-
30-
metricRegistry.registerAll(gcMetricSet)
31-
metricRegistry.registerAll(memGaugeSet)
27+
metricRegistry.registerAll(new GarbageCollectorMetricSet)
28+
metricRegistry.registerAll(new MemoryUsageGaugeSet)
3229
}

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ class DAGScheduler(
121121

122122
private[scheduler] var eventProcessActor: ActorRef = _
123123

124+
/** If enabled, we may run certain actions like take() and first() locally. */
125+
private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)
126+
124127
private def initializeEventProcessActor() {
125128
// blocking the thread until supervisor is started, which ensures eventProcessActor is
126129
// not null before any job is submitted
@@ -732,7 +735,9 @@ class DAGScheduler(
732735
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
733736
logInfo("Parents of final stage: " + finalStage.parents)
734737
logInfo("Missing parents: " + getMissingParentStages(finalStage))
735-
if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
738+
val shouldRunLocally =
739+
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
740+
if (shouldRunLocally) {
736741
// Compute very short actions like first() or take() with no parent stages locally.
737742
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
738743
runLocally(job)

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import org.apache.spark.metrics.source.Source
2424

2525
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
2626
extends Source {
27-
val metricRegistry = new MetricRegistry()
28-
val sourceName = "%s.DAGScheduler".format(sc.appName)
27+
override val metricRegistry = new MetricRegistry()
28+
override val sourceName = "%s.DAGScheduler".format(sc.appName)
2929

3030
metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
3131
override def getValue: Int = dagScheduler.failedStages.size

0 commit comments

Comments (0)