@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
25
25
import org .apache .hadoop .mapreduce .RecordReader
26
26
import org .apache .hadoop .mapreduce .TaskAttemptContext
27
27
import org .apache .hadoop .fs .{ FSDataInputStream , Path }
28
+ import org .apache .spark .annotation .DeveloperApi
28
29
import org .apache .hadoop .mapreduce .lib .input .CombineFileInputFormat
29
30
import org .apache .hadoop .mapreduce .JobContext
30
31
import org .apache .hadoop .mapreduce .lib .input .CombineFileRecordReader
@@ -60,6 +61,7 @@ abstract class StreamFileInputFormat[T]
60
61
* @note TaskAttemptContext is not serializable, resulting in the confBytes construct
61
62
* @note CombineFileSplit is not serializable, resulting in the splitBytes construct
62
63
*/
64
+ @ DeveloperApi
63
65
class PortableDataStream (@ transient isplit : CombineFileSplit ,
64
66
@ transient context : TaskAttemptContext , index : Integer )
65
67
extends Serializable {
@@ -205,8 +207,7 @@ private[spark] class StreamRecordReader(
205
207
}
206
208
207
209
/**
208
- * A class for extracting the information from the file using the
209
- * BinaryRecordReader (as Byte array)
210
+ * The input format for PortableDataStream files
210
211
*/
211
212
private [spark] class StreamInputFormat extends StreamFileInputFormat [PortableDataStream ] {
212
213
override def createRecordReader (split : InputSplit , taContext : TaskAttemptContext ) =
@@ -216,22 +217,3 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
216
217
}
217
218
}
218
219
219
/**
 * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single binary file
 * out in a key-value pair, where the key is the file path and the value is the entire content of
 * the file as a byte array
 *
 * @tparam T the record type produced by [[parseByteArray]]
 */
abstract class BinaryRecordReader[T](
    split: CombineFileSplit,
    context: TaskAttemptContext,
    index: Integer)
  extends StreamBasedRecordReader[T](split, context, index) {

  /**
   * Opens the given stream, reads all of its bytes into memory and passes them to
   * [[parseByteArray]].
   *
   * The stream is always closed, including when the read fails.
   *
   * @param inpStream the portable stream to open and read
   * @return the record built by [[parseByteArray]] from the stream's bytes
   */
  def parseStream(inpStream: PortableDataStream): T = {
    val inStream = inpStream.open()
    // Guava close idiom: if the read succeeded, a failure to close is reported to the caller
    // (same as before); if the read itself threw, close quietly so the original exception
    // is the one that propagates.
    var readSucceeded = false
    val innerBuffer =
      try {
        val bytes = ByteStreams.toByteArray(inStream)
        readSucceeded = true
        bytes
      } finally {
        Closeables.close(inStream, !readSucceeded)
      }
    parseByteArray(innerBuffer)
  }

  /**
   * Converts the complete contents of one file into a record.
   *
   * @param inArray every byte read from the file's stream
   * @return the parsed record
   */
  def parseByteArray(inArray: Array[Byte]): T
}
0 commit comments