Commit d3b9253

Author: Davies Liu

    add tests, mute the exception after stop
1 parent 4297327 commit d3b9253


2 files changed: +54 -17 lines


core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala

Lines changed: 23 additions & 17 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
+import java.util.concurrent.RejectedExecutionException
 
 import scala.language.existentials
 import scala.util.control.NonFatal
@@ -95,25 +96,30 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
   def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
     serializedData: ByteBuffer) {
     var reason : TaskEndReason = UnknownReason
-    getTaskResultExecutor.execute(new Runnable {
-      override def run(): Unit = Utils.logUncaughtExceptions {
-        try {
-          if (serializedData != null && serializedData.limit() > 0) {
-            reason = serializer.get().deserialize[TaskEndReason](
-              serializedData, Utils.getSparkClassLoader)
+    try {
+      getTaskResultExecutor.execute(new Runnable {
+        override def run(): Unit = Utils.logUncaughtExceptions {
+          try {
+            if (serializedData != null && serializedData.limit() > 0) {
+              reason = serializer.get().deserialize[TaskEndReason](
+                serializedData, Utils.getSparkClassLoader)
+            }
+          } catch {
+            case cnd: ClassNotFoundException =>
+              // Log an error but keep going here -- the task failed, so not catastrophic if we can't
+              // deserialize the reason.
+              val loader = Utils.getContextOrSparkClassLoader
+              logError(
+                "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
+            case ex: Exception => {}
           }
-        } catch {
-          case cnd: ClassNotFoundException =>
-            // Log an error but keep going here -- the task failed, so not catastrophic if we can't
-            // deserialize the reason.
-            val loader = Utils.getContextOrSparkClassLoader
-            logError(
-              "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
-          case ex: Exception => {}
+          scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
         }
-        scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
-      }
-    })
+      })
+    } catch {
+      case e: RejectedExecutionException if sparkEnv.isStopped =>
+        // ignore it
+    }
   }
 
   def stop() {
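
Why the new try/catch is safe: getTaskResultExecutor is shut down in stop(), and once a JDK thread pool's shutdown() has been called, its default rejection policy (AbortPolicy) makes any further execute() throw RejectedExecutionException. The `if sparkEnv.isStopped` guard therefore mutes the exception only during shutdown; a rejection at any other time still propagates. A minimal, self-contained Scala sketch of that JDK behavior (the object name and messages are illustrative, not Spark code):

    import java.util.concurrent.{Executors, RejectedExecutionException}

    object RejectedAfterShutdown {
      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(1)
        pool.shutdown() // from here on, execute() rejects every new task

        try {
          pool.execute(new Runnable {
            override def run(): Unit = println("never runs")
          })
        } catch {
          // Analogous to the commit's guard: swallow the rejection only when
          // we know we are stopping, so genuine scheduling failures surface.
          case _: RejectedExecutionException =>
            println("task rejected: pool already shut down")
        }
      }
    }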

python/pyspark/tests.py

Lines changed: 31 additions & 0 deletions
@@ -1550,6 +1550,37 @@ def test_with_stop(self):
         sc.stop()
         self.assertEqual(SparkContext._active_spark_context, None)
 
+    def test_progress_api(self):
+        with SparkContext() as sc:
+            sc.setJobGroup('test_progress_api', '', True)
+
+            rdd = sc.parallelize(range(10)).map(lambda x: time.sleep(100))
+            t = threading.Thread(target=rdd.collect)
+            t.daemon = True
+            t.start()
+            # wait for scheduler to start
+            time.sleep(1)
+
+            tracker = sc.statusTracker()
+            jobIds = tracker.getJobIdsForGroup('test_progress_api')
+            self.assertEqual(1, len(jobIds))
+            job = tracker.getJobInfo(jobIds[0])
+            self.assertEqual(1, len(job.stageIds))
+            stage = tracker.getStageInfo(job.stageIds[0])
+            self.assertEqual(rdd.getNumPartitions(), stage.numTasks)
+
+            sc.cancelAllJobs()
+            t.join()
+            # wait for event listener to update the status
+            time.sleep(1)
+
+            job = tracker.getJobInfo(jobIds[0])
+            self.assertEqual('FAILED', job.status)
+            self.assertEqual([], tracker.getActiveJobsIds())
+            self.assertEqual([], tracker.getActiveStageIds())
+
+            sc.stop()
+
 
 @unittest.skipIf(not _have_scipy, "SciPy not installed")
 class SciPyTests(PySparkTestCase):
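
The new test drives the progress API end to end: start a deliberately slow job in a daemon thread, poll the status tracker for job and stage info, cancel the job group, and check that the job ends up FAILED with no active jobs or stages. For comparison, a hedged sketch of the same flow against the JVM-side SparkStatusTracker (Scala API; the master URL, group name, partition count, and timings are illustrative assumptions, not part of this commit):

    import org.apache.spark.{SparkConf, SparkContext}

    object ProgressApiSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("progress-sketch"))
        sc.setJobGroup("progress-sketch", "demo group", interruptOnCancel = true)

        // A deliberately slow job, launched from a daemon thread so the main
        // thread is free to poll the tracker while the job runs.
        val rdd = sc.parallelize(1 to 10, 4).map { x => Thread.sleep(100); x }
        val worker = new Thread(new Runnable {
          override def run(): Unit = rdd.collect()
        })
        worker.setDaemon(true)
        worker.start()
        Thread.sleep(1000) // as in the test: wait for the scheduler to start

        val tracker = sc.statusTracker
        for (jobId <- tracker.getJobIdsForGroup("progress-sketch")) {
          tracker.getJobInfo(jobId).foreach { job =>
            println(s"job $jobId: status=${job.status}, stages=${job.stageIds.mkString(",")}")
            job.stageIds.foreach { sid =>
              tracker.getStageInfo(sid).foreach { stage =>
                println(s"  stage $sid: ${stage.numCompletedTasks}/${stage.numTasks} tasks done")
              }
            }
          }
        }

        sc.cancelAllJobs() // mirrors the test: the cancelled job reports FAILED
        worker.join()
        sc.stop()
      }
    }

As in the Python test, the short sleep gives the scheduler time to actually submit the job before the tracker is queried; polling too early can find no jobs in the group yet.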
