Skip to content

Commit 6d29b7a

Browse files
committed
Fix example
1 parent 6f5c694 commit 6d29b7a

File tree

3 files changed

+10
-13
lines changed

3 files changed

+10
-13
lines changed

examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,19 +90,19 @@ public Tuple2<String, Integer> call(String s) {
9090
});
9191

9292
// Update the cumulative count function
93-
final Function3<String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>> mappingFunc =
94-
new Function3<String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>>() {
93+
final Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
94+
new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
9595

9696
@Override
97-
public Optional<Tuple2<String, Integer>> call(String word, Optional<Integer> one, State<Integer> state) {
97+
public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
9898
int sum = one.or(0) + (state.exists() ? state.get() : 0);
9999
Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
100100
state.update(sum);
101-
return Optional.of(output);
101+
return output;
102102
}
103103
};
104104

105-
// This will give a Dstream made of state (which is the cumulative count of the words)
105+
* // A DStream made of cumulative counts that get updated in every batch
106106
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
107107
wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
108108

streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ import org.apache.spark.{HashPartitioner, Partitioner}
6161
* };
6262
*
6363
* JavaMapWithStateDStream<String, Integer, Integer, String> mapWithStateDStream =
64-
* keyValueDStream.<Integer, String>mapWithState(
65-
* StateSpec.function(mappingFunction).numPartitions(10));
64+
* keyValueDStream.mapWithState(StateSpec.function(mappingFunc));
6665
* }}}
6766
*
6867
* @tparam KeyType Class of the state key
@@ -137,9 +136,8 @@ sealed abstract class StateSpec[KeyType, ValueType, StateType, MappedType] exten
137136
* };
138137
*
139138
* JavaMapWithStateDStream<String, Integer, Integer, String> mapWithStateDStream =
140-
* keyValueDStream.<Integer, String>mapWithState(
141-
* StateSpec.function(mappingFunction).numPartitions(10));
142-
* }}}
139+
* keyValueDStream.mapWithState(StateSpec.function(mappingFunc));
140+
* }}}
143141
*/
144142
@Experimental
145143
object StateSpec {

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -449,9 +449,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
449449
* };
450450
*
451451
* JavaMapWithStateDStream<String, Integer, Integer, String> mapWithStateDStream =
452-
* keyValueDStream.<Integer, String>mapWithState(
453-
* StateSpec.function(mappingFunction).numPartitions(10));
454-
* }}}
452+
* keyValueDStream.mapWithState(StateSpec.function(mappingFunc));
453+
* }}}
455454
*
456455
* @param spec Specification of this transformation
457456
* @tparam StateType Class type of the state data

0 commit comments

Comments
 (0)