Commit 28cb0fe

add whole text files reader
1 parent d679843 commit 28cb0fe

File tree

4 files changed, +290 -0 lines changed
Lines changed: 68 additions & 0 deletions (new file: MLContext.scala, package org.apache.spark.mllib)
@@ -0,0 +1,68 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib

import org.apache.spark.mllib.input.WholeTextFileInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

/**
 * Extra functions available on SparkContext of mllib through an implicit conversion. Import
 * `org.apache.spark.mllib.MLContext._` at the top of your program to use these functions.
 */
class MLContext(self: SparkContext) {

  /**
   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
   * key-value pair, where the key is the path of each file and the value is the content of each
   * file.
   *
   * <p> For example, if you have the following files:
   * {{{
   *   hdfs://a-hdfs-path/part-00000
   *   hdfs://a-hdfs-path/part-00001
   *   ...
   *   hdfs://a-hdfs-path/part-nnnnn
   * }}}
   *
   * If you do `val rdd = mlContext.wholeTextFile("hdfs://a-hdfs-path")`,
   *
   * <p> then `rdd` contains
   * {{{
   *   (a-hdfs-path/part-00000, its content)
   *   (a-hdfs-path/part-00001, its content)
   *   ...
   *   (a-hdfs-path/part-nnnnn, its content)
   * }}}
   */
  def wholeTextFile(path: String): RDD[(String, String)] = {
    self.newAPIHadoopFile(
      path,
      classOf[WholeTextFileInputFormat],
      classOf[String],
      classOf[String])
  }
}

/**
 * The MLContext object contains a number of implicit conversions and parameters for use with
 * various mllib features.
 */
object MLContext {
  implicit def sparkContextToMLContext(sc: SparkContext) = new MLContext(sc)
}
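For reference, a minimal usage sketch of the new API, following the scaladoc above; the input path and the per-file line count are illustrative, not part of the commit:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.MLContext._  // brings wholeTextFile onto SparkContext

val sc = new SparkContext("local", "example")
// Each record is (file path, entire file content).
val files = sc.wholeTextFile("hdfs://a-hdfs-path")
// Count the lines of each file; any per-file processing would go here.
val lineCounts = files.map { case (path, content) => (path, content.count(_ == '\n')) }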
Lines changed: 47 additions & 0 deletions (new file: WholeTextFileInputFormat.scala, package org.apache.spark.mllib.input)
@@ -0,0 +1,47 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib.input

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

/**
 * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
 * reading whole text files. Each file is read as a key-value pair, where the key is the file path
 * and the value is the entire content of the file.
 */
private[mllib] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
  // Never split a file across records: each file must stay whole.
  override protected def isSplitable(context: JobContext, file: Path): Boolean = false

  override def createRecordReader(
      split: InputSplit,
      context: TaskAttemptContext): RecordReader[String, String] = {

    new CombineFileRecordReader[String, String](
      split.asInstanceOf[CombineFileSplit],
      context,
      classOf[WholeTextFileRecordReader])
  }
}
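Since the class is private[mllib], it can only be wired up from inside the mllib package; MLContext.wholeTextFile above does exactly that. A sketch of the equivalent direct call, assuming an existing sc: SparkContext and path: String in code living under org.apache.spark.mllib:

val rdd = sc.newAPIHadoopFile(
  path,
  classOf[WholeTextFileInputFormat],  // combines files; isSplitable = false keeps each whole
  classOf[String],                    // key type: file path
  classOf[String])                    // value type: entire file content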
Lines changed: 72 additions & 0 deletions (new file: WholeTextFileRecordReader.scala, package org.apache.spark.mllib.input)
@@ -0,0 +1,72 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib.input

import com.google.common.io.{ByteStreams, Closeables}

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext

/**
 * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
 * out in a key-value pair, where the key is the file path and the value is the entire content of
 * the file.
 */
private[mllib] class WholeTextFileRecordReader(
    split: CombineFileSplit,
    context: TaskAttemptContext,
    index: Integer)
  extends RecordReader[String, String] {

  private val path = split.getPath(index)
  private val fs = path.getFileSystem(context.getConfiguration)

  // True means the current file has been processed and should be skipped.
  private var processed = false

  private val key = path.toString
  private var value: String = null

  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}

  override def close() = {}

  override def getProgress = if (processed) 1.0f else 0.0f

  override def getCurrentKey = key

  override def getCurrentValue = value

  override def nextKeyValue = {
    if (!processed) {
      val fileIn = fs.open(path)
      val innerBuffer = ByteStreams.toByteArray(fileIn)

      value = new Text(innerBuffer).toString
      Closeables.close(fileIn, false)

      processed = true
      true
    } else {
      false
    }
  }
}
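The value is produced by wrapping the raw bytes in Hadoop's Text, whose toString decodes them as UTF-8. A tiny self-contained sketch of that round trip (the sample string is illustrative):

import org.apache.hadoop.io.Text

val bytes = "Spark is easy to use.\n".getBytes("UTF-8")
val decoded = new Text(bytes).toString  // Text stores raw bytes and decodes them as UTF-8
assert(decoded == "Spark is easy to use.\n")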
Lines changed: 103 additions & 0 deletions (new file: WholeTextFileRecordReaderSuite.scala, package org.apache.spark.mllib.input)
@@ -0,0 +1,103 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib.input

import java.io.DataOutputStream
import java.io.File
import java.io.FileOutputStream

import scala.collection.immutable.IndexedSeq

import com.google.common.io.Files

import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite

import org.apache.hadoop.io.Text

import org.apache.spark.SparkContext
import org.apache.spark.mllib.MLContext._

/**
 * Tests the correctness of
 * [[org.apache.spark.mllib.input.WholeTextFileRecordReader WholeTextFileRecordReader]]. A temporary
 * directory is created as fake input, and is deleted at the end of the test.
 */
class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
  private var sc: SparkContext = _

  override def beforeAll() {
    sc = new SparkContext("local", "test")
  }

  override def afterAll() {
    sc.stop()
  }

  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte]) = {
    val out = new DataOutputStream(new FileOutputStream(s"${inputDir.toString}/$fileName"))
    out.write(contents, 0, contents.length)
    out.close()
  }

  /**
   * This test checks the behavior of WholeTextFileRecordReader on the local disk. There are
   * three aspects to check:
   *   1) Whether all files are read;
   *   2) Whether the paths are read correctly;
   *   3) Whether the contents match the original ones.
   */
  test("Correctness of WholeTextFileRecordReader.") {

    val dir = Files.createTempDir()
    println(s"Local disk address is ${dir.toString}.")

    WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
      createNativeFile(dir, filename, contents)
    }

    val res = sc.wholeTextFile(dir.toString).collect()

    assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
      "Number of files read out does not match the expected value.")

    for ((filename, contents) <- res) {
      val shortName = filename.split('/').last
      assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
        s"Missing file name $filename.")
      assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
        s"file $filename contents do not match.")
    }

    dir.delete()
  }
}

/**
 * Files to be tested are defined here.
 */
object WholeTextFileRecordReaderSuite {
  private val testWords: IndexedSeq[Byte] = "Spark is easy to use.\n".map(_.toByte)

  private val fileNames = Array("part-00000", "part-00001", "part-00002")
  private val fileLengths = Array(10, 100, 1000)

  private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
    filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
  }.toMap
}
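For reference, the fixture generator above repeats the 22-byte sentence and truncates it to each target length, so the 10-byte part-00000 holds exactly "Spark is e". A stand-alone sketch of the same idea (the names are local to the example):

val testWords = "Spark is easy to use.\n".map(_.toByte)
// Repeat the sentence indefinitely, then cut it to the requested length.
val tenBytes = Stream.continually(testWords.toStream).flatten.take(10).toArray
assert(new String(tenBytes, "UTF-8") == "Spark is e")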
