
Commit 2290b6b

Committed by Hung Lin
[SPARK-6414][core] Fix NPE in SparkContext.cancelJobGroup()
Author: Hung Lin <[email protected]>
1 parent 424e987 commit 2290b6b

File tree: 3 files changed, +25 −9 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 3 deletions
@@ -433,6 +433,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // Thread Local variable that can be used by users to pass information down the stack
   private val localProperties = new InheritableThreadLocal[Properties] {
     override protected def childValue(parent: Properties): Properties = new Properties(parent)
+    override protected def initialValue(): Properties = new Properties()
   }
 
   /**
@@ -474,9 +475,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Spark fair scheduler pool.
    */
   def setLocalProperty(key: String, value: String) {
-    if (localProperties.get() == null) {
-      localProperties.set(new Properties())
-    }
     if (value == null) {
       localProperties.get.remove(key)
     } else {
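
Why the one-line addition suffices: overriding initialValue() guarantees that localProperties.get() never returns null, which is what lets the commit delete the lazy-initialization check in setLocalProperty. A minimal standalone sketch of the pattern (illustrative names, not Spark's actual field):

import java.util.Properties

// With initialValue() overridden, get() materializes a fresh Properties on
// first access instead of returning null; child threads still inherit a view
// of the parent's properties through childValue().
val localProps = new InheritableThreadLocal[Properties] {
  override protected def childValue(parent: Properties): Properties = new Properties(parent)
  override protected def initialValue(): Properties = new Properties()
}

localProps.get().setProperty("spark.jobGroup.id", "demo")   // safe: never null
println(localProps.get().getProperty("spark.jobGroup.id"))  // prints "demo"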

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 5 additions & 5 deletions
@@ -493,7 +493,7 @@ class DAGScheduler(
       callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
-      properties: Properties = null): JobWaiter[U] = {
+      properties: Properties): JobWaiter[U] = {
     // Check to make sure we are not launching a task on a partition that does not exist.
     val maxPartitions = rdd.partitions.length
     partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
@@ -522,7 +522,7 @@ class DAGScheduler(
       callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
-      properties: Properties = null): Unit = {
+      properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
     waiter.awaitResult() match {
@@ -542,7 +542,7 @@ class DAGScheduler(
       evaluator: ApproximateEvaluator[U, R],
       callSite: CallSite,
       timeout: Long,
-      properties: Properties = null): PartialResult[R] = {
+      properties: Properties): PartialResult[R] = {
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.size).toArray
@@ -689,7 +689,7 @@ class DAGScheduler(
     // Cancel all jobs belonging to this job group.
     // First finds all active jobs with this group id, and then kill stages for them.
     val activeInGroup = activeJobs.filter(activeJob =>
-      groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+      Option(activeJob.properties).exists(_.get(SparkContext.SPARK_JOB_GROUP_ID) == groupId))
     val jobIds = activeInGroup.map(_.jobId)
     jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
     submitWaitingStages()
@@ -736,7 +736,7 @@ class DAGScheduler(
       allowLocal: Boolean,
       callSite: CallSite,
       listener: JobListener,
-      properties: Properties = null) {
+      properties: Properties) {
     var finalStage: ResultStage = null
     try {
       // New stage creation may throw an exception if, for example, jobs are run on a
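
The DAGScheduler change has two parts: the `= null` defaults are removed so every caller must pass a Properties value explicitly, and the group-id comparison becomes null-safe. The Option(...) wrapper is the key: Option(null) is None, and exists on None is false, so a job whose properties were never set simply fails the filter instead of throwing. A small standalone sketch (the literal key "spark.jobGroup.id" is assumed here as the value behind SparkContext.SPARK_JOB_GROUP_ID):

import java.util.Properties

// Null-safe membership test in the style of the fixed filter predicate.
def inGroup(groupId: String, properties: Properties): Boolean =
  Option(properties).exists(_.get("spark.jobGroup.id") == groupId)

println(inGroup("g1", null))             // false; pre-fix this path threw an NPE

val p = new Properties()
p.setProperty("spark.jobGroup.id", "g1")
println(inGroup("g1", p))                // true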

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 19 additions & 1 deletion
@@ -18,16 +18,19 @@
 package org.apache.spark
 
 import java.io.File
+import java.util.concurrent.TimeUnit
 
 import com.google.common.base.Charsets._
 import com.google.common.io.Files
 
 import org.scalatest.FunSuite
 
 import org.apache.hadoop.io.BytesWritable
-
 import org.apache.spark.util.Utils
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 class SparkContextSuite extends FunSuite with LocalSparkContext {
 
   test("Only one SparkContext may be active at a time") {
@@ -173,4 +176,19 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
       sc.stop()
     }
   }
+
+  test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
+    try {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)})
+      sc.cancelJobGroup("nonExistGroupId")
+      Await.ready(future, Duration(2, TimeUnit.SECONDS))
+
+      // In SPARK-6414, sc.cancelJobGroup will cause NullPointerException and cause
+      // SparkContext to shutdown, so the following assertion will fail.
+      assert(sc.parallelize(1 to 10).count() == 10L)
+    } finally {
+      sc.stop()
+    }
+  }
 }
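
The regression test fires an async action, cancels a nonexistent job group, and then proves the context survived by running a second job. The waiting idiom in isolation, with a plain Scala Future standing in for Spark's FutureAction (a sketch, not the suite's code):

import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val future = Future { Thread.sleep(1000L); 42 }

// Await.ready blocks until completion or timeout; unlike Await.result it does
// not rethrow the future's failure, so state can be inspected afterwards.
Await.ready(future, Duration(2, TimeUnit.SECONDS))
println(future.value)                    // Some(Success(42))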
