@@ -19,42 +19,28 @@ package org.apache.spark.streaming.api.python

import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}

- import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
- import org.apache.spark.broadcast.Broadcast
+ import scala.reflect.ClassTag
+
import org.apache.spark._
- import org.apache.spark.util.Utils
- import java.io._
- import scala.Some
- import org.apache.spark.streaming.Duration
- import scala.util.control.Breaks._
- import org.apache.spark.broadcast.Broadcast
- import scala.Some
- import org.apache.spark.streaming.Duration
import org.apache.spark.rdd.RDD
- import org.apache.spark.api.python.PythonRDD
-
-
+ import org.apache.spark.api.python._
+ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.api.java._
- import org.apache.spark.rdd.RDD
- import org.apache.spark.api.python._
- import org.apache.spark.api.python.PairwiseRDD
-

- import scala.reflect.ClassTag


class PythonDStream[T: ClassTag](
-     parent: DStream[T],
-     command: Array[Byte],
-     envVars: JMap[String, String],
-     pythonIncludes: JList[String],
-     preservePartitoning: Boolean,
-     pythonExec: String,
-     broadcastVars: JList[Broadcast[Array[Byte]]],
-     accumulator: Accumulator[JList[Array[Byte]]]
-   ) extends DStream[Array[Byte]](parent.ssc) {
+     parent: DStream[T],
+     command: Array[Byte],
+     envVars: JMap[String, String],
+     pythonIncludes: JList[String],
+     preservePartitoning: Boolean,
+     pythonExec: String,
+     broadcastVars: JList[Broadcast[Array[Byte]]],
+     accumulator: Accumulator[JList[Array[Byte]]])
+   extends DStream[Array[Byte]](parent.ssc) {

  override def dependencies = List(parent)

@@ -70,84 +56,4 @@ class PythonDStream[T: ClassTag](
    }
  }
  val asJavaDStream = JavaDStream.fromDStream(this)
-
-   /**
-    * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
-    * operator, so this PythonDStream will be registered as an output stream and there materialized.
-    * Since serialized Python object is readable by Python, pyprint writes out binary data to
-    * temporary file and run python script to deserialized and print the first ten elements
-    */
-   private[streaming] def ppyprint() {
-     def foreachFunc = (rdd: RDD[Array[Byte]], time: Time) => {
-       val iter = rdd.take(11).iterator
-
-       // make a temporary file
-       val prefix = "spark"
-       val suffix = ".tmp"
-       val tempFile = File.createTempFile(prefix, suffix)
-       val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
-       // write out serialized python object
-       PythonRDD.writeIteratorToStream(iter, tempFileStream)
-       tempFileStream.close()
-
-       // This value has to be passed from python
-       // val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
-       val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-       // val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
-       // absolute path to the python script is needed to change because we do not use pysparkstreaming
-       val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pysparkstreaming/streaming/pyprint.py", tempFile.getAbsolutePath)
-       val workerEnv = pb.environment()
-
-       // envVars also need to be pass
-       // workerEnv.putAll(envVars)
-       val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-       workerEnv.put("PYTHONPATH", pythonPath)
-       val worker = pb.start()
-       val is = worker.getInputStream()
-       val isr = new InputStreamReader(is)
-       val br = new BufferedReader(isr)
-
-       println ("-------------------------------------------")
-       println ("Time: " + time)
-       println ("-------------------------------------------")
-
-       // print value from python std out
-       var line = ""
-       breakable {
-         while (true) {
-           line = br.readLine()
-           if (line == null) break()
-           println(line)
-         }
-       }
-       // delete temporary file
-       tempFile.delete()
-       println()
-
-     }
-     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
-   }
- }
-
- /*
- private class PairwiseDStream(prev:DStream[Array[Byte]]) extends
- DStream[(Long, Array[Byte])](prev.ssc){
-   override def dependencies = List(prev)
-
-   override def slideDuration: Duration = prev.slideDuration
-
-   override def compute(validTime:Time):Option[RDD[(Long, Array[Byte])]]={
-     prev.getOrCompute(validTime) match{
-       case Some(rdd)=>Some(rdd)
-         val pairwiseRDD = new PairwiseRDD(rdd)
-         Some(pairwiseRDD.asJavaPairRDD.rdd)
-       case None => None
-     }
-   }
-   val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream(this)
}
- */
-
-
-
-
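For reference only (not part of this patch): a minimal sketch of how the commented-out PairwiseDStream removed above could be written so that it compiles cleanly. It drops the stray Some(rdd) in the first match arm, which was dead code, and maps over the Option instead. It assumes PairwiseRDD from org.apache.spark.api.python and JavaPairDStream(...) behave as the original commented code already used them.

// Sketch only, not part of this diff: a compiling variant of the commented-out
// PairwiseDStream, assuming PairwiseRDD (org.apache.spark.api.python) and
// JavaPairDStream.apply behave as used in the original commented-out code.
private class PairwiseDStream(prev: DStream[Array[Byte]])
  extends DStream[(Long, Array[Byte])](prev.ssc) {

  override def dependencies = List(prev)

  override def slideDuration: Duration = prev.slideDuration

  override def compute(validTime: Time): Option[RDD[(Long, Array[Byte])]] = {
    // Map each generated batch of pickled byte arrays to (hash, bytes) pairs;
    // the original `case Some(rdd) => Some(rdd)` line was discarded dead code.
    prev.getOrCompute(validTime).map(rdd => new PairwiseRDD(rdd).asJavaPairRDD.rdd)
  }

  val asJavaPairDStream: JavaPairDStream[Long, Array[Byte]] = JavaPairDStream(this)
}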