
Commit a502e4b

[SPARK-7767] [STREAMING] Added test for checkpoint serialization in StreamingContext.start()
Currently, the background checkpointing thread fails silently if the checkpoint is not serializable. That failure is hard to debug, so it is better to fail fast at `start()` when checkpointing is enabled and the checkpoint is not serializable.

Author: Tathagata Das <[email protected]>

Closes #6292 from tdas/SPARK-7767 and squashes the following commits:

51304e6 [Tathagata Das] Addressed comments.
c35237b [Tathagata Das] Added test for checkpoint serialization in StreamingContext.start()

(cherry picked from commit 3c434cb)
Signed-off-by: Tathagata Das <[email protected]>
1 parent 23356dd commit a502e4b
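
As a rough illustration of the behavior this commit introduces (a minimal sketch, not part of the commit, mirroring the new test added below): a streaming application whose DStream closure captures a non-serializable object while checkpointing is enabled. After this change, `start()` should throw a `NotSerializableException` carrying a serialization stack, instead of leaving the background checkpoint writer to fail silently later. The object name, helper class, host/port, and checkpoint path are all illustrative only.

import java.io.NotSerializableException

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NonSerializableCheckpointSketch {

  // Hypothetical helper that is deliberately not java.io.Serializable.
  class Unserializable {
    val state = new Object
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-sketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/checkpoint-sketch")  // checkpointing enabled; path is illustrative

    val helper = new Unserializable
    ssc.socketTextStream("localhost", 9999).foreachRDD { rdd =>
      // The closure captures `helper`, so the DStream graph (and hence the
      // checkpoint) is no longer serializable.
      println(helper.state.hashCode() + rdd.count())
    }

    try {
      // With this commit, the serialization check runs here and fails fast.
      ssc.start()
    } catch {
      case e: NotSerializableException => println(e.getMessage)
    } finally {
      ssc.stop()
    }
  }
}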

4 files changed: 89 additions, 36 deletions
core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
Lines changed: 1 addition & 1 deletion

@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.Logging
 
-private[serializer] object SerializationDebugger extends Logging {
+private[spark] object SerializationDebugger extends Logging {
 
   /**
    * Improve the given NotSerializableException with the serialization path leading from the given

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
Lines changed: 42 additions & 28 deletions

@@ -102,6 +102,44 @@ object Checkpoint extends Logging {
         Seq.empty
     }
   }
+
+  /** Serialize the checkpoint, or throw any exception that occurs */
+  def serialize(checkpoint: Checkpoint, conf: SparkConf): Array[Byte] = {
+    val compressionCodec = CompressionCodec.createCodec(conf)
+    val bos = new ByteArrayOutputStream()
+    val zos = compressionCodec.compressedOutputStream(bos)
+    val oos = new ObjectOutputStream(zos)
+    Utils.tryWithSafeFinally {
+      oos.writeObject(checkpoint)
+    } {
+      oos.close()
+    }
+    bos.toByteArray
+  }
+
+  /** Deserialize a checkpoint from the input stream, or throw any exception that occurs */
+  def deserialize(inputStream: InputStream, conf: SparkConf): Checkpoint = {
+    val compressionCodec = CompressionCodec.createCodec(conf)
+    var ois: ObjectInputStreamWithLoader = null
+    Utils.tryWithSafeFinally {
+
+      // ObjectInputStream uses the last defined user-defined class loader in the stack
+      // to find classes, which maybe the wrong class loader. Hence, a inherited version
+      // of ObjectInputStream is used to explicitly use the current thread's default class
+      // loader to find and load classes. This is a well know Java issue and has popped up
+      // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
+      val zis = compressionCodec.compressedInputStream(inputStream)
+      ois = new ObjectInputStreamWithLoader(zis,
+        Thread.currentThread().getContextClassLoader)
+      val cp = ois.readObject.asInstanceOf[Checkpoint]
+      cp.validate()
+      cp
+    } {
+      if (ois != null) {
+        ois.close()
+      }
+    }
+  }
 }
 
 
@@ -189,17 +227,10 @@ class CheckpointWriter(
   }
 
   def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) {
-    val bos = new ByteArrayOutputStream()
-    val zos = compressionCodec.compressedOutputStream(bos)
-    val oos = new ObjectOutputStream(zos)
-    Utils.tryWithSafeFinally {
-      oos.writeObject(checkpoint)
-    } {
-      oos.close()
-    }
     try {
+      val bytes = Checkpoint.serialize(checkpoint, conf)
      executor.execute(new CheckpointWriteHandler(
-        checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater))
+        checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
      logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
    } catch {
      case rej: RejectedExecutionException =>
@@ -264,25 +295,8 @@ object CheckpointReader extends Logging {
     checkpointFiles.foreach(file => {
       logInfo("Attempting to load checkpoint from file " + file)
       try {
-        var ois: ObjectInputStreamWithLoader = null
-        var cp: Checkpoint = null
-        Utils.tryWithSafeFinally {
-          val fis = fs.open(file)
-          // ObjectInputStream uses the last defined user-defined class loader in the stack
-          // to find classes, which maybe the wrong class loader. Hence, a inherited version
-          // of ObjectInputStream is used to explicitly use the current thread's default class
-          // loader to find and load classes. This is a well know Java issue and has popped up
-          // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
-          val zis = compressionCodec.compressedInputStream(fis)
-          ois = new ObjectInputStreamWithLoader(zis,
-            Thread.currentThread().getContextClassLoader)
-          cp = ois.readObject.asInstanceOf[Checkpoint]
-        } {
-          if (ois != null) {
-            ois.close()
-          }
-        }
-        cp.validate()
+        val fis = fs.open(file)
+        val cp = Checkpoint.deserialize(fis, conf)
         logInfo("Checkpoint successfully loaded from file " + file)
         logInfo("Checkpoint was generated at time " + cp.checkpointTime)
         return Some(cp)

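The `serialize`/`deserialize` helpers factored out above follow a standard compressed Java-serialization round trip. Below is a self-contained sketch of that pattern only, not the Spark code: GZIP streams stand in for Spark's `CompressionCodec`, and a plain `ObjectInputStream` stands in for `ObjectInputStreamWithLoader`; the object and method names are illustrative.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream,
  ObjectInputStream, ObjectOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object RoundTripSketch {

  // Write the object through a compressed ObjectOutputStream; closing in a
  // finally block mirrors the Utils.tryWithSafeFinally usage in the Spark version.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(new GZIPOutputStream(bos))
    try oos.writeObject(obj) finally oos.close()
    bos.toByteArray
  }

  // Read the object back through a compressed ObjectInputStream.
  def deserialize[T](in: InputStream): T = {
    val ois = new ObjectInputStream(new GZIPInputStream(in))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }

  def main(args: Array[String]): Unit = {
    val bytes = serialize("hello checkpoint")
    val back = deserialize[String](new ByteArrayInputStream(bytes))
    assert(back == "hello checkpoint")
    println(back)
  }
}
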
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
Lines changed: 23 additions & 3 deletions

@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import java.io.InputStream
+import java.io.{InputStream, NotSerializableException}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 
 import scala.collection.Map
@@ -35,6 +35,7 @@ import org.apache.spark._
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.input.FixedLengthBinaryInputFormat
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
+import org.apache.spark.serializer.SerializationDebugger
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
@@ -235,6 +236,10 @@ class StreamingContext private[streaming] (
     }
   }
 
+  private[streaming] def isCheckpointingEnabled: Boolean = {
+    checkpointDir != null
+  }
+
   private[streaming] def initialCheckpoint: Checkpoint = {
     if (isCheckpointPresent) cp_ else null
   }
@@ -523,11 +528,26 @@
     assert(graph != null, "Graph is null")
     graph.validate()
 
-    assert(
-      checkpointDir == null || checkpointDuration != null,
+    require(
+      !isCheckpointingEnabled || checkpointDuration != null,
       "Checkpoint directory has been set, but the graph checkpointing interval has " +
         "not been set. Please use StreamingContext.checkpoint() to set the interval."
     )
+
+    // Verify whether the DStream checkpoint is serializable
+    if (isCheckpointingEnabled) {
+      val checkpoint = new Checkpoint(this, Time.apply(0))
+      try {
+        Checkpoint.serialize(checkpoint, conf)
+      } catch {
+        case e: NotSerializableException =>
+          throw new NotSerializableException(
+            "DStream checkpointing has been enabled but the DStreams with their functions " +
+              "are not serializable\nSerialization stack:\n" +
+              SerializationDebugger.find(checkpoint).map("\t- " + _).mkString("\n")
+          )
+      }
+    }
   }
 
   /**

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
Lines changed: 23 additions & 4 deletions

@@ -17,21 +17,21 @@
 
 package org.apache.spark.streaming
 
-import java.io.File
+import java.io.{File, NotSerializableException}
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.commons.io.FileUtils
-import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
-import org.scalatest.concurrent.Timeouts
 import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._
+import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
 
 
 class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging {
@@ -132,6 +132,25 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging {
     }
   }
 
+  test("start with non-seriazable DStream checkpoints") {
+    val checkpointDir = Utils.createTempDir()
+    ssc = new StreamingContext(conf, batchDuration)
+    ssc.checkpoint(checkpointDir.getAbsolutePath)
+    addInputStream(ssc).foreachRDD { rdd =>
+      // Refer to this.appName from inside closure so that this closure refers to
+      // the instance of StreamingContextSuite, and is therefore not serializable
+      rdd.count() + appName
+    }
+
+    // Test whether start() fails early when checkpointing is enabled
+    val exception = intercept[NotSerializableException] {
+      ssc.start()
+    }
+    assert(exception.getMessage().contains("DStreams with their functions are not serializable"))
+    assert(ssc.getState() !== StreamingContextState.ACTIVE)
+    assert(StreamingContext.getActive().isEmpty)
+  }
+
   test("start multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
