Skip to content

Commit 0d0bf29

Browse files
committed
Update checkpoint call as in apache#2735
1 parent 35f23e3 commit 0d0bf29

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,10 @@
6969
public final class JavaRecoverableNetworkWordCount {
7070
private static final Pattern SPACE = Pattern.compile(" ");
7171

72-
private static JavaStreamingContext createContext(String ip, int port, String outputPath) {
72+
private static JavaStreamingContext createContext(String ip,
73+
int port,
74+
String checkpointDirectory,
75+
String outputPath) {
7376

7477
// If you do not see this printed, that means the StreamingContext has been loaded
7578
// from the new checkpoint
@@ -81,6 +84,7 @@ private static JavaStreamingContext createContext(String ip, int port, String ou
8184
SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
8285
// Create the context with a 1 second batch size
8386
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
87+
ssc.checkpoint(checkpointDirectory);
8488

8589
// Create a socket stream on target ip:port and count the
8690
// words in input stream of \n delimited text (eg. generated by 'nc')
@@ -135,12 +139,12 @@ public static void main(String[] args) {
135139

136140
final String ip = args[0];
137141
final int port = Integer.parseInt(args[1]);
138-
String checkpointDirectory = args[2];
142+
final String checkpointDirectory = args[2];
139143
final String outputPath = args[3];
140144
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
141145
@Override
142146
public JavaStreamingContext create() {
143-
return createContext(ip, port, outputPath);
147+
return createContext(ip, port, checkpointDirectory, outputPath);
144148
}
145149
};
146150
JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);

0 commit comments

Comments
 (0)