Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 51304e6

Browse files
committed
Addressed comments.
1 parent c35237b commit 51304e6

File tree

2 files changed: +9 additions, −9 deletions

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 3 additions & 4 deletions
```diff
@@ -17,13 +17,12 @@
 package org.apache.spark.streaming

-import java.io.{NotSerializableException, InputStream}
+import java.io.{InputStream, NotSerializableException}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

 import scala.collection.Map
 import scala.collection.mutable.Queue
 import scala.reflect.ClassTag
-import scala.util.control.NonFatal

 import akka.actor.{Props, SupervisorStrategy}
 import org.apache.hadoop.conf.Configuration
@@ -43,7 +42,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{Utils, CallSite}
+import org.apache.spark.util.CallSite

 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -542,7 +541,7 @@ class StreamingContext private[streaming] (
       Checkpoint.serialize(checkpoint, conf)
     } catch {
       case e: NotSerializableException =>
-        throw new IllegalArgumentException(
+        throw new NotSerializableException(
           "DStream checkpointing has been enabled but the DStreams with their functions " +
             "are not serializable\nSerialization stack:\n" +
             SerializationDebugger.find(checkpoint).map("\t- " + _).mkString("\n")
```

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 6 additions & 5 deletions
```diff
@@ -17,21 +17,21 @@
 package org.apache.spark.streaming

-import java.io.File
+import java.io.{File, NotSerializableException}
 import java.util.concurrent.atomic.AtomicInteger

 import org.apache.commons.io.FileUtils
-import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
-import org.scalatest.concurrent.Timeouts
 import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._
+import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}

-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}


 class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging {
@@ -143,9 +143,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     }

     // Test whether start() fails early when checkpointing is enabled
-    intercept[IllegalArgumentException] {
+    val exception = intercept[NotSerializableException] {
       ssc.start()
     }
+    assert(exception.getMessage().contains("DStreams with their functions are not serializable"))
     assert(ssc.getState() !== StreamingContextState.ACTIVE)
     assert(StreamingContext.getActive().isEmpty)
   }
```

0 commit comments

Comments (0)