
Commit 0b8b7d0

Ken Takagiwa authored and committed

reduceByKey is working

1 parent d25d5cf commit 0b8b7d0

File tree

4 files changed: +41 -11 lines changed

Binary file (1.53 KB) not shown.

python/pyspark/streaming/dstream.py (2 additions & 4 deletions)

@@ -118,11 +118,9 @@ def add_shuffle_key(split, iterator):
         keyed = PipelinedDStream(self, add_shuffle_key)
         keyed._bypass_serializer = True
         with _JavaStackTrace(self.ctx) as st:
-            #JavaDStream
-            pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream()
             partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
-                                                          id(partitionFunc))
-            jdstream = pairDStream.partitionBy(partitioner).values()
+                                                          id(partitionFunc))
+            jdstream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream()
         dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
         # This is required so that id(partitionFunc) remains unique, even if
         # partitionFunc is a lambda:
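For context, the rewritten path above is what a stateless streaming aggregation drives end to end. Below is a minimal usage sketch against the PySpark Streaming API this branch is building; the socket source, batch interval, and the pprint() output operator are assumptions for illustration, not part of this commit.

    from operator import add

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="StreamingWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval (illustrative)

    lines = ssc.socketTextStream("localhost", 9999)  # assumed input source
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(add))  # drives the PairwiseDStream shuffle path above
    counts.pprint()  # output operator; name assumed from the eventual PySpark API

    ssc.start()
    ssc.awaitTermination()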

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala (27 additions & 0 deletions)

@@ -59,3 +59,30 @@ class PythonDStream[T: ClassTag](
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
 
+
+private class PairwiseDStream(prev: DStream[Array[Byte]], partitioner: Partitioner)
+  extends DStream[Array[Byte]](prev.ssc) {
+  override def dependencies = List(prev)
+
+  override def slideDuration: Duration = prev.slideDuration
+
+  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    prev.getOrCompute(validTime) match {
+      case Some(rdd) =>
+        val pairwiseRDD = new PairwiseRDD(rdd)
+        /*
+         * This is equivalent to the following Python code:
+         * with _JavaStackTrace(self.context) as st:
+         *     pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
+         *     partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
+         *                                                   id(partitionFunc))
+         *     jrdd = pairRDD.partitionBy(partitioner).values()
+         *     rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
+         */
+        Some(pairwiseRDD.asJavaPairRDD.partitionBy(partitioner).values().rdd)
+      case None => None
+    }
+  }
+  val asJavaDStream = JavaDStream.fromDStream(this)
+  // val asJavaPairDStream: JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
+}
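The block comment inside compute spells out the per-batch Python RDD logic that PairwiseDStream reproduces on the Scala side. Restated as a small, hypothetical Python helper (the function name and parameters are illustrative; the _jvm calls mirror the comment verbatim):

    from pyspark.rdd import RDD
    from pyspark.serializers import BatchedSerializer

    def shuffle_batch(ctx, keyed_jrdd, num_partitions, partition_func, output_serializer):
        # Expose the serialized (hash, bytes) records to the JVM as key/value pairs.
        pair_rdd = ctx._jvm.PairwiseRDD(keyed_jrdd.rdd()).asJavaPairRDD()
        # Route records by the Python-side hash; id(partition_func) keeps partitioners
        # built from different Python functions from comparing equal.
        partitioner = ctx._jvm.PythonPartitioner(num_partitions, id(partition_func))
        # After the shuffle the hash keys have served their purpose; keep only the values.
        jrdd = pair_rdd.partitionBy(partitioner).values()
        return RDD(jrdd, ctx, BatchedSerializer(output_serializer))

Folding the partitioner into the PairwiseDStream constructor, as this commit does on the Python side, moves exactly this sequence into compute so it runs once per batch.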

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala (12 additions & 7 deletions)

@@ -1,3 +1,5 @@
+/*
+
 package org.apache.spark.streaming.api.python
 
 import org.apache.spark.Accumulator
@@ -10,11 +12,8 @@ import org.apache.spark.streaming.dstream.DStream
 
 import scala.reflect.ClassTag
 
-/**
- * Created by ken on 7/15/14.
- */
 class PythonTransformedDStream[T: ClassTag](
-    parents: Seq[DStream[T]],
+    parent: DStream[T],
     command: Array[Byte],
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
@@ -30,8 +29,14 @@ class PythonTransformedDStream[T: ClassTag](
 
   //pythonDStream compute
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
-    Some()
+
+    // val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+    // parents.map(_.getOrCompute(validTime).orNull).to
+    // parent = parents.head.asInstanceOf[RDD]
+    // Some()
   }
-  val asJavaDStream = JavaDStream.fromDStream(this)
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
 }
+
+*/

0 commit comments