
Commit 3b6d7b0

Ken Takagiwa authored and committed
implementing transform function in Python
1 parent 571d52d commit 3b6d7b0

File tree

4 files changed: +43 -3 lines changed

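For context, the transform operation this commit is building lets a user apply an arbitrary RDD-to-RDD function to every batch of a DStream. A minimal sketch of the intended usage, written against the PySpark Streaming API as it later stabilized (this work-in-progress branch may differ):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="TransformExample")
ssc = StreamingContext(sc, 1)  # 1-second batches

lines = ssc.socketTextStream("localhost", 9999)
# transform applies an arbitrary RDD-to-RDD function to every batch
counts = lines.transform(
    lambda rdd: rdd.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()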

python/pyspark/mllib/_common.py

Lines changed: 1 addition & 1 deletion

@@ -164,7 +164,7 @@ def _deserialize_double_vector(ba, offset=0):
     nb = len(ba) - offset
     if nb < 5:
         raise TypeError("_deserialize_double_vector called on a %d-byte array, "
-                        "which is too short" % nb)
+                        "which is too short" % nb)
     if ba[offset] == DENSE_VECTOR_MAGIC:
         return _deserialize_dense_vector(ba, offset)
     elif ba[offset] == SPARSE_VECTOR_MAGIC:

(The removed and added lines are textually identical here; the change is whitespace-only, which the extraction does not preserve.)
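The guard above rejects buffers shorter than five bytes, which the surrounding code suggests is one magic byte plus a four-byte header. A hypothetical illustration of the error path, assuming a PySpark checkout from this era (pyspark.mllib._common is internal and was later removed):

from pyspark.mllib._common import _deserialize_double_vector

try:
    _deserialize_double_vector(bytearray(3))
except TypeError as e:
    # _deserialize_double_vector called on a 3-byte array, which is too short
    print(e)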

python/pyspark/streaming/dstream.py

Lines changed: 0 additions & 2 deletions

@@ -92,7 +92,6 @@ def _mergeCombiners(iterator):
             return combiners.iteritems()
         return shuffled.mapPartitions(_mergeCombiners)
 
-
     def partitionBy(self, numPartitions, partitionFunc=None):
         """
         Return a copy of the DStream partitioned using the specified partitioner.

@@ -141,7 +140,6 @@ def _defaultReducePartitions(self):
 
         """
         # hard code to avoid the error
-        return 2
         if self.ctx._conf.contains("spark.default.parallelism"):
             return self.ctx.defaultParallelism
         else:
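Dropping the hard-coded return 2 lets _defaultReducePartitions fall through to the configured parallelism. The configuration check it relies on can be seen in isolation with the standard PySpark API:

from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.default.parallelism", "4")
sc = SparkContext(conf=conf)
print(conf.contains("spark.default.parallelism"))  # True
print(sc.defaultParallelism)                       # 4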
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+package org.apache.spark.streaming.api.python
+
+import java.util.{List => JList, Map => JMap}
+
+import org.apache.spark.Accumulator
+import org.apache.spark.api.python.PythonRDD
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.api.java.JavaDStream
+import org.apache.spark.streaming.{Time, Duration}
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/**
+ * Created by ken on 7/15/14.
+ */
+class PythonTransformedDStream[T: ClassTag](
+    parents: Seq[DStream[T]],
+    command: Array[Byte],
+    envVars: JMap[String, String],
+    pythonIncludes: JList[String],
+    preservePartitioning: Boolean,
+    pythonExec: String,
+    broadcastVars: JList[Broadcast[Array[Byte]]],
+    accumulator: Accumulator[JList[Array[Byte]]]
+  ) extends DStream[Array[Byte]](parents.head.ssc) {
+
+  override def dependencies = parents.toList
+
+  override def slideDuration: Duration = parents.head.slideDuration
+
+  // compute the RDD for this batch time from the parent DStreams
+  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+    None  // TODO: run the serialized Python command over parentRDDs (cf. PythonRDD)
+  }
+  val asJavaDStream = JavaDStream.fromDStream(this)
+}
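The class above follows the DStream contract: for each batch time, compute() yields either an RDD or nothing. A purely illustrative Python analogue of that contract, with all names hypothetical (the real bridge goes through Py4J, not code like this):

def compute(parents, transform_func, valid_time):
    # gather each parent's data for this batch; dict.get stands in for getOrCompute
    parent_vals = [p.get(valid_time) for p in parents]
    if any(v is None for v in parent_vals):
        return None  # some parent produced nothing for this batch
    return transform_func(parent_vals, valid_time)

# toy usage: two "parent streams" keyed by batch time
parents = [{1: [1, 2]}, {1: [3]}]
print(compute(parents, lambda vals, t: sum(vals, []), 1))  # [1, 2, 3]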

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 3 additions & 0 deletions

@@ -561,9 +561,12 @@ abstract class DStream[T: ClassTag] (
     // because the DStream is reachable from the outer object here, and because
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
+
+    // the transform function may wrap serialized Python code
     val cleanedF = context.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 1)
+      // apply the cleaned function to the single parent RDD for this batch
       cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
     }
     new TransformedDStream[U](Seq(this), realTransformFunc)
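The realTransformFunc wrapper above adapts a user-supplied single-RDD function to the internal (Seq[RDD], Time) shape that TransformedDStream expects. The same adapter pattern sketched in Python, with illustrative names and plain lists standing in for RDDs:

def make_real_transform_func(transform_func):
    def wrapper(rdds, time):
        assert len(rdds) == 1  # this DStream has exactly one parent
        return transform_func(rdds[0], time)
    return wrapper

f = make_real_transform_func(lambda rdd, t: [x * 2 for x in rdd])
print(f([[1, 2, 3]], 0))  # [2, 4, 6]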
