
Commit 359a096

making the final corrections suggested by @mateiz and renaming a few functions to make their usage clearer
1 parent 6379be4 · commit 359a096

5 files changed: +17 −52 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 0 additions & 1 deletion

@@ -518,7 +518,6 @@ class SparkContext(config: SparkConf) extends Logging {
    *
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    *
-   * @note Care must be taken to close the files afterwards
    * @note Small files are preferred, large file is also allowable, but may cause bad performance.
    */
   @DeveloperApi

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 0 additions & 33 deletions

@@ -263,39 +263,6 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     JavaPairRDD[String,PortableDataStream] =
     new JavaPairRDD(sc.binaryFiles(path,defaultMinPartitions))
 
-  /**
-   * Read a directory of files as DataInputStream from HDFS,
-   * a local file system (available on all nodes), or any Hadoop-supported file system URI
-   * as a byte array. Each file is read as a single record and returned in a
-   * key-value pair, where the key is the path of each file, the value is the content of each.
-   *
-   * <p> For example, if you have the following files:
-   * {{{
-   *   hdfs://a-hdfs-path/part-00000
-   *   hdfs://a-hdfs-path/part-00001
-   *   ...
-   *   hdfs://a-hdfs-path/part-nnnnn
-   * }}}
-   *
-   * Do
-   * `JavaPairRDD<String,DataInputStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
-   *
-   * <p> then `rdd` contains
-   * {{{
-   *   (a-hdfs-path/part-00000, its content)
-   *   (a-hdfs-path/part-00001, its content)
-   *   ...
-   *   (a-hdfs-path/part-nnnnn, its content)
-   * }}}
-   *
-   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
-   *
-   * @param minPartitions A suggestion value of the minimal splitting number for input data.
-   */
-  def binaryArrays(path: String, minPartitions: Int = defaultMinPartitions):
-    JavaPairRDD[String, Array[Byte]] =
-    new JavaPairRDD(sc.binaryFiles(path,minPartitions).mapValues(_.toArray()))
-
   /**
    * Load data from a flat binary file, assuming each record is a set of numbers
    * with the specified numerical format (see ByteBuffer), and the number of
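
With binaryArrays removed, the Java-facing API keeps only binaryFiles; the byte-array view the deleted method offered can still be built by mapping each PortableDataStream to an array, exactly as its one-line body did. A minimal sketch of that replacement pattern, assuming a local master and a placeholder path (the variable names and path are illustrative, not part of this commit):

    import org.apache.spark.SparkContext._             // pair-RDD implicits (needed on Spark versions of this era)
    import org.apache.spark.api.java.{ JavaPairRDD, JavaSparkContext }
    import org.apache.spark.input.PortableDataStream

    val jsc = new JavaSparkContext("local[2]", "binary-files-example")

    // Each file under the path becomes one (path, stream) record.
    val streams: JavaPairRDD[String, PortableDataStream] =
      jsc.binaryFiles("hdfs://a-hdfs-path")

    // Equivalent of the removed binaryArrays: read every stream fully into a byte array.
    val asBytes: JavaPairRDD[String, Array[Byte]] =
      new JavaPairRDD(streams.rdd.mapValues(_.toArray()))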

core/src/main/scala/org/apache/spark/input/PortableDataStream.scala

Lines changed: 3 additions & 2 deletions

@@ -35,9 +35,10 @@ private[spark] abstract class StreamFileInputFormat[T]
   extends CombineFileInputFormat[String, T] {
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
   /**
-   * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
+   * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API
+   * which is set through setMaxSplitSize
    */
-  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+  def setMinPartitions(context: JobContext, minPartitions: Int) {
     val files = listStatus(context)
     val totalLen = files.map { file =>
       if (file.isDir) 0L else file.getLen
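
The hunk only shows the first lines of the renamed method; how minPartitions ultimately reaches Hadoop is not visible here. A sketch of the likely full shape, where everything after the totalLen computation is an assumption about the continuation rather than part of this diff:

    package org.apache.spark.input

    import scala.collection.JavaConversions._          // lets Hadoop's java.util.List be mapped over
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.JobContext
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat

    private[spark] abstract class StreamFileInputFormat[T]
      extends CombineFileInputFormat[String, T] {

      override protected def isSplitable(context: JobContext, file: Path): Boolean = false

      /**
       * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API
       * which is set through setMaxSplitSize
       */
      def setMinPartitions(context: JobContext, minPartitions: Int) {
        val files = listStatus(context)
        val totalLen = files.map { file =>
          if (file.isDir) 0L else file.getLen
        }.sum
        // Assumed continuation (not shown in the hunk): spread the total input size over the
        // requested number of partitions and hand it to CombineFileInputFormat as a maximum
        // combined split size.
        val maxSplitSize = math.ceil(totalLen * 1.0 / math.max(minPartitions, 1)).toLong
        super.setMaxSplitSize(maxSplitSize)
      }
    }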

core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala

Lines changed: 10 additions & 12 deletions

@@ -17,22 +17,21 @@
 
 package org.apache.spark.rdd
 
-import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.conf.{ Configurable, Configuration }
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 import org.apache.spark.input.StreamFileInputFormat
-import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.{ Partition, SparkContext }
 
 private[spark] class BinaryFileRDD[T](
-  sc : SparkContext,
-  inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
-  keyClass: Class[String],
-  valueClass: Class[T],
-  @transient conf: Configuration,
-  minPartitions: Int)
+    sc: SparkContext,
+    inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
+    keyClass: Class[String],
+    valueClass: Class[T],
+    @transient conf: Configuration,
+    minPartitions: Int)
   extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
 
-
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
@@ -41,13 +40,12 @@ private[spark] class BinaryFileRDD[T](
       case _ =>
     }
     val jobContext = newJobContext(conf, jobId)
-    inputFormat.setMaxSplitSize(jobContext, minPartitions)
+    inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
       result(i) = new NewHadoopPartition(
-        id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]
-      )
+        id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
     }
     result
   }
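
getPartitions now calls the renamed setMinPartitions, so the minPartitions argument handed to this RDD is a sizing hint (turned into a maximum combined split size) rather than an exact partition count. A rough sketch of how such an RDD could be wired up, assuming a concrete StreamFileInputFormat subclass named StreamInputFormat that produces PortableDataStream values; none of this wiring is shown in the commit:

    package org.apache.spark

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    import org.apache.spark.input.{ PortableDataStream, StreamInputFormat }
    import org.apache.spark.rdd.BinaryFileRDD

    // Illustrative helper only, not the committed SparkContext.binaryFiles implementation.
    // StreamInputFormat is assumed to be a concrete StreamFileInputFormat[PortableDataStream].
    object BinaryFileRDDExample {
      def binaryFileRDD(sc: SparkContext, path: String, minPartitions: Int): BinaryFileRDD[PortableDataStream] = {
        val job = new Job(sc.hadoopConfiguration)           // carries the Hadoop configuration
        FileInputFormat.addInputPath(job, new Path(path))   // point the input format at the files to read
        new BinaryFileRDD(
          sc,
          classOf[StreamInputFormat],
          classOf[String],
          classOf[PortableDataStream],
          job.getConfiguration,
          minPartitions)                                    // a hint: translated into a max split size by setMinPartitions
      }
    }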

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 4 additions & 4 deletions

@@ -243,7 +243,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
     file.close()
 
     val inRdd = sc.binaryFiles(outFileName)
-    val (infile: String, indata: PortableDataStream) = inRdd.first
+    val (infile: String, indata: PortableDataStream) = inRdd.collect.head
 
     // Make sure the name and array match
     assert(infile.contains(outFileName)) // a prefix may get added
@@ -274,7 +274,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
       curData: (String, PortableDataStream) =>
         (curData._2.getPath(),curData._2)
     }
-    val (infile: String, indata: PortableDataStream) = mappedRdd.first
+    val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
 
     // Try reading the output back as an object file
 
@@ -305,7 +305,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
       curData: (String, PortableDataStream) =>
         (curData._2.getPath(),curData._2)
     }
-    val (infile: String, indata: PortableDataStream) = mappedRdd.first
+    val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
 
     // Try reading the output back as an object file
 
@@ -376,7 +376,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(inRdd.count == testOutputCopies)
 
     // now just compare the first one
-    val indata: Array[Byte] = inRdd.first
+    val indata: Array[Byte] = inRdd.collect.head
 
     assert(indata === testOutput)
   }
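
Outside the suite, the round-trip these tests cover looks roughly like the sketch below, where the file path and contents are placeholders rather than the suite's fixtures:

    import java.io.{ DataOutputStream, FileOutputStream }
    import org.apache.spark.SparkContext
    import org.apache.spark.input.PortableDataStream

    val sc = new SparkContext("local[2]", "binary-file-roundtrip")

    // Write a small binary fixture (placeholder path and bytes).
    val outFileName = "/tmp/record.bin"
    val testOutput = Array[Byte](1, 2, 3, 4, 5)
    val out = new DataOutputStream(new FileOutputStream(outFileName))
    out.write(testOutput)
    out.close()

    // Read it back: one (path, stream) record per file.
    val inRdd = sc.binaryFiles(outFileName)
    val (infile: String, indata: PortableDataStream) = inRdd.collect.head

    assert(infile.contains(outFileName))              // a prefix may get added
    assert(indata.toArray.sameElements(testOutput))   // the stream yields the original bytes

    sc.stop()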
