Skip to content

Commit b6d8001

Browse files
committed
Update RecoverableNetworkWordCount.scala
Ok, I've added ssc.checkpoint(checkpointDirectory) to createContext. First, I wasn't sure that the checkpoin is initiated when the context is recreated from checkpoinDirector. That's why I put it outside createContext.
1 parent 96fe274 commit b6d8001

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ import org.apache.spark.util.IntParam
6969

7070
object RecoverableNetworkWordCount {
7171

72-
def createContext(ip: String, port: Int, outputPath: String) = {
72+
def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = {
7373

7474
// If you do not see this printed, that means the StreamingContext has been loaded
7575
// from the new checkpoint
@@ -79,6 +79,7 @@ object RecoverableNetworkWordCount {
7979
val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
8080
// Create the context with a 1 second batch size
8181
val ssc = new StreamingContext(sparkConf, Seconds(1))
82+
ssc.checkpoint(checkpointDirectory)
8283

8384
// Create a socket stream on target ip:port and count the
8485
// words in input stream of \n delimited text (eg. generated by 'nc')
@@ -114,9 +115,8 @@ object RecoverableNetworkWordCount {
114115
val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
115116
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
116117
() => {
117-
createContext(ip, port, outputPath)
118+
createContext(ip, port, outputPath, checkpointDirectory)
118119
})
119-
ssc.checkpoint(checkpointDirectory)
120120
ssc.start()
121121
ssc.awaitTermination()
122122
}

0 commit comments

Comments
 (0)