
Commit 92bda0d

Added new tests, renamed files, fixed several of the Java API functions, formatted code more nicely.
1 parent: a32fef7 · commit: 92bda0d

File tree: 7 files changed, +66 −36 lines

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 4 additions & 1 deletion

@@ -256,9 +256,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    *
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
-  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  def binaryFiles(path: String, minPartitions: Int):
     JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path,minPartitions))
 
+  def binaryFiles(path: String):
+    JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path,defaultMinPartitions))
+
   /**
    * Read a directory of files as DataInputStream from HDFS,
    * a local file system (available on all nodes), or any Hadoop-supported file system URI
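
The overload split above exists because Scala default arguments (minPartitions: Int = defaultMinPartitions) are not visible to Java callers, so the Java API now exposes an explicit one-argument form. A minimal Scala usage sketch, assuming a live JavaSparkContext named jsc and an input directory path (the helper below is hypothetical, not part of the patch):

  import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}
  import org.apache.spark.input.PortableDataStream

  // Hypothetical helper, only to show the two overloads side by side.
  def readBinaries(jsc: JavaSparkContext, path: String): Unit = {
    // explicit partition hint
    val hinted: JavaPairRDD[String, PortableDataStream] = jsc.binaryFiles(path, 4)
    // new overload that falls back to defaultMinPartitions
    val defaulted: JavaPairRDD[String, PortableDataStream] = jsc.binaryFiles(path)
    println(s"hinted: ${hinted.count()} files, defaulted: ${defaulted.count()} files")
  }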

core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala

Lines changed: 0 additions & 10 deletions

@@ -29,7 +29,6 @@ import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAt
  */
 
 private[spark] object FixedLengthBinaryInputFormat {
-
   /**
    * This function retrieves the recordLength by checking the configuration parameter
    *
@@ -39,13 +38,10 @@ private[spark] object FixedLengthBinaryInputFormat {
     // retrieve record length from configuration
     context.getConfiguration.get("recordLength").toInt
   }
-
 }
 
 private[spark] class FixedLengthBinaryInputFormat
   extends FileInputFormat[LongWritable, BytesWritable] {
-
-
   /**
    * Override of isSplitable to ensure initial computation of the record length
    */
@@ -60,7 +56,6 @@ private[spark] class FixedLengthBinaryInputFormat
     } else {
       true
     }
-
   }
 
   /**
@@ -69,14 +64,11 @@ private[spark] class FixedLengthBinaryInputFormat
    * will start at the first byte of a record, and the last byte will the last byte of a record.
    */
   override def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long = {
-
     val defaultSize = super.computeSplitSize(blockSize, minSize, maxSize)
-
     // If the default size is less than the length of a record, make it equal to it
     // Otherwise, make sure the split size is as close to possible as the default size,
     // but still contains a complete set of records, with the first record
     // starting at the first byte in the split and the last record ending with the last byte
-
     if (defaultSize < recordLength) {
       recordLength.toLong
     } else {
@@ -91,7 +83,5 @@ private[spark] class FixedLengthBinaryInputFormat
     RecordReader[LongWritable, BytesWritable] = {
     new FixedLengthBinaryRecordReader
   }
-
   var recordLength = -1
-
 }
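
For reference, the split-size rule the comments in computeSplitSize describe can be written out as below. The else branch is outside this hunk, so this is only an illustrative sketch of the documented behaviour (snap the split down to a whole number of records), not the committed code; the stand-in for super.computeSplitSize assumes Hadoop's usual max(minSize, min(maxSize, blockSize)) clamp.

  // Illustrative only: mirrors the rule the comments describe, not the patch itself.
  def sketchSplitSize(blockSize: Long, minSize: Long, maxSize: Long, recordLength: Int): Long = {
    // stand-in for FileInputFormat.computeSplitSize (assumed clamp of blockSize)
    val defaultSize = math.max(minSize, math.min(maxSize, blockSize))
    if (defaultSize < recordLength) {
      recordLength.toLong                          // a split must hold at least one full record
    } else {
      (defaultSize / recordLength) * recordLength  // largest multiple of recordLength <= defaultSize
    }
  }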

core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala

Lines changed: 0 additions & 18 deletions

@@ -76,66 +76,48 @@ private[spark] class FixedLengthBinaryRecordReader
 
     // the actual file we will be reading from
     val file = fileSplit.getPath
-
     // job configuration
     val job = context.getConfiguration
-
     // check compression
     val codec = new CompressionCodecFactory(job).getCodec(file)
     if (codec != null) {
       throw new IOException("FixedLengthRecordReader does not support reading compressed files")
     }
-
     // get the record length
     recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
-
     // get the filesystem
     val fs = file.getFileSystem(job)
-
     // open the File
     fileInputStream = fs.open(file)
-
     // seek to the splitStart position
     fileInputStream.seek(splitStart)
-
     // set our current position
     currentPosition = splitStart
-
   }
 
   override def nextKeyValue(): Boolean = {
-
     if (recordKey == null) {
       recordKey = new LongWritable()
     }
-
     // the key is a linear index of the record, given by the
     // position the record starts divided by the record length
     recordKey.set(currentPosition / recordLength)
-
     // the recordValue to place the bytes into
     if (recordValue == null) {
       recordValue = new BytesWritable(new Array[Byte](recordLength))
     }
-
     // read a record if the currentPosition is less than the split end
     if (currentPosition < splitEnd) {
-
       // setup a buffer to store the record
       val buffer = recordValue.getBytes
-
       fileInputStream.read(buffer, 0, recordLength)
-
       // update our current position
       currentPosition = currentPosition + recordLength
-
       // return true
       return true
     }
-
     false
   }
-
   var splitStart: Long = 0L
   var splitEnd: Long = 0L
   var currentPosition: Long = 0L
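
The key arithmetic in nextKeyValue() above is worth spelling out: because every record is exactly recordLength bytes, a record's key is simply its starting offset divided by the record length. A small worked example (the names recordIndex and nextPosition are illustrative, not from the patch):

  val recordLength = 6                               // bytes per record
  val currentPosition = 18L                          // offset where the current record starts
  val recordIndex = currentPosition / recordLength   // LongWritable key -> 3 (the 4th record)
  val nextPosition = currentPosition + recordLength  // reader position after the read -> 24
  assert(recordIndex == 3L && nextPosition == 24L)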

core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala

Lines changed: 1 addition & 4 deletions

@@ -17,14 +17,11 @@
 
 package org.apache.spark.rdd
 
-/** Allows better control of the partitioning
- *
- */
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
-import org.apache.spark.{InterruptibleIterator, TaskContext, Partition, SparkContext}
 import org.apache.spark.input.StreamFileInputFormat
+import org.apache.spark.{Partition, SparkContext}
 
 private[spark] class BinaryFileRDD[T](
     sc : SparkContext,

core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 1 addition & 3 deletions

@@ -844,7 +844,6 @@ public void binaryFiles() throws Exception {
     // Reusing the wholeText files example
     byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
 
-
     String tempDirName = tempDir.getAbsolutePath();
     File file1 = new File(tempDirName + "/part-00000");
 
@@ -866,7 +865,6 @@ public void binaryFilesCaching() throws Exception {
     // Reusing the wholeText files example
     byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
 
-
     String tempDirName = tempDir.getAbsolutePath();
     File file1 = new File(tempDirName + "/part-00000");
 
@@ -877,7 +875,7 @@ public void binaryFilesCaching() throws Exception {
     channel1.write(bbuf);
     channel1.close();
 
-    JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName,3).cache();
+    JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache();
     readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() {
       @Override
       public void call(Tuple2<String, PortableDataStream> stringPortableDataStreamTuple2) throws Exception {

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 60 additions & 0 deletions

@@ -20,6 +20,7 @@ package org.apache.spark
 import java.io.{File, FileWriter}
 
 import org.apache.spark.input.PortableDataStream
+import org.apache.spark.storage.StorageLevel
 
 import scala.io.Source
 
@@ -280,6 +281,37 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(indata.toArray === testOutput)
   }
 
+  test("portabledatastream persist disk storage") {
+    sc = new SparkContext("local", "test")
+    val outFile = new File(tempDir, "record-bytestream-00000.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1,2,3,4,5,6)
+    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    channel.write(bbuf)
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryFiles(outFileName).persist(StorageLevel.DISK_ONLY)
+    inRdd.foreach{
+      curData: (String, PortableDataStream) =>
+        curData._2.toArray() // force the file to read
+    }
+    val mappedRdd = inRdd.map{
+      curData: (String, PortableDataStream) =>
+        (curData._2.getPath(),curData._2)
+    }
+    val (infile: String, indata: PortableDataStream) = mappedRdd.first
+
+    // Try reading the output back as an object file
+
+    assert(indata.toArray === testOutput)
+  }
+
   test("portabledatastream flatmap tests") {
     sc = new SparkContext("local", "test")
     val outFile = new File(tempDir, "record-bytestream-00000.bin")
@@ -348,6 +380,34 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(indata === testOutput)
   }
 
+  test ("negative binary record length should raise an exception") {
+    // a fixed length of 6 bytes
+    sc = new SparkContext("local", "test")
+
+    val outFile = new File(tempDir, "record-bytestream-00000.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1,2,3,4,5,6)
+    val testOutputCopies = 10
+
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    for(i <- 1 to testOutputCopies) {
+      val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+      channel.write(bbuf)
+    }
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryRecords(outFileName, -1)
+
+    intercept[SparkException] {
+      inRdd.count
+    }
+  }
+
   test("file caching") {
     sc = new SparkContext("local", "test")
     val out = new FileWriter(tempDir + "/input")
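
The two new tests above cover the driver-facing pattern: binaryFiles pairs each file path with a PortableDataStream that is materialized lazily (so the RDD can be persisted, even DISK_ONLY, and re-read), while binaryRecords applies fixed-length framing and the job is expected to fail for a negative record length. A condensed sketch of that usage, assuming a SparkContext sc and a hypothetical directory /tmp/records of 6-byte records:

  import org.apache.spark.SparkContext
  import org.apache.spark.storage.StorageLevel

  // Hypothetical driver-side walkthrough of the API the tests exercise.
  def sketch(sc: SparkContext): Unit = {
    // one (path, PortableDataStream) pair per file; the stream's bytes are
    // only read when toArray() is called
    val streams = sc.binaryFiles("/tmp/records").persist(StorageLevel.DISK_ONLY)
    streams.foreach { case (_, stream) => stream.toArray() }

    // fixed 6-byte framing; a negative length is expected to surface as a SparkException
    val records = sc.binaryRecords("/tmp/records/data.bin", 6)
    println(s"record count: ${records.count()}")
  }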
