
Commit 795b2cd

broke something
1 parent 1e126bf commit 795b2cd

File tree: 4 files changed, +132 -14 lines

  python/pyspark/streaming/context.py
  python/pyspark/streaming/dstream.py
  python/pyspark/worker.py
  streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala


python/pyspark/streaming/context.py

Lines changed: 46 additions & 0 deletions
@@ -142,9 +142,49 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
 
     def _testInputStream(self, test_inputs, numSlices=None):
         """
+<<<<<<< HEAD
         This function is only for test.
         This implementation is inspired by QueStream implementation.
         Give list of RDD to generate DStream which contains the RDD.
+=======
+        Generate multiple files to make "stream" in Scala side for test.
+        Scala chooses one of the files and generates RDD using PythonRDD.readRDDFromFile.
+
+        QueStream maybe good way to implement this function
+        """
+        numSlices = numSlices or self._sc.defaultParallelism
+        # Calling the Java parallelize() method with an ArrayList is too slow,
+        # because it sends O(n) Py4J commands. As an alternative, serialized
+        # objects are written to a file and loaded through textFile().
+
+        tempFiles = list()
+        for test_input in test_inputs:
+            tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
+
+            # Make sure we distribute data evenly if it's smaller than self.batchSize
+            if "__len__" not in dir(test_input):
+                test_input = list(test_input)  # Make it a list so we can compute its length
+            batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
+            if batchSize > 1:
+                serializer = BatchedSerializer(self._sc._unbatched_serializer,
+                                               batchSize)
+            else:
+                serializer = self._sc._unbatched_serializer
+            serializer.dump_stream(test_input, tempFile)
+            tempFile.close()
+            tempFiles.append(tempFile.name)
+
+        jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
+        jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
+                                                        jtempFiles,
+                                                        numSlices).asJavaDStream()
+        return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
+
+    def _testInputStream2(self, test_inputs, numSlices=None):
+        """
+        This is inpired by QueStream implementation. Give list of RDD and generate DStream
+        which contain the RDD.
+>>>>>>> broke something
         """
         test_rdds = list()
         test_rdd_deserializers = list()
@@ -156,4 +196,10 @@ def _testInputStream(self, test_inputs, numSlices=None):
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
 
+<<<<<<< HEAD
         return DStream(jinput_stream, self, test_rdd_deserializers[0])
+=======
+        dstream = DStream(jinput_stream, self, test_rdd_deserializers[0])
+        dstream._test_switch_dserializer(test_rdd_deserializers)
+        return dstream
+>>>>>>> broke something
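The two halves of the conflict above take different routes to the JVM: HEAD hands pre-built RDDs straight to PythonTestInputStream, while the incoming side serializes each test batch to its own temporary file and passes only the file names across Py4J. A minimal, self-contained sketch of that file hand-off, with plain pickle and tempfile standing in for pyspark's BatchedSerializer and temp-dir handling (write_test_batches and read_test_batch are illustrative names, not part of the patch):

# Standalone sketch of the temp-file hand-off, assuming plain pickle in
# place of pyspark's serializers. Each test batch is written to its own
# file; the resulting file names are what would be handed to the JVM side.
import pickle
import tempfile


def write_test_batches(test_inputs):
    """Dump each input batch to a separate temp file and return the names."""
    temp_files = []
    for test_input in test_inputs:
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".tmp")
        pickle.dump(list(test_input), temp_file)
        temp_file.close()
        temp_files.append(temp_file.name)
    return temp_files


def read_test_batch(path):
    """Read one batch back, as the Scala side would once per batch interval."""
    with open(path, "rb") as f:
        return pickle.load(f)


if __name__ == "__main__":
    files = write_test_batches([[1, 2, 3], [4, 5, 6]])
    print([read_test_batch(p) for p in files])   # [[1, 2, 3], [4, 5, 6]]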

python/pyspark/streaming/dstream.py

Lines changed: 1 addition & 14 deletions
@@ -17,6 +17,7 @@
 
 from collections import defaultdict
 from itertools import chain, ifilter, imap
+import time
 import operator
 
 from pyspark.serializers import NoOpSerializer,\
@@ -428,20 +429,6 @@ def saveAsTextFile(rdd, time):
 # TODO: implemtnt rightOuterJoin
 
 
-# TODO: implement groupByKey
-# TODO: impelment union
-# TODO: implement cache
-# TODO: implement persist
-# TODO: implement repertitions
-# TODO: implement saveAsTextFile
-# TODO: implement cogroup
-# TODO: implement join
-# TODO: implement countByValue
-# TODO: implement leftOuterJoin
-# TODO: implemtnt rightOuterJoin
-
-
-
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():

python/pyspark/worker.py

Lines changed: 10 additions & 0 deletions
@@ -86,6 +86,16 @@ def main(infile, outfile):
         (func, deserializer, serializer) = command
         init_time = time.time()
         iterator = deserializer.load_stream(infile)
+        print "deserializer in worker: %s" % str(deserializer)
+        iterator, walk = itertools.tee(iterator)
+        if isinstance(walk, int):
+            print "this is int"
+            print walk
+        else:
+            try:
+                print list(walk)
+            except:
+                print list(walk)
         serializer.dump_stream(func(split_index, iterator), outfile)
     except Exception:
         try:
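The worker change relies on itertools.tee to peek at the deserialized stream without consuming the iterator that is later handed to the user function. Two caveats: tee() always returns iterators, so the isinstance(walk, int) branch in the patch can never fire, and printing the copy forces every element to be buffered, which is only reasonable for small test inputs. A standalone sketch of the pattern:

# Standalone sketch of the itertools.tee debugging pattern: one copy of the
# stream is printed for inspection, the other is still consumed by the real
# work. tee() buffers elements until both copies have advanced past them,
# so peeking at the whole stream up front costs memory proportional to it.
import itertools


def process(iterator):
    return [x * 2 for x in iterator]


stream = iter(range(5))
stream, walk = itertools.tee(stream)
print("peeked:", list(walk))        # peeked: [0, 1, 2, 3, 4]
print("result:", process(stream))   # result: [0, 2, 4, 6, 8]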

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 75 additions & 0 deletions
@@ -206,3 +206,78 @@ class PythonTransformedDStream(
 }
 */
 
+<<<<<<< HEAD
+=======
+/**
+ * This is a input stream just for the unitest. This is equivalent to a checkpointable,
+ * replayable, reliable message queue like Kafka. It requires a sequence as input, and
+ * returns the i_th element at the i_th batch under manual clock.
+ */
+class PythonTestInputStream(ssc_ : JavaStreamingContext, inputFiles: JArrayList[String], numPartitions: Int)
+  extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)){
+
+  def start() {}
+
+  def stop() {}
+
+  def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    logInfo("Computing RDD for time " + validTime)
+    inputFiles.foreach(logInfo(_))
+    // make a temporary file
+    // make empty RDD
+    val prefix = "spark"
+    val suffix = ".tmp"
+    val tempFile = File.createTempFile(prefix, suffix)
+    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
+    logInfo("Index: " + index)
+
+    val selectedInputFile: String = {
+      if (inputFiles.isEmpty){
+        tempFile.getAbsolutePath
+      }else if (index < inputFiles.size()) {
+        inputFiles.get(index)
+      } else {
+        tempFile.getAbsolutePath
+      }
+    }
+    val rdd = PythonRDD.readRDDFromFile(JavaSparkContext.fromSparkContext(ssc_.sparkContext), selectedInputFile, numPartitions).rdd
+    logInfo("Created RDD " + rdd.id + " with " + selectedInputFile)
+    Some(rdd)
+  }
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
+}
+
+/**
+ * This is a input stream just for the unitest. This is equivalent to a checkpointable,
+ * replayable, reliable message queue like Kafka. It requires a sequence as input, and
+ * returns the i_th element at the i_th batch under manual clock.
+ * This implementation is close to QueStream
+ */
+
+class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
+  extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)) {
+
+  def start() {}
+
+  def stop() {}
+
+  def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    val emptyRDD = ssc.sparkContext.emptyRDD[Array[Byte]]
+    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
+    val selectedRDD = {
+      if (inputRDDs.isEmpty) {
+        emptyRDD
+      } else if (index < inputRDDs.size()) {
+        inputRDDs.get(index).rdd
+      } else {
+        emptyRDD
+      }
+    }
+
+    Some(selectedRDD)
+  }
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
+}
+>>>>>>> broke something
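Both Scala test streams pick what to replay with the same arithmetic, index = ((validTime - zeroTime) / slideDuration - 1), falling back to an empty batch once the input list is exhausted. A small Python sketch of that selection under a manual clock (select_input and its parameters are illustrative names; the explicit lower-bound guard is added by the sketch, not present in the patch):

# Sketch of the per-batch index selection performed by PythonTestInputStream
# and PythonTestInputStream2: batch i under the manual clock replays input i,
# and anything past the end of the list yields an empty batch.
def select_input(inputs, valid_time, zero_time, slide_duration, empty=None):
    index = (valid_time - zero_time) // slide_duration - 1
    if 0 <= index < len(inputs):
        return inputs[index]
    return empty


inputs = [[1, 2], [3, 4], [5, 6]]
# zero_time = 0, batches fire at t = 1000, 2000, 3000, ... with a 1000 ms slide
for t in (1000, 2000, 3000, 4000):
    print(t, select_input(inputs, t, zero_time=0, slide_duration=1000, empty=[]))
# 1000 [1, 2]   2000 [3, 4]   3000 [5, 6]   4000 []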
