Skip to content

Commit 13b8cb4

Browse files
committed
Updated examples
1 parent 7c37c2c commit 13b8cb4

File tree

2 files changed

+7
-7
lines changed

2 files changed

+7
-7
lines changed

examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public static void main(String[] args) {
6565
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
6666
ssc.checkpoint(".");
6767

68-
// Initial RDD input to trackStateByKey
68+
// Initial state RDD input to mapWithState
6969
@SuppressWarnings("unchecked")
7070
List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<String, Integer>("hello", 1),
7171
new Tuple2<String, Integer>("world", 1));
@@ -90,7 +90,7 @@ public Tuple2<String, Integer> call(String s) {
9090
});
9191

9292
// Update the cumulative count function
93-
final Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>> trackStateFunc =
93+
final Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>> mappingFunc =
9494
new Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>>() {
9595

9696
@Override
@@ -104,7 +104,7 @@ public Optional<Tuple2<String, Integer>> call(Time time, String word, Optional<I
104104

105105
// This will give a Dstream made of state (which is the cumulative count of the words)
106106
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
107-
wordsDstream.mapWithState(StateSpec.function(trackStateFunc).initialState(initialRDD));
107+
wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
108108

109109
stateDstream.print();
110110
ssc.start();

examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ object StatefulNetworkWordCount {
4949
val ssc = new StreamingContext(sparkConf, Seconds(1))
5050
ssc.checkpoint(".")
5151

52-
// Initial RDD input to trackStateByKey
52+
// Initial state RDD for mapWithState operation
5353
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
5454

5555
// Create a ReceiverInputDStream on target ip:port and count the
@@ -58,17 +58,17 @@ object StatefulNetworkWordCount {
5858
val words = lines.flatMap(_.split(" "))
5959
val wordDstream = words.map(x => (x, 1))
6060

61-
// Update the cumulative count using updateStateByKey
61+
// Update the cumulative count using mapWithState
6262
// This will give a DStream made of state (which is the cumulative count of the words)
63-
val trackStateFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
63+
val mappingFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
6464
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
6565
val output = (word, sum)
6666
state.update(sum)
6767
Some(output)
6868
}
6969

7070
val stateDstream = wordDstream.mapWithState(
71-
StateSpec.function(trackStateFunc).initialState(initialRDD))
71+
StateSpec.function(mappingFunc).initialState(initialRDD))
7272
stateDstream.print()
7373
ssc.start()
7474
ssc.awaitTermination()

0 commit comments

Comments
 (0)