17
17
18
18
package org .apache .spark .scheduler
19
19
20
- import java .io .{ NotSerializableException }
20
+ import java .io .NotSerializableException
21
21
import java .util .Properties
22
22
import java .util .concurrent .atomic .AtomicInteger
23
23
@@ -696,10 +696,25 @@ class DAGScheduler(
696
696
stage.pendingTasks.clear()
697
697
var tasks = ArrayBuffer [Task [_]]()
698
698
699
+ val properties = if (jobIdToActiveJob.contains(jobId)) {
700
+ jobIdToActiveJob(stage.jobId).properties
701
+ } else {
702
+ // this stage will be assigned to "default" pool
703
+ null
704
+ }
705
+
706
+ runningStages += stage
707
+ // SparkListenerStageSubmitted should be posted before testing whether tasks are
708
+ // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
709
+ // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
710
+ // event.
711
+ listenerBus.post(SparkListenerStageSubmitted (stage.info, properties))
712
+
699
713
var broadcastRddBinary : Broadcast [Array [Byte ]] = null
700
714
try {
701
715
broadcastRddBinary = stage.rdd.createBroadcastBinary()
702
716
} catch {
717
+ // In the case of a failure during serialization, abort the stage.
703
718
case e : NotSerializableException =>
704
719
abortStage(stage, " Task not serializable: " + e.toString)
705
720
runningStages -= stage
@@ -727,21 +742,7 @@ class DAGScheduler(
727
742
}
728
743
}
729
744
730
- val properties = if (jobIdToActiveJob.contains(jobId)) {
731
- jobIdToActiveJob(stage.jobId).properties
732
- } else {
733
- // this stage will be assigned to "default" pool
734
- null
735
- }
736
-
737
745
if (tasks.size > 0 ) {
738
- runningStages += stage
739
- // SparkListenerStageSubmitted should be posted before testing whether tasks are
740
- // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
741
- // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
742
- // event.
743
- listenerBus.post(SparkListenerStageSubmitted (stage.info, properties))
744
-
745
746
// Preemptively serialize a task to make sure it can be serialized. We are catching this
746
747
// exception here because it would be fairly hard to catch the non-serializable exception
747
748
// down the road, where we have several different implementations for local scheduler and
@@ -766,6 +767,9 @@ class DAGScheduler(
766
767
new TaskSet (tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
767
768
stage.info.submissionTime = Some (clock.getTime())
768
769
} else {
770
+ // Because we posted SparkListenerStageSubmitted earlier, we should post
771
+ // SparkListenerStageCompleted here in case there are no tasks to run.
772
+ listenerBus.post(SparkListenerStageCompleted (stage.info))
769
773
logDebug(" Stage " + stage + " is actually done; %b %d %d" .format(
770
774
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
771
775
runningStages -= stage
0 commit comments