Skip to content

Commit dc3d21a

Browse files
committed
More genericization in ConfigurableCombineFileRecordReader.
1 parent 95d13eb commit dc3d21a

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ private[spark] class WholeTextFileInputFormat
4141
split: InputSplit,
4242
context: TaskAttemptContext): RecordReader[String, String] = {
4343

44-
val reader = new ConfigurableCombineFileRecordReader(split, context)
44+
val reader =
45+
new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader])
4546
reader.setConf(getConf)
4647
reader
4748
}

core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,21 +94,23 @@ private[spark] class WholeTextFileRecordReader(
9494

9595
/**
9696
* A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]]
97-
* that could pass Hadoop configuration to WholeTextFileRecordReader.
97+
* that can pass Hadoop Configuration to [[org.apache.hadoop.conf.Configurable Configurable]]
98+
* RecordReaders.
9899
*/
99-
private[spark] class ConfigurableCombineFileRecordReader(
100+
private[spark] class ConfigurableCombineFileRecordReader[K, V](
100101
split: InputSplit,
101-
context: TaskAttemptContext)
102-
extends CombineFileRecordReader[String, String](
102+
context: TaskAttemptContext,
103+
recordReaderClass: Class[_ <: RecordReader[K, V] with HConfigurable])
104+
extends CombineFileRecordReader[K, V](
103105
split.asInstanceOf[CombineFileSplit],
104106
context,
105-
classOf[WholeTextFileRecordReader]
107+
recordReaderClass
106108
) with Configurable {
107109

108110
override def initNextRecordReader(): Boolean = {
109111
val r = super.initNextRecordReader()
110112
if (r) {
111-
this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(getConf)
113+
this.curReader.asInstanceOf[HConfigurable].setConf(getConf)
112114
}
113115
r
114116
}

0 commit comments

Comments
 (0)