Skip to content

Commit ed36200

Browse files
Davies Liu and Josh Rosen
authored and committed
[SPARK-4437] update doc for WholeCombineFileRecordReader
update doc for WholeCombineFileRecordReader

Author: Davies Liu <[email protected]>
Author: Josh Rosen <[email protected]>

Closes apache#3301 from davies/fix_doc and squashes the following commits:

1d7422f [Davies Liu] Merge pull request #2 from JoshRosen/whole-text-file-cleanup
dc3d21a [Josh Rosen] More genericization in ConfigurableCombineFileRecordReader.
95d13eb [Davies Liu] address comment
bf800b9 [Davies Liu] update doc for WholeCombineFileRecordReader
1 parent c246b95 commit ed36200

File tree

2 files changed

+25
-30
lines changed

2 files changed

+25
-30
lines changed

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package org.apache.spark.input
1919

2020
import scala.collection.JavaConversions._
2121

22-
import org.apache.hadoop.conf.{Configuration, Configurable}
2322
import org.apache.hadoop.fs.Path
2423
import org.apache.hadoop.mapreduce.InputSplit
2524
import org.apache.hadoop.mapreduce.JobContext
@@ -38,18 +37,13 @@ private[spark] class WholeTextFileInputFormat
3837

3938
override protected def isSplitable(context: JobContext, file: Path): Boolean = false
4039

41-
private var conf: Configuration = _
42-
def setConf(c: Configuration) {
43-
conf = c
44-
}
45-
def getConf: Configuration = conf
46-
4740
override def createRecordReader(
4841
split: InputSplit,
4942
context: TaskAttemptContext): RecordReader[String, String] = {
5043

51-
val reader = new WholeCombineFileRecordReader(split, context)
52-
reader.setConf(conf)
44+
val reader =
45+
new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader])
46+
reader.setConf(getConf)
5347
reader
5448
}
5549

core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.input
1919

20-
import org.apache.hadoop.conf.{Configuration, Configurable}
20+
import org.apache.hadoop.conf.{Configuration, Configurable => HConfigurable}
2121
import com.google.common.io.{ByteStreams, Closeables}
2222

2323
import org.apache.hadoop.io.Text
@@ -27,6 +27,18 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecor
2727
import org.apache.hadoop.mapreduce.RecordReader
2828
import org.apache.hadoop.mapreduce.TaskAttemptContext
2929

30+
31+
/**
32+
* A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface.
33+
*/
34+
private[spark] trait Configurable extends HConfigurable {
35+
private var conf: Configuration = _
36+
def setConf(c: Configuration) {
37+
conf = c
38+
}
39+
def getConf: Configuration = conf
40+
}
41+
3042
/**
3143
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
3244
* out in a key-value pair, where the key is the file path and the value is the entire content of
@@ -38,12 +50,6 @@ private[spark] class WholeTextFileRecordReader(
3850
index: Integer)
3951
extends RecordReader[String, String] with Configurable {
4052

41-
private var conf: Configuration = _
42-
def setConf(c: Configuration) {
43-
conf = c
44-
}
45-
def getConf: Configuration = conf
46-
4753
private[this] val path = split.getPath(index)
4854
private[this] val fs = path.getFileSystem(context.getConfiguration)
4955

@@ -87,29 +93,24 @@ private[spark] class WholeTextFileRecordReader(
8793

8894

8995
/**
90-
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
91-
* out in a key-value pair, where the key is the file path and the value is the entire content of
92-
* the file.
96+
* A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]]
97+
* that can pass Hadoop Configuration to [[org.apache.hadoop.conf.Configurable Configurable]]
98+
* RecordReaders.
9399
*/
94-
private[spark] class WholeCombineFileRecordReader(
100+
private[spark] class ConfigurableCombineFileRecordReader[K, V](
95101
split: InputSplit,
96-
context: TaskAttemptContext)
97-
extends CombineFileRecordReader[String, String](
102+
context: TaskAttemptContext,
103+
recordReaderClass: Class[_ <: RecordReader[K, V] with HConfigurable])
104+
extends CombineFileRecordReader[K, V](
98105
split.asInstanceOf[CombineFileSplit],
99106
context,
100-
classOf[WholeTextFileRecordReader]
107+
recordReaderClass
101108
) with Configurable {
102109

103-
private var conf: Configuration = _
104-
def setConf(c: Configuration) {
105-
conf = c
106-
}
107-
def getConf: Configuration = conf
108-
109110
override def initNextRecordReader(): Boolean = {
110111
val r = super.initNextRecordReader()
111112
if (r) {
112-
this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
113+
this.curReader.asInstanceOf[HConfigurable].setConf(getConf)
113114
}
114115
r
115116
}

0 commit comments

Comments (0)