
Commit 5deb79e

added new portabledatastream to code so that it can be serialized correctly
1 parent f032bc0 commit 5deb79e

5 files changed: +80 −47 lines


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 4 deletions
@@ -40,7 +40,7 @@ import org.apache.mesos.MesosNativeLibrary
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.{StreamInputFormat, StreamFileInputFormat, WholeTextFileInputFormat, ByteInputFormat, FixedLengthBinaryInputFormat}
+import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, ByteInputFormat, FixedLengthBinaryInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -533,7 +533,7 @@ class SparkContext(config: SparkConf) extends Logging {
   }

   /**
-   * Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file
+   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
    * (useful for binary data)
    *
    *
@@ -544,15 +544,15 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   @DeveloperApi
   def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
-  RDD[(String, DataInputStream)] = {
+  RDD[(String, PortableDataStream)] = {
     val job = new NewHadoopJob(hadoopConfiguration)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updateConf = job.getConfiguration
     new BinaryFileRDD(
       this,
       classOf[StreamInputFormat],
       classOf[String],
-      classOf[DataInputStream],
+      classOf[PortableDataStream],
       updateConf,
       minPartitions).setName(path)
   }
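
For context, a minimal usage sketch of the changed API, assuming an existing SparkContext named sc, the dataStreamFiles signature above, and the open()/close() methods PortableDataStream gains in RawFileInput.scala below; the input path is a placeholder, not part of the commit:

// Usage sketch (not part of the commit): read each file's first byte lazily.
val streams = sc.dataStreamFiles("hdfs:///data/bins", minPartitions = 8)
val firstBytes = streams.map { case (fileName, stream) =>
  val in = stream.open()   // the FSDataInputStream is only created here, on the executor
  try {
    (fileName, in.readByte())
  } finally {
    stream.close()         // no-op if open() was never called
  }
}
firstBytes.collect().foreach { case (name, b) => println(s"$name -> $b") }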

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ import java.util.{Map => JMap}
 import java.io.DataInputStream

 import org.apache.hadoop.io.{BytesWritable, LongWritable}
-import org.apache.spark.input.FixedLengthBinaryInputFormat
+import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}

 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
@@ -257,7 +257,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
   def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
-  JavaPairRDD[String,DataInputStream] = new JavaPairRDD(sc.dataStreamFiles(path,minPartitions))
+  JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.dataStreamFiles(path,minPartitions))

   /**
    * Read a directory of files as DataInputStream from HDFS,
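
The Java-facing wrapper simply delegates to the SparkContext method and re-wraps the result as a JavaPairRDD. A short sketch of calling it, written in Scala to match the rest of the commit; sc is an existing SparkContext and the path and partition count are placeholders:

// Usage sketch (not part of the commit): the same call through the Java API wrapper.
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}
import org.apache.spark.input.PortableDataStream

val jsc = new JavaSparkContext(sc)
val files: JavaPairRDD[String, PortableDataStream] =
  jsc.dataStreamFiles("hdfs:///data/bins", 2)
println("files found: " + files.count())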

core/src/main/scala/org/apache/spark/input/RawFileInput.scala

Lines changed: 65 additions & 34 deletions
@@ -51,40 +51,66 @@ abstract class StreamFileInputFormat[T]
   }

   def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
-  RecordReader[String,T]
+      RecordReader[String,T]

 }

+/**
+ * A class that allows DataStreams to be serialized and moved around by not creating them
+ * until they need to be read
+ * @param split
+ * @param context
+ * @param index
+ */
+class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
+  extends Serializable {
+  private var path = ""
+  private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+
+  def open(): FSDataInputStream = {
+    val pathp = split.getPath(index)
+    path = pathp.toString
+    val fs = pathp.getFileSystem(context.getConfiguration)
+    fileIn = fs.open(pathp)
+    isOpen = true
+    fileIn
+  }
+
+  def close() = {
+    if (isOpen) {
+      try {
+        fileIn.close()
+        isOpen = false
+      } catch {
+        case ioe: java.io.IOException => // do nothing
+      }
+    }
+  }
+  def getPath(): String = path
+}
+
 /**
  * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
  * to reading files out as streams
  */
 abstract class StreamBasedRecordReader[T](
-  split: CombineFileSplit,
-  context: TaskAttemptContext,
-  index: Integer)
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
   extends RecordReader[String, T] {

-  private val path = split.getPath(index)
-  private val fs = path.getFileSystem(context.getConfiguration)
+

   // True means the current file has been processed, then skip it.
   private var processed = false

-  private val key = path.toString
+  private var key = ""
   private var value: T = null.asInstanceOf[T]
-  // the file to be read when nextkeyvalue is called
-  private lazy val fileIn: FSDataInputStream = fs.open(path)
+

   override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
-  override def close() = {
-    // make sure the file is closed
-    try {
-      fileIn.close()
-    } catch {
-      case ioe: java.io.IOException => // do nothing
-    }
-  }
+  override def close() = {}

   override def getProgress = if (processed) 1.0f else 0.0f

@@ -93,10 +119,13 @@ abstract class StreamBasedRecordReader[T](
   override def getCurrentValue = value


+
   override def nextKeyValue = {
     if (!processed) {
-
+      val fileIn = new PortableDataStream(split,context,index)
+      key = fileIn.getPath
       value = parseStream(fileIn)
+      fileIn.close() // if it has not been open yet, close does nothing
       processed = true
       true
     } else {
@@ -109,29 +138,29 @@ abstract class StreamBasedRecordReader[T](
    * @param inStream the stream to be read in
    * @return the data formatted as
    */
-  def parseStream(inStream: DataInputStream): T
+  def parseStream(inStream: PortableDataStream): T
 }

 /**
  * Reads the record in directly as a stream for other objects to manipulate and handle
  */
 private[spark] class StreamRecordReader(
-  split: CombineFileSplit,
-  context: TaskAttemptContext,
-  index: Integer)
-  extends StreamBasedRecordReader[DataInputStream](split,context,index) {
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends StreamBasedRecordReader[PortableDataStream](split,context,index) {

-  def parseStream(inStream: DataInputStream): DataInputStream = inStream
+  def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
 }

 /**
  * A class for extracting the information from the file using the
  * BinaryRecordReader (as Byte array)
  */
-private[spark] class StreamInputFormat extends StreamFileInputFormat[DataInputStream] {
+private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
   override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)=
   {
-    new CombineFileRecordReader[String,DataInputStream](
+    new CombineFileRecordReader[String,PortableDataStream](
       split.asInstanceOf[CombineFileSplit],taContext,classOf[StreamRecordReader]
     )
   }
@@ -143,12 +172,13 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[DataInputSt
  * the file as a byte array
  */
 abstract class BinaryRecordReader[T](
-  split: CombineFileSplit,
-  context: TaskAttemptContext,
-  index: Integer)
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
   extends StreamBasedRecordReader[T](split,context,index) {

-  def parseStream(inStream: DataInputStream): T = {
+  def parseStream(inpStream: PortableDataStream): T = {
+    val inStream = inpStream.open()
     val innerBuffer = ByteStreams.toByteArray(inStream)
     Closeables.close(inStream, false)
     parseByteArray(innerBuffer)
@@ -157,13 +187,14 @@ abstract class BinaryRecordReader[T](
 }


+
 private[spark] class ByteRecordReader(
-  split: CombineFileSplit,
-  context: TaskAttemptContext,
-  index: Integer)
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
   extends BinaryRecordReader[Array[Byte]](split,context,index) {

-  def parseByteArray(inArray: Array[Byte]) = inArray
+  override def parseByteArray(inArray: Array[Byte]) = inArray
 }

 /**
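
The class comment above captures the intent: a PortableDataStream only records where the data lives (split, context, index) and defers the filesystem open() until the value is actually read, which is what lets StreamRecordReader hand the stream out as the record value. A rough sketch of the lifecycle a consumer would follow, assuming only the open()/close() methods shown in this diff; the byte-counting loop is illustrative:

// Usage sketch (not part of the commit): count the bytes behind one PortableDataStream.
import org.apache.spark.input.PortableDataStream

def countBytes(pds: PortableDataStream): Long = {
  val in = pds.open()              // the HDFS connection is created only at this point
  try {
    val buf = new Array[Byte](64 * 1024)
    var total = 0L
    var n = in.read(buf)
    while (n >= 0) { total += n; n = in.read(buf) }
    total
  } finally {
    pds.close()                    // closes the underlying stream only if open() was called
  }
}

Calling close() before open() is harmless because the isOpen flag guards the actual fileIn.close(), which is also why nextKeyValue can unconditionally call fileIn.close() above.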

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 4 additions & 7 deletions
@@ -226,9 +226,8 @@ class FileSuite extends FunSuite with LocalSparkContext {

   test("binary file input as byte array") {
     sc = new SparkContext("local", "test")
-    val outputDir = new File(tempDir).getAbsolutePath
-    val outFile = new File(outputDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.toPath().toString()
+    val outFile = new File(tempDir, "record-bytestream-00000.bin")
+    val outFileName = outFile.getAbsolutePath()

     // create file
     val testOutput = Array[Byte](1,2,3,4,5,6)
@@ -252,10 +251,8 @@ class FileSuite extends FunSuite with LocalSparkContext {
     // a fixed length of 6 bytes

     sc = new SparkContext("local", "test")
-
-    val outputDir = new File(tempDir).getAbsolutePath
-    val outFile = new File(outputDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.toPath().toString()
+    val outFile = new File(tempDir, "record-bytestream-00000.bin")
+    val outFileName = outFile.getAbsolutePath()

     // create file
     val testOutput = Array[Byte](1,2,3,4,5,6)

pom.xml

Lines changed: 5 additions & 0 deletions
@@ -90,6 +90,7 @@
     <module>bagel</module>
     <module>graphx</module>
     <module>mllib</module>
+    <module>imglib</module>
     <module>tools</module>
     <module>streaming</module>
     <module>sql/catalyst</module>
@@ -743,6 +744,10 @@
         <artifactId>jackson-mapper-asl</artifactId>
         <version>1.8.8</version>
       </dependency>
+      <dependency>
+        <groupId>io.scif</groupId>
+        <artifactId>pom-scifio</artifactId>
+      </dependency>
     </dependencies>
   </dependencyManagement>

