Skip to content

Commit 08a6957

Browse files
srowen authored and tdas committed
SPARK-2548 [STREAMING] JavaRecoverableWordCount is missing
Here's my attempt to re-port `RecoverableNetworkWordCount` to Java, following the example of its Scala and Java siblings. I fixed a few minor doc/formatting issues along the way I believe. Author: Sean Owen <[email protected]> Closes #2564 from srowen/SPARK-2548 and squashes the following commits: 0d0bf29 [Sean Owen] Update checkpoint call as in apache/spark#2735 35f23e3 [Sean Owen] Remove old comment about running in standalone mode 179b3c2 [Sean Owen] Re-port RecoverableNetworkWordCount to Java example, and touch up doc / formatting in related examples
1 parent b758358 commit 08a6957

File tree

3 files changed

+159
-17
lines changed

3 files changed

+159
-17
lines changed

src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.spark.api.java.function.Function2;
2626
import org.apache.spark.api.java.function.PairFunction;
2727
import org.apache.spark.api.java.StorageLevels;
28-
import org.apache.spark.streaming.Duration;
28+
import org.apache.spark.streaming.Durations;
2929
import org.apache.spark.streaming.api.java.JavaDStream;
3030
import org.apache.spark.streaming.api.java.JavaPairDStream;
3131
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
@@ -35,8 +35,9 @@
3535

3636
/**
3737
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
38+
*
3839
* Usage: JavaNetworkWordCount <hostname> <port>
39-
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
40+
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
4041
*
4142
* To run this on your local machine, you need to first run a Netcat server
4243
* `$ nc -lk 9999`
@@ -56,7 +57,7 @@ public static void main(String[] args) {
5657

5758
// Create the context with a 1 second batch size
5859
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
59-
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
60+
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
6061

6162
// Create a JavaReceiverInputDStream on target ip:port and count the
6263
// words in input stream of \n delimited text (eg. generated by 'nc')
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.examples.streaming;
19+
20+
import java.io.File;
21+
import java.io.IOException;
22+
import java.nio.charset.Charset;
23+
import java.util.Arrays;
24+
import java.util.regex.Pattern;
25+
26+
import scala.Tuple2;
27+
import com.google.common.collect.Lists;
28+
import com.google.common.io.Files;
29+
30+
import org.apache.spark.SparkConf;
31+
import org.apache.spark.api.java.JavaPairRDD;
32+
import org.apache.spark.api.java.function.FlatMapFunction;
33+
import org.apache.spark.api.java.function.Function2;
34+
import org.apache.spark.api.java.function.PairFunction;
35+
import org.apache.spark.streaming.Durations;
36+
import org.apache.spark.streaming.Time;
37+
import org.apache.spark.streaming.api.java.JavaDStream;
38+
import org.apache.spark.streaming.api.java.JavaPairDStream;
39+
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
40+
import org.apache.spark.streaming.api.java.JavaStreamingContext;
41+
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
42+
43+
/**
44+
* Counts words in text encoded with UTF8 received from the network every second.
45+
*
46+
* Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
47+
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
48+
* data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
49+
* <output-file> file to which the word counts will be appended
50+
*
51+
* <checkpoint-directory> and <output-file> must be absolute paths
52+
*
53+
* To run this on your local machine, you need to first run a Netcat server
54+
*
55+
* `$ nc -lk 9999`
56+
*
57+
* and run the example as
58+
*
59+
* `$ ./bin/run-example org.apache.spark.examples.streaming.JavaRecoverableNetworkWordCount \
60+
* localhost 9999 ~/checkpoint/ ~/out`
61+
*
62+
* If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
63+
* a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
64+
* checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
65+
* the checkpoint data.
66+
*
67+
* Refer to the online documentation for more details.
68+
*/
69+
public final class JavaRecoverableNetworkWordCount {
70+
private static final Pattern SPACE = Pattern.compile(" ");
71+
72+
private static JavaStreamingContext createContext(String ip,
73+
int port,
74+
String checkpointDirectory,
75+
String outputPath) {
76+
77+
// If you do not see this printed, that means the StreamingContext has been loaded
78+
// from the new checkpoint
79+
System.out.println("Creating new context");
80+
final File outputFile = new File(outputPath);
81+
if (outputFile.exists()) {
82+
outputFile.delete();
83+
}
84+
SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
85+
// Create the context with a 1 second batch size
86+
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
87+
ssc.checkpoint(checkpointDirectory);
88+
89+
// Create a socket stream on target ip:port and count the
90+
// words in input stream of \n delimited text (eg. generated by 'nc')
91+
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
92+
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
93+
@Override
94+
public Iterable<String> call(String x) {
95+
return Lists.newArrayList(SPACE.split(x));
96+
}
97+
});
98+
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
99+
new PairFunction<String, String, Integer>() {
100+
@Override
101+
public Tuple2<String, Integer> call(String s) {
102+
return new Tuple2<String, Integer>(s, 1);
103+
}
104+
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
105+
@Override
106+
public Integer call(Integer i1, Integer i2) {
107+
return i1 + i2;
108+
}
109+
});
110+
111+
wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
112+
@Override
113+
public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
114+
String counts = "Counts at time " + time + " " + rdd.collect();
115+
System.out.println(counts);
116+
System.out.println("Appending to " + outputFile.getAbsolutePath());
117+
Files.append(counts + "\n", outputFile, Charset.defaultCharset());
118+
return null;
119+
}
120+
});
121+
122+
return ssc;
123+
}
124+
125+
public static void main(String[] args) {
126+
if (args.length != 4) {
127+
System.err.println("You arguments were " + Arrays.asList(args));
128+
System.err.println(
129+
"Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>\n" +
130+
" <output-file>. <hostname> and <port> describe the TCP server that Spark\n" +
131+
" Streaming would connect to receive data. <checkpoint-directory> directory to\n" +
132+
" HDFS-compatible file system which checkpoint data <output-file> file to which\n" +
133+
" the word counts will be appended\n" +
134+
"\n" +
135+
"In local mode, <master> should be 'local[n]' with n > 1\n" +
136+
"Both <checkpoint-directory> and <output-file> must be absolute paths");
137+
System.exit(1);
138+
}
139+
140+
final String ip = args[0];
141+
final int port = Integer.parseInt(args[1]);
142+
final String checkpointDirectory = args[2];
143+
final String outputPath = args[3];
144+
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
145+
@Override
146+
public JavaStreamingContext create() {
147+
return createContext(ip, port, checkpointDirectory, outputPath);
148+
}
149+
};
150+
JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
151+
ssc.start();
152+
ssc.awaitTermination();
153+
}
154+
}

