Skip to content

Commit 4e9f778

Browse files
committed
Changed signature
1 parent 13b8cb4 commit 4e9f778

File tree

9 files changed

+81
-63
lines changed

9 files changed

+81
-63
lines changed

examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,11 @@ public Tuple2<String, Integer> call(String s) {
9090
});
9191

9292
// Update the cumulative count function
93-
final Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>> mappingFunc =
94-
new Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>>() {
93+
final Function3<String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>> mappingFunc =
94+
new Function3<String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>>() {
9595

9696
@Override
97-
public Optional<Tuple2<String, Integer>> call(Time time, String word, Optional<Integer> one, State<Integer> state) {
97+
public Optional<Tuple2<String, Integer>> call(String word, Optional<Integer> one, State<Integer> state) {
9898
int sum = one.or(0) + (state.exists() ? state.get() : 0);
9999
Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
100100
state.update(sum);

examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,11 @@ object StatefulNetworkWordCount {
6060

6161
// Update the cumulative count using mapWithState
6262
// This will give a DStream made of state (which is the cumulative count of the words)
63-
val mappingFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
63+
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
6464
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
6565
val output = (word, sum)
6666
state.update(sum)
67-
Some(output)
67+
output
6868
}
6969

7070
val stateDstream = wordDstream.mapWithState(

streaming/src/main/scala/org/apache/spark/streaming/State.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ import org.apache.spark.annotation.Experimental
2929
*
3030
* Scala example of using `State`:
3131
* {{{
32-
* // A mapping function that maintains an integer state and return a String
33-
* def mappingFunction(data: Option[Int], state: State[Int]): Option[String] = {
32+
* // A mapping function that maintains an integer state and returns a String
33+
* def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
3434
* // Check if state exists
3535
* if (state.exists) {
3636
* val existingState = state.get // Get the existing state
@@ -52,12 +52,12 @@ import org.apache.spark.annotation.Experimental
5252
*
5353
* Java example of using `State`:
5454
* {{{
55-
* // A mapping function that maintains an integer state and return a String
56-
* Function2<Optional<Integer>, State<Integer>, Optional<String>> mappingFunction =
57-
* new Function2<Optional<Integer>, State<Integer>, Optional<String>>() {
55+
* // A mapping function that maintains an integer state and returns a String
56+
* Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
57+
* new Function3<String, Optional<Integer>, State<Integer>, String>() {
5858
*
5959
* @Override
60-
* public Optional<String> call(Optional<Integer> one, State<Integer> state) {
60+
* public String call(String key, Optional<Integer> value, State<Integer> state) {
6161
* if (state.exists()) {
6262
* int existingState = state.get(); // Get the existing state
6363
* boolean shouldRemove = ...; // Decide whether to remove the state

streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.streaming
2020
import com.google.common.base.Optional
2121
import org.apache.spark.annotation.Experimental
2222
import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
23-
import org.apache.spark.api.java.function.{Function2 => JFunction2, Function4 => JFunction4}
23+
import org.apache.spark.api.java.function.{Function3 => JFunction3, Function4 => JFunction4}
2424
import org.apache.spark.rdd.RDD
2525
import org.apache.spark.util.ClosureCleaner
2626
import org.apache.spark.{HashPartitioner, Partitioner}
@@ -37,8 +37,10 @@ import org.apache.spark.{HashPartitioner, Partitioner}
3737
*
3838
* Example in Scala:
3939
* {{{
40-
* def mappingFunction(data: Option[ValueType], wrappedState: State[StateType]): MappedType = {
41-
* ...
40+
* // A mapping function that maintains an integer state and returns a String
41+
* def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
42+
* // Use state.exists(), state.get(), state.update() and state.remove()
43+
* // to manage state, and return the necessary string
4244
* }
4345
*
4446
* val spec = StateSpec.function(mappingFunction).numPartitions(10)
@@ -48,12 +50,19 @@ import org.apache.spark.{HashPartitioner, Partitioner}
4850
*
4951
* Example in Java:
5052
* {{{
51-
* StateSpec<KeyType, ValueType, StateType, MappedType> spec =
52-
* StateSpec.<KeyType, ValueType, StateType, MappedType>function(mappingFunction)
53-
* .numPartition(10);
53+
* // A mapping function that maintains an integer state and returns a string
54+
* Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
55+
* new Function3<String, Optional<Integer>, State<Integer>, String>() {
56+
* @Override
57+
* public String call(String key, Optional<Integer> value, State<Integer> state) {
58+
* // Use state.exists(), state.get(), state.update() and state.remove()
59+
* // to manage state, and return the necessary string
60+
* }
61+
* };
5462
*
55-
* JavaMapWithStateDStream<KeyType, ValueType, StateType, MappedType> mapWithStateDStream =
56-
* javaPairDStream.<StateType, MappedType>mapWithState(spec);
63+
* JavaMapWithStateDStream<String, Integer, Integer, String> mapWithStateDStream =
64+
* keyValueDStream.<Integer, String>mapWithState(
65+
* StateSpec.function(mappingFunction).numPartitions(10));
5766
* }}}
5867
*
5968
* @tparam KeyType Class of the state key
@@ -84,8 +93,8 @@ sealed abstract class StateSpec[KeyType, ValueType, StateType, MappedType] exten
8493

8594
/**
8695
* Set the duration after which the state of an idle key will be removed. A key and its state are
87-
* considered idle if it has not received any data for at least the given duration. The state
88-
* tracking function will be called one final time on the idle states that are going to be
96+
* considered idle if it has not received any data for at least the given duration. The
97+
* mapping function will be called one final time on the idle states that are going to be
8998
* removed; [[org.apache.spark.streaming.State State.isTimingOut()]] set
9099
* to `true` in that call.
91100
*/
@@ -104,8 +113,10 @@ sealed abstract class StateSpec[KeyType, ValueType, StateType, MappedType] exten
104113
*
105114
* Example in Scala:
106115
* {{{
107-
* def mappingFunction(data: Option[ValueType], wrappedState: State[StateType]): MappedType = {
108-
* ...
116+
* // A mapping function that maintains an integer state and returns a String
117+
* def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
118+
* // Use state.exists(), state.get(), state.update() and state.remove()
119+
* // to manage state, and return the necessary string
109120
* }
110121
*
111122
* val spec = StateSpec.function(mappingFunction).numPartitions(10)
@@ -115,12 +126,19 @@ sealed abstract class StateSpec[KeyType, ValueType, StateType, MappedType] exten
115126
*
116127
* Example in Java:
117128
* {{{
118-
* StateSpec<KeyType, ValueType, StateType, MappedType> spec =
119-
* StateSpec.<KeyType, ValueType, StateType, MappedType>function(mappingFunction)
120-
* .numPartition(10);
129+
* // A mapping function that maintains an integer state and returns a string
130+
* Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
131+
* new Function3<String, Optional<Integer>, State<Integer>, String>() {
132+
* @Override
133+
* public String call(String key, Optional<Integer> value, State<Integer> state) {
134+
* // Use state.exists(), state.get(), state.update() and state.remove()
135+
* // to manage state, and return the necessary string
136+
* }
137+
* };
121138
*
122-
* JavaMapWithStateDStream<KeyType, ValueType, StateType, MappedType> mapWithStateDStream =
123-
* javaPairDStream.<StateType, MappedType>mapWithState(spec);
139+
* JavaMapWithStateDStream<String, Integer, Integer, String> mapWithStateDStream =
140+
* keyValueDStream.<Integer, String>mapWithState(
141+
* StateSpec.function(mappingFunction).numPartitions(10));
124142
* }}}
125143
*/
126144
@Experimental
@@ -156,12 +174,12 @@ object StateSpec {
156174
* @tparam MappedType Class of the mapped data
157175
*/
158176
def function[KeyType, ValueType, StateType, MappedType](
159-
mappingFunction: (Option[ValueType], State[StateType]) => MappedType
177+
mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
160178
): StateSpec[KeyType, ValueType, StateType, MappedType] = {
161179
ClosureCleaner.clean(mappingFunction, checkSerializable = true)
162180
val wrappedFunction =
163-
(time: Time, key: Any, value: Option[ValueType], state: State[StateType]) => {
164-
Some(mappingFunction(value, state))
181+
(time: Time, key: KeyType, value: Option[ValueType], state: State[StateType]) => {
182+
Some(mappingFunction(key, value, state))
165183
}
166184
new StateSpecImpl(wrappedFunction)
167185
}
@@ -181,11 +199,11 @@ object StateSpec {
181199
def function[KeyType, ValueType, StateType, MappedType](mappingFunction:
182200
JFunction4[Time, KeyType, Optional[ValueType], State[StateType], Optional[MappedType]]):
183201
StateSpec[KeyType, ValueType, StateType, MappedType] = {
184-
val trackingFunc = (time: Time, k: KeyType, v: Option[ValueType], s: State[StateType]) => {
202+
val wrappedFunc = (time: Time, k: KeyType, v: Option[ValueType], s: State[StateType]) => {
185203
val t = mappingFunction.call(time, k, JavaUtils.optionToOptional(v), s)
186204
Option(t.orNull)
187205
}
188-
StateSpec.function(trackingFunc)
206+
StateSpec.function(wrappedFunc)
189207
}
190208

191209
/**
@@ -200,12 +218,12 @@ object StateSpec {
200218
* @tparam MappedType Class of the mapped data
201219
*/
202220
def function[KeyType, ValueType, StateType, MappedType](
203-
mappingFunction: JFunction2[Optional[ValueType], State[StateType], MappedType]):
221+
mappingFunction: JFunction3[KeyType, Optional[ValueType], State[StateType], MappedType]):
204222
StateSpec[KeyType, ValueType, StateType, MappedType] = {
205-
val trackingFunc = (v: Option[ValueType], s: State[StateType]) => {
206-
mappingFunction.call(Optional.fromNullable(v.get), s)
223+
// Adapts the Java Function3 to the Scala (key, value, state) mapping function.
val wrappedFunc = (k: KeyType, v: Option[ValueType], s: State[StateType]) => {
  // Convert the Scala Option to a Guava Optional without calling Option.get,
  // which throws NoSuchElementException when the value is None
  // (presumably the case when the function is invoked for a timing-out key -- confirm).
  // A present-but-null value still maps to absent, as Optional.fromNullable did before.
  val javaValue: Optional[ValueType] = v match {
    case Some(value) => Optional.fromNullable(value)
    case None => Optional.absent[ValueType]()
  }
  mappingFunction.call(k, javaValue, s)
}
208-
StateSpec.function(trackingFunc)
226+
StateSpec.function(wrappedFunc)
209227
}
210228
}
211229

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -439,8 +439,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
439439
* Example of using `mapWithState`:
440440
* {{{
441441
* // A mapping function that maintains an integer state and returns a string
442-
* Function2<Optional<Integer>, State<Integer>, Optional<String>> mappingFunction =
443-
* new Function2<Optional<Integer>, State<Integer>, Optional<String>>() {
442+
* Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
443+
* new Function3<String, Optional<Integer>, State<Integer>, String>() {
444444
* @Override
445445
* public String call(String key, Optional<Integer> value, State<Integer> state) {
446446
* // Use state.exists(), state.get(), state.update() and state.remove()
@@ -455,7 +455,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
455455
*
456456
* @param spec Specification of this transformation
457457
* @tparam StateType Class type of the state data
458-
* @tparam MappedType Class type of mapped data
458+
* @tparam MappedType Class type of the mapped data
459459
*/
460460
@Experimental
461461
def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]):

streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ import org.apache.spark.streaming.dstream.InternalMapWithStateDStream._
3434
* Additionally, it also gives access to the stream of state snapshots, that is, the state data of
3535
* all keys after a batch has updated them.
3636
*
37-
* @tparam KeyType Class of the state key
38-
* @tparam ValueType Class of the state value
37+
* @tparam KeyType Class of the key
38+
* @tparam ValueType Class of the value
3939
* @tparam StateType Class of the state data
4040
* @tparam MappedType Class of the mapped data
4141
*/

streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -360,20 +360,20 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
360360
*
361361
* Example of using `mapWithState`:
362362
* {{{
363-
* // A mapping function that maintains an integer state and return a string
364-
* def mappingFunction(data: Option[Int], wrappedState: State[Int]): String = {
365-
* // Use state.exists(), state.get(), state.update() and state.remove()
366-
* // to manage state, and return the necessary string
363+
* // A mapping function that maintains an integer state and returns a String
364+
* def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
365+
* // Use state.exists(), state.get(), state.update() and state.remove()
366+
* // to manage state, and return the necessary string
367367
* }
368368
*
369369
* val spec = StateSpec.function(mappingFunction).numPartitions(10)
370370
*
371-
* val mapWithStateDStream = keyValueDStream.mapWithState[Int, String](spec)
371+
* val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec)
372372
* }}}
373373
*
374374
* @param spec Specification of this transformation
375-
* @tparam StateType Class type of the state
376-
* @tparam MappedType Class type of the tranformed data return by the tracking function
375+
* @tparam StateType Class type of the state data
376+
* @tparam MappedType Class type of the mapped data
377377
*/
378378
@Experimental
379379
def mapWithState[StateType: ClassTag, MappedType: ClassTag](

streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737

3838
import org.apache.spark.HashPartitioner;
3939
import org.apache.spark.api.java.JavaPairRDD;
40-
import org.apache.spark.api.java.function.Function2;
40+
import org.apache.spark.api.java.function.Function3;
4141
import org.apache.spark.api.java.function.Function4;
4242
import org.apache.spark.streaming.api.java.JavaPairDStream;
4343
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
@@ -78,11 +78,11 @@ public Optional<Double> call(
7878

7979
JavaPairDStream<String, Boolean> stateSnapshots = stateDstream.stateSnapshots();
8080

81-
final Function2<Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
82-
new Function2<Optional<Integer>, State<Boolean>, Double>() {
81+
final Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
82+
new Function3<String, Optional<Integer>, State<Boolean>, Double>() {
8383

8484
@Override
85-
public Double call(Optional<Integer> one, State<Boolean> state) {
85+
public Double call(String key, Optional<Integer> one, State<Boolean> state) {
8686
// Use all State's methods here
8787
state.exists();
8888
state.get();
@@ -148,11 +148,11 @@ public void testBasicFunction() {
148148
new Tuple2<String, Integer>("c", 1))
149149
);
150150

151-
Function2<Optional<Integer>, State<Integer>, Integer> mappingFunc =
152-
new Function2<Optional<Integer>, State<Integer>, Integer>() {
151+
Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc =
152+
new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
153153

154154
@Override
155-
public Integer call(Optional<Integer> value, State<Integer> state) throws Exception {
155+
public Integer call(String key, Optional<Integer> value, State<Integer> state) throws Exception {
156156
int sum = value.or(0) + (state.exists() ? state.get() : 0);
157157
state.update(sum);
158158
return sum;
@@ -172,7 +172,7 @@ private <K, S, T> void testOperation(
172172
List<Set<Tuple2<K, S>>> expectedStateSnapshots) {
173173
int numBatches = expectedOutputs.size();
174174
JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2);
175-
JavaMapWithStateDStream<K, Integer, S, T> trackeStateStream =
175+
JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream =
176176
JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() {
177177
@Override
178178
public Tuple2<K, Integer> call(K x) throws Exception {
@@ -182,7 +182,7 @@ public Tuple2<K, Integer> call(K x) throws Exception {
182182

183183
final List<Set<T>> collectedOutputs =
184184
Collections.synchronizedList(Lists.<Set<T>>newArrayList());
185-
trackeStateStream.foreachRDD(new Function<JavaRDD<T>, Void>() {
185+
mapWithStateDStream.foreachRDD(new Function<JavaRDD<T>, Void>() {
186186
@Override
187187
public Void call(JavaRDD<T> rdd) throws Exception {
188188
collectedOutputs.add(Sets.newHashSet(rdd.collect()));
@@ -191,7 +191,7 @@ public Void call(JavaRDD<T> rdd) throws Exception {
191191
});
192192
final List<Set<Tuple2<K, S>>> collectedStateSnapshots =
193193
Collections.synchronizedList(Lists.<Set<Tuple2<K, S>>>newArrayList());
194-
trackeStateStream.stateSnapshots().foreachRDD(new Function<JavaPairRDD<K, S>, Void>() {
194+
mapWithStateDStream.stateSnapshots().foreachRDD(new Function<JavaPairRDD<K, S>, Void>() {
195195
@Override
196196
public Void call(JavaPairRDD<K, S> rdd) throws Exception {
197197
collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()));

streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ class MapWithStateSuite extends SparkFunSuite
164164
)
165165

166166
// state maintains running count, and updated count is returned
167-
val mappingFunc = (value: Option[Int], state: State[Int]) => {
167+
val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
168168
val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
169169
state.update(sum)
170170
sum
@@ -221,7 +221,7 @@ class MapWithStateSuite extends SparkFunSuite
221221
test("mapWithState - type inferencing and class tags") {
222222

223223
// Simple mapping function with value as Int, state as Double and mapped type as Long
224-
val simpleFunc = (value: Option[Int], state: State[Double]) => {
224+
val simpleFunc = (key: String, value: Option[Int], state: State[Double]) => {
225225
0L
226226
}
227227

@@ -451,7 +451,7 @@ class MapWithStateSuite extends SparkFunSuite
451451

452452
try {
453453
val inputStream = new TestInputStream(ssc, Seq.empty[Seq[Int]], 2).map(_ -> 1)
454-
val dummyFunc = (value: Option[Int], state: State[Int]) => 0
454+
val dummyFunc = (key: Int, value: Option[Int], state: State[Int]) => 0
455455
val mapWithStateStream = inputStream.mapWithState(StateSpec.function(dummyFunc))
456456
val internalmapWithStateStream = mapWithStateStream invokePrivate privateMethod()
457457

@@ -505,7 +505,7 @@ class MapWithStateSuite extends SparkFunSuite
505505

506506
val checkpointDuration = batchDuration * (stateData.size / 2)
507507

508-
val runningCount = (value: Option[Int], state: State[Int]) => {
508+
val runningCount = (key: String, value: Option[Int], state: State[Int]) => {
509509
state.update(state.getOption().getOrElse(0) + value.getOrElse(0))
510510
state.get()
511511
}

0 commit comments

Comments
 (0)