
Commit 401033f

Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/spark into kill
Conflicts: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
2 parents: 7a7bdd2 + 1d6abe3

6 files changed: 62 additions (+62), 16 deletions (-16)

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
Lines changed: 15 additions & 0 deletions

@@ -570,6 +570,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * // In a separate thread:
    * sc.cancelJobGroup("some_job_to_cancel");
    * }}}
+   *
+   * If interruptOnCancel is set to true for the job group, then job cancellation will result
+   * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+   * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+   * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+   */
+  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean): Unit =
+    sc.setJobGroup(groupId, description, interruptOnCancel)
+
+  /**
+   * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
+   * different value or cleared.
+   *
+   * @see `setJobGroup(groupId: String, description: String, interruptThread: Boolean)`.
+   * This method sets interruptOnCancel to false.
    */
   def setJobGroup(groupId: String, description: String): Unit = sc.setJobGroup(groupId, description)

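Note on the new overload above: a minimal usage sketch, assuming a running SparkContext named sc; the group id and job shown here are illustrative and not part of this commit.

    // Tag all jobs submitted from this thread, and allow cancellation to interrupt their task threads.
    sc.setJobGroup("nightly-report", "rebuild the nightly report", interruptOnCancel = true)
    sc.parallelize(1 to 1000000, 8).map(_ * 2).count()

    // In a separate thread (e.g. a request handler), everything in the group can be cancelled:
    // sc.cancelJobGroup("nightly-report")
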
core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
Lines changed: 32 additions & 2 deletions

@@ -21,7 +21,7 @@ import java.util.concurrent.Semaphore
 
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
 import scala.concurrent.future
 
 import org.scalatest.{BeforeAndAfter, FunSuite}

@@ -130,13 +130,43 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
       sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
     }
 
+    // Block until both tasks of job A have started and cancel job A.
+    sem.acquire(2)
+
     sc.clearJobGroup()
     val jobB = sc.parallelize(1 to 100, 2).countAsync()
+    sc.cancelJobGroup("jobA")
+    val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
+    assert(e.getMessage contains "cancel")
+
+    // Once A is cancelled, job B should finish fairly quickly.
+    assert(jobB.get() === 100)
+  }
+
+  test("job group with interruption") {
+    sc = new SparkContext("local[2]", "test")
+
+    // Add a listener to release the semaphore once any tasks are launched.
+    val sem = new Semaphore(0)
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart) {
+        sem.release()
+      }
+    })
+
+    // jobA is the one to be cancelled.
+    val jobA = future {
+      sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
+      sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
+    }
 
     // Block until both tasks of job A have started and cancel job A.
     sem.acquire(2)
+
+    sc.clearJobGroup()
+    val jobB = sc.parallelize(1 to 100, 2).countAsync()
     sc.cancelJobGroup("jobA")
-    val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
+    val e = intercept[SparkException] { Await.result(jobA, 5.seconds) }
     assert(e.getMessage contains "cancel")
 
     // Once A is cancelled, job B should finish fairly quickly.

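Why the interruption variant of the test works even with a 100-second Thread.sleep in each task: with interruptOnCancel = true, cancelling the group calls Thread.interrupt() on the task threads, and a sleeping JVM thread responds to an interrupt by throwing InterruptedException immediately. A tiny Spark-free sketch of that JVM behaviour (illustrative only):

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          Thread.sleep(100000)  // would block for ~100 seconds if left alone
        } catch {
          case _: InterruptedException => println("sleep aborted by interrupt")
        }
      }
    })
    worker.start()
    Thread.sleep(100)   // give the worker time to reach its sleep
    worker.interrupt()  // the analogue of what interruptOnCancel triggers on executor threads
    worker.join()
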
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
Lines changed: 3 additions & 2 deletions

@@ -50,9 +50,9 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
     assert(counter.count === 0)
 
-    // Starting listener bus should flush all buffered events (asynchronously, hence the sleep)
+    // Starting listener bus should flush all buffered events
     bus.start()
-    Thread.sleep(1000)
+    assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(counter.count === 5)
 
     // After listener bus has stopped, posting events should not increment counter

@@ -177,6 +177,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     listener.stageInfos.clear()
 
     rdd3.count()
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be {2} // Shuffle map stage + result stage
     val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get
     stageInfo3.rddInfos.size should be {2} // ShuffledRDD, MapPartitionsRDD

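These two changes replace a fixed Thread.sleep(1000) with a wait on the listener bus itself, making the assertions deterministic rather than timing-dependent. The real waitUntilEmpty belongs to Spark's listener bus; the sketch below only illustrates the general poll-until-drained pattern such a helper can follow, with illustrative names rather than the actual implementation:

    // Poll an emptiness check until it succeeds or a timeout expires; returns whether the bus drained in time.
    def waitUntilEmptySketch(queueIsEmpty: () => Boolean, timeoutMillis: Long): Boolean = {
      val deadline = System.currentTimeMillis() + timeoutMillis
      while (!queueIsEmpty()) {
        if (System.currentTimeMillis() > deadline) return false
        Thread.sleep(10)  // brief back-off between checks
      }
      true
    }
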
graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
Lines changed: 3 additions & 3 deletions

@@ -46,7 +46,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * @note vertex ids are unique.
    * @return an RDD containing the vertices in this graph
    */
-  val vertices: VertexRDD[VD]
+  @transient val vertices: VertexRDD[VD]
 
   /**
    * An RDD containing the edges and their associated attributes. The entries in the RDD contain

@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * along with their vertex data.
    *
    */
-  val edges: EdgeRDD[ED]
+  @transient val edges: EdgeRDD[ED]
 
   /**
    * An RDD containing the edge triplets, which are edges along with the vertex data associated with

@@ -77,7 +77,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
    * }}}
    */
-  val triplets: RDD[EdgeTriplet[VD, ED]]
+  @transient val triplets: RDD[EdgeTriplet[VD, ED]]
 
   /**
    * Caches the vertices and edges associated with this graph at the specified storage level.

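The @transient markers above keep these RDD-valued members out of Java serialization if a Graph instance is ever serialized, for example after being captured in a task closure; the RDDs are driver-side handles and should not travel with the object. A minimal, Spark-free sketch of what @transient does, using made-up names:

    import java.io._

    // The @transient field is skipped when the object is serialized and comes back as null.
    class Container(@transient val scratch: Array[Int], val label: String) extends Serializable

    val bytes = new ByteArrayOutputStream()
    new ObjectOutputStream(bytes).writeObject(new Container(Array.fill(1000)(1), "small"))
    val restored = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[Container]
    assert(restored.label == "small")
    assert(restored.scratch == null)  // not written during serialization, so not restored

The same reasoning applies to the @transient lazy vals and constructor fields in the two GraphX files below.
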
graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
Lines changed: 5 additions & 5 deletions

@@ -34,28 +34,28 @@ import scala.util.Random
 class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
 
   /** The number of edges in the graph. */
-  lazy val numEdges: Long = graph.edges.count()
+  @transient lazy val numEdges: Long = graph.edges.count()
 
   /** The number of vertices in the graph. */
-  lazy val numVertices: Long = graph.vertices.count()
+  @transient lazy val numVertices: Long = graph.vertices.count()
 
   /**
    * The in-degree of each vertex in the graph.
    * @note Vertices with no in-edges are not returned in the resulting RDD.
    */
-  lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
+  @transient lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
 
   /**
    * The out-degree of each vertex in the graph.
    * @note Vertices with no out-edges are not returned in the resulting RDD.
    */
-  lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
+  @transient lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
 
   /**
    * The degree of each vertex in the graph.
    * @note Vertices with no edges are not returned in the resulting RDD.
    */
-  lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
+  @transient lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
 
   /**
    * Computes the neighboring vertex degrees.

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
Lines changed: 4 additions & 4 deletions

@@ -34,10 +34,10 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
  */
 private[graphx]
 class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
-    val srcIds: Array[VertexId],
-    val dstIds: Array[VertexId],
-    val data: Array[ED],
-    val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
+    @transient val srcIds: Array[VertexId],
+    @transient val dstIds: Array[VertexId],
+    @transient val data: Array[ED],
+    @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
 
   /**
    * Reverse all the edges in this partition.
