
Commit 9cde7c9

WIP added test case
1 parent bd3ba53 commit 9cde7c9

File tree

8 files changed: +57 −13 lines

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 0 additions & 2 deletions
@@ -350,8 +350,6 @@ private[spark] object PythonRDD extends Logging {
     } catch {
       case eof: EOFException => {}
     }
-    println("RDDDD ==================")
-    println(objs)
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }

examples/src/main/python/streaming/test_oprations.py

Lines changed: 16 additions & 9 deletions
@@ -9,15 +9,22 @@
 conf = SparkConf()
 conf.setAppName("PythonStreamingNetworkWordCount")
 ssc = StreamingContext(conf=conf, duration=Seconds(1))
-ssc.checkpoint("/tmp/spark_ckp")
 
-test_input = ssc._testInputStream([[1],[1],[1]])
-# ssc.checkpoint("/tmp/spark_ckp")
-fm_test = test_input.flatMap(lambda x: x.split(" "))
-mapped_test = fm_test.map(lambda x: (x, 1))
+test_input = ssc._testInputStream([1,2,3])
+class buff:
+    pass
+
+fm_test = test_input.map(lambda x: (x, 1))
+fm_test.test_output(buff)
 
-
-mapped_test.print_()
 ssc.start()
-# ssc.awaitTermination()
-# ssc.stop()
+while True:
+    ssc.awaitTermination(50)
+    try:
+        buff.result
+        break
+    except AttributeError:
+        pass
+
+ssc.stop()
+print buff.result
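
Note: the rewritten test script drives the stream without a real sink. It collects output into the plain `buff` class via the experimental `test_output` hook, then polls until `buff.result` exists. The snippet below is a minimal sketch of that polling pattern with Spark stubbed out by a background thread; `fake_output_operator` is a hypothetical stand-in for whatever `fm_test.test_output(buff)` eventually writes, not part of this commit.

# Sketch only: Spark replaced by a thread; fake_output_operator is hypothetical.
import threading
import time

class buff(object):
    pass

def fake_output_operator(holder, delay=0.2):
    # stands in for fm_test.test_output(buff) delivering its result
    time.sleep(delay)
    holder.result = [(1, 1), (2, 1), (3, 1)]

threading.Thread(target=fake_output_operator, args=(buff,)).start()

while True:
    time.sleep(0.05)      # plays the role of ssc.awaitTermination(50)
    try:
        buff.result       # raises AttributeError until the output arrives
        break
    except AttributeError:
        pass

print(buff.result)        # [(1, 1), (2, 1), (3, 1)]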

python/pyspark/streaming/dstream.py

Lines changed: 1 addition & 1 deletion
@@ -236,6 +236,7 @@ def pyprint(self):
         operator, so this DStream will be registered as an output stream and there materialized.
         """
         def takeAndPrint(rdd, time):
+            print "take and print ==================="
             taken = rdd.take(11)
             print "-------------------------------------------"
             print "Time: %s" % (str(time))
@@ -420,7 +421,6 @@ def saveAsTextFile(rdd, time):
 # TODO: implemtnt rightOuterJoin
 
 
-
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():

python/pyspark/streaming_tests.py

Lines changed: 11 additions & 0 deletions
@@ -444,6 +444,17 @@ def tearDown(self):
     def tearDownClass(cls):
         PySparkStreamingTestCase.tearDownClass()
 
+        start_time = time.time()
+        while True:
+            current_time = time.time()
+            # check time out
+            if (current_time - start_time) > self.timeout:
+                self.ssc.stop()
+                break
+            self.ssc.awaitTermination(50)
+            if buff.result is not None:
+                break
+        return buff.result
 
 if __name__ == "__main__":
     unittest.main()
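
Note: the loop added to streaming_tests.py hardens the same wait with a timeout so a hung stream cannot block the suite forever; it keeps calling `awaitTermination(50)` and gives up (stopping the context) once `self.timeout` seconds have elapsed. A hedged sketch of that logic as a standalone helper, with the StreamingContext stubbed out, is shown below; `wait_for_result` and `StubContext` are illustrative names, not part of this commit.

# Sketch only: StubContext and wait_for_result are illustrative, not from this commit.
import time

class StubContext(object):
    def awaitTermination(self, timeout_millis):
        time.sleep(timeout_millis / 1000.0)

    def stop(self):
        pass

def wait_for_result(ssc, holder, timeout_secs=3.0):
    start_time = time.time()
    while True:
        # check time out
        if (time.time() - start_time) > timeout_secs:
            ssc.stop()
            return None
        ssc.awaitTermination(50)
        if getattr(holder, "result", None) is not None:
            return holder.result

class buff(object):
    result = None

buff.result = [("hello", 1)]
print(wait_for_result(StubContext(), buff))   # [('hello', 1)]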

python/pyspark/worker.py

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ def main(infile, outfile):
     SparkFiles._is_running_on_worker = True
 
     # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
-    sys.path.append(spark_files_dir) # *.py files that were added will be copied here
+    sys.path.append(spark_files_dir) # *.py files that were added will be copied here
     num_python_includes = read_int(infile)
     for _ in range(num_python_includes):
         filename = utf8_deserializer.loads(infile)

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala

Lines changed: 9 additions & 0 deletions
@@ -54,6 +54,15 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
     dstream.print()
   }
 
+  def print(label: String = null): Unit = {
+    dstream.print(label)
+  }
+
+  def outputToFile(): Unit = {
+    dstream.outputToFile()
+  }
+
+
   /**
    * Return a new DStream in which each RDD has a single element generated by counting each RDD
    * of this DStream.

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 2 additions & 0 deletions
@@ -55,6 +55,8 @@ class PythonDStream[T: ClassTag](
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
     parent.getOrCompute(validTime) match{
       case Some(rdd) =>
+        logInfo("RDD ID in python DStream ===========")
+        logInfo("RDD id " + rdd.id)
         val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
         Some(pythonRDD.asJavaRDD.rdd)
       case None => None

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 17 additions & 0 deletions
@@ -620,6 +620,23 @@ abstract class DStream[T: ClassTag] (
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
 
+
+  def print(label: String = null) {
+    def foreachFunc = (rdd: RDD[T], time: Time) => {
+      val first11 = rdd.take(11)
+      println ("-------------------------------------------")
+      println ("Time: " + time)
+      println ("-------------------------------------------")
+      if(label != null){
+        println (label)
+      }
+      first11.take(10).foreach(println)
+      if (first11.size > 10) println("...")
+      println()
+    }
+    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+  }
+
   /**
    * Return a new DStream in which each RDD contains all the elements in seen in a
    * sliding window of time over this DStream. The new DStream generates RDDs with
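
Note: both `takeAndPrint` in dstream.py and the new `DStream.print(label)` above follow the same display convention: take 11 records, print at most 10, and emit "..." when the extra record shows the batch was larger. A plain-Python sketch of that convention follows; a list stands in for the RDD and the helper name is illustrative, not part of this commit.

# Sketch only: a list stands in for the RDD; take_and_print is illustrative.
def take_and_print(records, time_label, label=None):
    taken = records[:11]         # take one extra record to detect overflow
    print("-------------------------------------------")
    print("Time: %s" % time_label)
    print("-------------------------------------------")
    if label is not None:
        print(label)
    for record in taken[:10]:    # show at most the first 10 records
        print(record)
    if len(taken) > 10:          # the 11th record means there were more
        print("...")
    print("")

take_and_print(list(range(15)), "2014-07-20 12:00:01", label="word counts")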