src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,13 @@ import org.apache.spark.util.IntParam
3131
/**
3232
* Counts words in text encoded with UTF8 received from the network every second.
3333
*
34-
* Usage: NetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
34+
* Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
3535
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
3636
* data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
3737
* <output-file> file to which the word counts will be appended
3838
*
39-
* In local mode, <master> should be 'local[n]' with n > 1
4039
* <checkpoint-directory> and <output-file> must be absolute paths
4140
*
42-
*
4341
* To run this on your local machine, you need to first run a Netcat server
4442
*
4543
* `$ nc -lk 9999`
@@ -54,19 +52,8 @@ import org.apache.spark.util.IntParam
5452
* checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
5553
* the checkpoint data.
5654
*
57-
* To run this example in a local standalone cluster with automatic driver recovery,
58-
*
59-
* `$ bin/spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
60-
* <path-to-examples-jar> \
61-
* org.apache.spark.examples.streaming.RecoverableNetworkWordCount <cluster-url> \
62-
* localhost 9999 ~/checkpoint ~/out`
63-
*
64-
* <path-to-examples-jar> would typically be
65-
* <spark-dir>/examples/target/scala-XX/spark-examples....jar
66-
*
6755
* Refer to the online documentation for more details.
6856
*/
69-
7057
object RecoverableNetworkWordCount {
7158

7259
def createContext(ip: String, port: Int, outputPath: String) = {

0 commit comments

Comments
 (0)