Skip to content

Commit 7d05109

Browse files
Ken TakagiwaKen Takagiwa
authored andcommitted
merge with remote branch
2 parents ae464e0 + 69e9cd3 commit 7d05109

File tree

3 files changed

+30
-1
lines changed

3 files changed

+30
-1
lines changed

python/pyspark/streaming/dstream.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,24 @@ def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
133133
"""
134134
return PipelinedDStream(self, f, preservesPartitioning)
135135

136+
def _defaultReducePartitions(self):
137+
"""
138+
139+
"""
140+
# hard code to avoid the error
141+
if self.ctx._conf.contains("spark.default.parallelism"):
142+
return self.ctx.defaultParallelism
143+
else:
144+
return self.getNumPartitions()
145+
146+
return self._jdstream.partitions().size()
147+
148+
def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
149+
"""
150+
151+
"""
152+
return PipelinedDStream(self, f, preservesPartitioning)
153+
136154
def _defaultReducePartitions(self):
137155
"""
138156

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,5 +84,4 @@ DStream[Array[Byte]](prev.ssc){
8484
}
8585
}
8686
val asJavaDStream = JavaDStream.fromDStream(this)
87-
//val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
8887
}

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
<<<<<<< HEAD
12
/*
23
4+
=======
5+
>>>>>>> 69e9cd33a58b880f96cc9c3e5e62eaa415c49843
36
package org.apache.spark.streaming.api.python
47
58
import org.apache.spark.Accumulator
@@ -12,8 +15,16 @@ import org.apache.spark.streaming.dstream.DStream
1215
1316
import scala.reflect.ClassTag
1417
18+
<<<<<<< HEAD
1519
class PythonTransformedDStream[T: ClassTag](
1620
parent: DStream[T],
21+
=======
22+
/**
23+
* Created by ken on 7/15/14.
24+
*/
25+
class PythonTransformedDStream[T: ClassTag](
26+
parents: Seq[DStream[T]],
27+
>>>>>>> 69e9cd33a58b880f96cc9c3e5e62eaa415c49843
1728
command: Array[Byte],
1829
envVars: JMap[String, String],
1930
pythonIncludes: JList[String],
@@ -29,6 +40,7 @@ class PythonTransformedDStream[T: ClassTag](
2940
3041
//pythonDStream compute
3142
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
43+
<<<<<<< HEAD
3244
3345
// val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
3446
// parents.map(_.getOrCompute(validTime).orNull).to

0 commit comments

Comments
 (0)