Skip to content

Commit bab1d8b

Browse files
committed
Check for NotSerializableException in submitMissingTasks.
1 parent cf38450 commit bab1d8b

File tree

1 file changed

+17
-2
lines changed

1 file changed

+17
-2
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 17 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.scheduler
1919

20-
import java.io.{NotSerializableException, PrintWriter, StringWriter}
20+
import java.io.{NotSerializableException}
2121
import java.util.Properties
2222
import java.util.concurrent.atomic.AtomicInteger
2323

@@ -35,6 +35,7 @@ import akka.pattern.ask
3535
import akka.util.Timeout
3636

3737
import org.apache.spark._
38+
import org.apache.spark.broadcast.Broadcast
3839
import org.apache.spark.executor.TaskMetrics
3940
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
4041
import org.apache.spark.rdd.RDD
@@ -694,7 +695,21 @@ class DAGScheduler(
694695
// Get our pending tasks and remember them in our pendingTasks entry
695696
stage.pendingTasks.clear()
696697
var tasks = ArrayBuffer[Task[_]]()
697-
val broadcastRddBinary = stage.rdd.createBroadcastBinary()
698+
699+
var broadcastRddBinary: Broadcast[Array[Byte]] = null
700+
try {
701+
broadcastRddBinary = stage.rdd.createBroadcastBinary()
702+
} catch {
703+
case e: NotSerializableException =>
704+
abortStage(stage, "Task not serializable: " + e.toString)
705+
runningStages -= stage
706+
return
707+
case NonFatal(e) =>
708+
abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
709+
runningStages -= stage
710+
return
711+
}
712+
698713
if (stage.isShuffleMap) {
699714
for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
700715
val locs = getPreferredLocs(stage.rdd, p)

0 commit comments

Comments (0)