
Commit 4f07163

tdas and giwa authored and committed
Implemented DStream.foreachRDD in the Python API using Py4J callback server.
1 parent fe02547 · commit 4f07163
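For context, here is a minimal PySpark sketch of what this change enables: registering a Python function as an output action that runs on the driver for every RDD the stream produces. The source, batch interval, and function names below are illustrative assumptions, not code from this commit, and the single-argument callback form is assumed.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext  # module path assumed

sc = SparkContext(appName="ForeachRDDSketch")
ssc = StreamingContext(sc, 1)  # 1-second batches; constructor form assumed

lines = ssc.socketTextStream("localhost", 9999)  # illustrative source
counts = (lines.flatMap(lambda l: l.split(" "))
               .map(lambda w: (w, 1))
               .reduceByKey(lambda a, b: a + b))

def handle_batch(rdd):
    # Runs on the driver for each micro-batch, invoked through the Py4J
    # callback server; ordinary RDD actions are fine here.
    for word, count in rdd.take(10):
        print(word, count)

counts.foreachRDD(handle_batch)

ssc.start()
ssc.awaitTermination()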

File tree: 5 files changed, +4 / -71 lines

python/pyspark/java_gateway.py

Lines changed: 1 addition & 1 deletion
@@ -102,7 +102,7 @@ def run(self):
     EchoOutputThread(proc.stdout).start()

     # Connect to the gateway
-    gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False)
+    gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False, start_callback_server=True)

     # Import the classes used by PySpark
     java_import(gateway.jvm, "org.apache.spark.SparkConf")
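The reason for start_callback_server=True: Py4J can only dispatch JVM-side calls to Python objects when its callback server is running, and foreachRDD relies on exactly that path. A rough, standalone Py4J sketch of the pattern follows; the listener class, port, and interface name are assumptions, not part of this commit.

from py4j.java_gateway import JavaGateway, GatewayClient

class BatchCallback(object):
    # Hypothetical callback object; the JVM can invoke call(...) on it once
    # the callback server is running.
    def call(self, jrdd, time):
        print("callback invoked for batch at", time)

    class Java:
        # Interface name assumed; in this commit the Scala side declares a
        # PythonRDDFunction that the Python callback would implement.
        implements = ["org.apache.spark.streaming.api.python.PythonRDDFunction"]

# start_callback_server=True (the older Py4J keyword used in this diff) opens
# the Python-side socket that the JVM connects to for callbacks.
gateway = JavaGateway(GatewayClient(port=25333), start_callback_server=True)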

python/pyspark/streaming/dstream.py

Lines changed: 2 additions & 2 deletions
@@ -445,7 +445,6 @@ def pipeline_func(split, iterator):
         self._prev_jdstream = prev._prev_jdstream # maintain the pipeline
         self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
         self.is_cached = False
-        self.is_checkpointed = False
         self._ssc = prev._ssc
         self.ctx = prev.ctx
         self.prev = prev
@@ -482,4 +481,5 @@ def _jdstream(self):
         return self._jdstream_val

     def _is_pipelinable(self):
-        return not (self.is_cached or self.is_checkpointed)
+        return not (self.is_cached)
+

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala

Lines changed: 0 additions & 8 deletions
@@ -54,14 +54,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
     dstream.print()
   }

-  /**
-   * Print the first ten elements of each PythonRDD generated in the PythonDStream. This is an output
-   * operator, so this PythonDStream will be registered as an output stream and there materialized.
-   * This function is for PythonAPI.
-   */
-  //TODO move this function to PythonDStream
-  def pyprint() = dstream.pyprint()
-
   /**
    * Return a new DStream in which each RDD has a single element generated by counting each RDD
    * of this DStream.
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 1 addition & 0 deletions
@@ -170,6 +170,7 @@ class PythonTestInputStream3(ssc_ : JavaStreamingContext)

   val asJavaDStream = JavaDStream.fromDStream(this)
 }
+
 class PythonForeachDStream(
     prev: DStream[Array[Byte]],
     foreachFunction: PythonRDDFunction
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 0 additions & 60 deletions
@@ -620,66 +620,6 @@ abstract class DStream[T: ClassTag] (
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }

-  //TODO: move pyprint to PythonDStream and executed by py4j call back function
-  /**
-   * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
-   * operator, so this PythonDStream will be registered as an output stream and there materialized.
-   * Since serialized Python object is readable by Python, pyprint writes out binary data to
-   * temporary file and run python script to deserialized and print the first ten elements
-   *
-   * Currently call python script directly. We should avoid this
-   */
-  private[streaming] def pyprint() {
-    def foreachFunc = (rdd: RDD[T], time: Time) => {
-      val iter = rdd.take(11).iterator
-
-      // Generate a temporary file
-      val prefix = "spark"
-      val suffix = ".tmp"
-      val tempFile = File.createTempFile(prefix, suffix)
-      val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
-      // Write out serialized python object to temporary file
-      PythonRDD.writeIteratorToStream(iter, tempFileStream)
-      tempFileStream.close()
-
-      // pythonExec should be passed from python. Move pyprint to PythonDStream
-      val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
-
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      // Call python script to deserialize and print result in stdout
-      val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath)
-      val workerEnv = pb.environment()
-
-      // envVars also should be pass from python
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
-      val worker = pb.start()
-      val is = worker.getInputStream()
-      val isr = new InputStreamReader(is)
-      val br = new BufferedReader(isr)
-
-      println ("-------------------------------------------")
-      println ("Time: " + time)
-      println ("-------------------------------------------")
-
-      // Print values which is from python std out
-      var line = ""
-      breakable {
-        while (true) {
-          line = br.readLine()
-          if (line == null) break()
-          println(line)
-        }
-      }
-      // Delete temporary file
-      tempFile.delete()
-      println()
-
-    }
-    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
-  }
-
-
   /**
    * Return a new DStream in which each RDD contains all the elements in seen in a
    * sliding window of time over this DStream. The new DStream generates RDDs with
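The removed pyprint dumped serialized objects to a temporary file and shelled out to a Python script to print them. With foreachRDD available through the Py4J callback server, the same output can be produced entirely in Python; a rough equivalent is sketched below (function name and formatting assumed, batch time omitted because the single-argument callback form is assumed).

def print_first_ten(dstream):
    # Hypothetical pyprint replacement built on foreachRDD: take up to 11
    # elements per batch, print the first 10, and hint if more exist.
    def take_and_print(rdd):
        taken = rdd.take(11)
        print("-------------------------------------------")
        for record in taken[:10]:
            print(record)
        if len(taken) > 10:
            print("...")
        print("")
    dstream.foreachRDD(take_and_print)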
