
Commit 7136719

kmader authored and mateiz committed
[SPARK-2759][CORE] Generic Binary File Support in Spark
This change adds the abstract BinaryFileInputFormat and BinaryRecordReader classes for reading data in as a byte stream and converting it to another format using the ```def parseByteArray(inArray: Array[Byte]): T``` function. As a trivial example, ```ByteInputFormat``` and ```ByteRecordReader``` are included, which simply return the Array[Byte] for a given file. Finally, an RDD for ```BinaryFileInputFormat``` (to allow for easier partitioning changes, as was done for WholeFileInput) was added, along with the corresponding byteFiles methods on the ```SparkContext```, so the functions can easily be used by others. A common use case might be to read in a folder:

```
sc.byteFiles("s3://mydrive/tif/*.tif").map(rawData => ReadTiffFromByteArray(rawData))
```

Author: Kevin Mader <[email protected]>
Author: Kevin Mader <[email protected]>

Closes apache#1658 from kmader/master and squashes the following commits:

3c49a30 [Kevin Mader] fixing wholetextfileinput to it has the same setMinPartitions function as in BinaryData files
359a096 [Kevin Mader] making the final corrections suggested by @mateiz and renaming a few functions to make their usage clearer
6379be4 [Kevin Mader] reorganizing code
7b9d181 [Kevin Mader] removing developer API, cleaning up imports
8ac288b [Kevin Mader] fixed a single slightly over 100 character line
92bda0d [Kevin Mader] added new tests, renamed files, fixed several of the javaapi functions, formatted code more nicely
a32fef7 [Kevin Mader] removed unneeded classes added DeveloperApi note to portabledatastreams since the implementation might change
49174d9 [Kevin Mader] removed unneeded classes added DeveloperApi note to portabledatastreams since the implementation might change
c27a8f1 [Kevin Mader] jenkins crashed before running anything last time, so making minor change
b348ce1 [Kevin Mader] fixed order in check (prefix only appears on jenkins not when I run unit tests locally)
0588737 [Kevin Mader] filename check in "binary file input as byte array" test now ignores prefixes and suffixes which might get added by Hadoop
4163e38 [Kevin Mader] fixing line length and output from FSDataInputStream to DataInputStream to minimize sensitivity to Hadoop API changes
19812a8 [Kevin Mader] Fixed the serialization issue with PortableDataStream since neither CombineFileSplit nor TaskAttemptContext implement the Serializable interface, by using ByteArrays for storing both and then recreating the objects from these bytearrays as needed.
238c83c [Kevin Mader] fixed several scala-style issues, changed structure of binaryFiles, removed excessive classes added new tests. The caching tests still have a serialization issue, but that should be easily fixed as well.
932a206 [Kevin Mader] Update RawFileInput.scala
a01c9cf [Kevin Mader] Update RawFileInput.scala
441f79a [Kevin Mader] fixed a few small comments and dependency
12e7be1 [Kevin Mader] removing imglib from maven (definitely not ready yet)
5deb79e [Kevin Mader] added new portabledatastream to code so that it can be serialized correctly
f032bc0 [Kevin Mader] fixed bug in path name, renamed tests
bc5c0b9 [Kevin Mader] made minor stylistic adjustments from mateiz
df8e528 [Kevin Mader] fixed line lengths and changed java test
9a313d5 [Kevin Mader] making classes that needn't be public private, adding automatic file closure, adding new tests
edf5829 [Kevin Mader] fixing line lengths, adding new lines
f4841dc [Kevin Mader] un-optimizing imports, silly intellij
eacfaa6 [Kevin Mader] Added FixedLengthBinaryInputFormat and RecordReader from freeman-lab and added them to both the JavaSparkContext and the SparkContext as fixedLengthBinaryFile
1622935 [Kevin Mader] changing the line lengths to make jenkins happy
1cfa38a [Kevin Mader] added apache headers, added datainputstream directly as an output option for more complicated readers (HDF5 perhaps), and renamed several of the functions and files to be more consistent. Also added parallel functions to the java api
84035f1 [Kevin Mader] adding binary and byte file support spark
81c5f12 [Kevin Mader] Merge pull request #1 from apache/master
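For reference, a minimal sketch of how the API as finally merged might be used (the diff below introduces `binaryFiles` rather than the `byteFiles` name used in the description above; `parseTiff` is a hypothetical user-supplied decoder, not part of this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.input.PortableDataStream

object BinaryFilesExample {
  // Hypothetical decoder standing in for a real TIFF reader; not part of this patch.
  def parseTiff(bytes: Array[Byte]): Int = bytes.length

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binary-files-example"))

    // binaryFiles yields (file path, PortableDataStream) pairs; toArray() pulls
    // the whole file into memory on the executor that processes it.
    val decoded = sc.binaryFiles("s3://mydrive/tif/*.tif")
      .map { case (path, stream: PortableDataStream) => (path, parseTiff(stream.toArray())) }

    decoded.collect().foreach(println)
    sc.stop()
  }
}
```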
1 parent ee29ef3 commit 7136719

File tree

10 files changed: +892 -5 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 64 additions & 1 deletion
@@ -41,7 +41,7 @@ import akka.actor.Props
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.WholeTextFileInputFormat
+import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -533,6 +533,69 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
       minPartitions).setName(path)
   }
 
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
+   * (useful for binary data)
+   *
+   * For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do
+   * `val rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Small files are preferred; very large files may cause bad performance.
+   */
+  @Experimental
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+      RDD[(String, PortableDataStream)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[StreamInputFormat],
+      classOf[String],
+      classOf[PortableDataStream],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Load data from a flat binary file, assuming the length of each record is constant.
+   *
+   * @param path Directory to the input data files
+   * @param recordLength The length at which to split the records
+   * @return An RDD of data with values, represented as byte arrays
+   */
+  @Experimental
+  def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration)
+      : RDD[Array[Byte]] = {
+    conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
+    val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
+      classOf[FixedLengthBinaryInputFormat],
+      classOf[LongWritable],
+      classOf[BytesWritable],
+      conf = conf)
+    val data = br.map { case (k, v) => v.getBytes }
+    data
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
    * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
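
As a usage note (not part of the diff), a sketch of how `binaryRecords` might be combined with `java.nio.ByteBuffer` to decode fixed-length records; the 12-byte record layout and the path are made up for illustration:

```scala
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.{SparkConf, SparkContext}

object BinaryRecordsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binary-records-example"))

    // Assumed layout: each record is 12 bytes = one big-endian Long id + one Float value.
    val recordLength = 12
    val records = sc.binaryRecords("hdfs://a-hdfs-path/records.bin", recordLength)
      .map { bytes =>
        val buf = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN)
        (buf.getLong, buf.getFloat)
      }

    println(s"parsed ${records.count()} records")
    sc.stop()
  }
}
```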

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 81 additions & 1 deletion
@@ -21,6 +21,11 @@ import java.io.Closeable
 import java.util
 import java.util.{Map => JMap}
 
+import java.io.DataInputStream
+
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}
+
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
@@ -32,7 +37,8 @@ import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark._
-import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
+import org.apache.spark.SparkContext._
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
@@ -202,6 +208,8 @@ class JavaSparkContext(val sc: SparkContext)
   def textFile(path: String, minPartitions: Int): JavaRDD[String] =
     sc.textFile(path, minPartitions)
 
+
+
   /**
    * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI. Each file is read as a single record and returned in a
@@ -245,6 +253,78 @@ class JavaSparkContext(val sc: SparkContext)
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
 
+  /**
+   * Read a directory of binary files from HDFS, a local file system (available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
+   * record and returned in a key-value pair, where the key is the path of each file,
+   * the value is the content of each file.
+   *
+   * For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @note Small files are preferred; very large files may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryFiles(path: String, minPartitions: Int): JavaPairRDD[String, PortableDataStream] =
+    new JavaPairRDD(sc.binaryFiles(path, minPartitions))
+
+  /**
+   * Read a directory of binary files from HDFS, a local file system (available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
+   * record and returned in a key-value pair, where the key is the path of each file,
+   * the value is the content of each file.
+   *
+   * For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @note Small files are preferred; very large files may cause bad performance.
+   */
+  def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
+    new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))
+
+  /**
+   * Load data from a flat binary file, assuming the length of each record is constant.
+   *
+   * @param path Directory to the input data files
+   * @return An RDD of data with values, represented as byte arrays
+   */
+  def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = {
+    new JavaRDD(sc.binaryRecords(path, recordLength))
+  }
+
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
    *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
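
A sketch (under an assumed file layout) of using the `PortableDataStream` returned by `binaryFiles` to read only a small header from each file via `open()`, rather than materializing whole files; the 16-byte header size is hypothetical:

```scala
import java.io.DataInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream

object BinaryHeadersExample {
  // Reads a fixed-size header from each matched file; the stream is opened
  // lazily on the executor, so whole files are never pulled into memory.
  def readHeaders(sc: SparkContext, path: String): Array[(String, Array[Byte])] = {
    sc.binaryFiles(path).map { case (file, pds: PortableDataStream) =>
      val in: DataInputStream = pds.open()
      try {
        val header = new Array[Byte](16) // hypothetical 16-byte header
        in.readFully(header)
        (file, header)
      } finally {
        in.close()
      }
    }.collect()
  }
}
```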
core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
+
+/**
+ * Custom Input Format for reading and splitting flat binary files that contain records,
+ * each of which are a fixed size in bytes. The fixed record size is specified through
+ * a parameter recordLength in the Hadoop configuration.
+ */
+private[spark] object FixedLengthBinaryInputFormat {
+  /** Property name to set in Hadoop JobConfs for record length */
+  val RECORD_LENGTH_PROPERTY = "org.apache.spark.input.FixedLengthBinaryInputFormat.recordLength"
+
+  /** Retrieves the record length property from a Hadoop configuration */
+  def getRecordLength(context: JobContext): Int = {
+    context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
+  }
+}
+
+private[spark] class FixedLengthBinaryInputFormat
+  extends FileInputFormat[LongWritable, BytesWritable] {
+
+  private var recordLength = -1
+
+  /**
+   * Override of isSplitable to ensure initial computation of the record length
+   */
+  override def isSplitable(context: JobContext, filename: Path): Boolean = {
+    if (recordLength == -1) {
+      recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
+    }
+    if (recordLength <= 0) {
+      println("record length is less than 0, file cannot be split")
+      false
+    } else {
+      true
+    }
+  }
+
+  /**
+   * This input format overrides computeSplitSize() to make sure that each split
+   * only contains full records. Each InputSplit passed to FixedLengthBinaryRecordReader
+   * will start at the first byte of a record, and the last byte will be the last byte of a record.
+   */
+  override def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long = {
+    val defaultSize = super.computeSplitSize(blockSize, minSize, maxSize)
+    // If the default size is less than the length of a record, make it equal to it
+    // Otherwise, make sure the split size is as close as possible to the default size,
+    // but still contains a complete set of records, with the first record
+    // starting at the first byte in the split and the last record ending with the last byte
+    if (defaultSize < recordLength) {
+      recordLength.toLong
+    } else {
+      (Math.floor(defaultSize / recordLength) * recordLength).toLong
+    }
+  }
+
+  /**
+   * Create a FixedLengthBinaryRecordReader
+   */
+  override def createRecordReader(split: InputSplit, context: TaskAttemptContext)
+    : RecordReader[LongWritable, BytesWritable] = {
+    new FixedLengthBinaryRecordReader
+  }
+}
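
To make the split arithmetic above concrete, here is a small standalone sketch of the same rounding rule, with made-up numbers:

```scala
// Mirrors the rounding in FixedLengthBinaryInputFormat.computeSplitSize:
// a split is never allowed to end in the middle of a record.
def roundSplitSize(defaultSize: Long, recordLength: Long): Long =
  if (defaultSize < recordLength) recordLength
  else (defaultSize / recordLength) * recordLength // Long division floors

// A 128 MB default split with 12-byte records is trimmed to 134217720 bytes
// (11,184,810 whole records); a split smaller than one record grows to one record.
assert(roundSplitSize(128L * 1024 * 1024, 12) == 134217720L)
assert(roundSplitSize(8, 12) == 12)
```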
