
Commit 4705fff

Sort listed files by name. Use local files for WAL tests.
1 parent 82ce56e commit 4705fff

5 files changed: 44 additions, 65 deletions

pom.xml

Lines changed: 0 additions & 6 deletions
@@ -406,12 +406,6 @@
     <dependency>
       <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
       <version>${akka.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minicluster</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>${akka.group}</groupId>
       <artifactId>akka-testkit_${scala.binary.version}</artifactId>

streaming/pom.xml

Lines changed: 0 additions & 5 deletions
@@ -68,11 +68,6 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minicluster</artifactId>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala

Lines changed: 6 additions & 3 deletions
@@ -17,13 +17,12 @@
 package org.apache.spark.streaming.util

 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path}

 private[streaming] object HdfsUtils {

   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
     // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
-
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
     // If the file exists and we have append support, append instead of creating a new file
@@ -63,6 +62,10 @@ private[streaming] object HdfsUtils {
   }

   def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
-    path.getFileSystem(conf)
+    val fs = path.getFileSystem(conf)
+    fs match {
+      case localFs: LocalFileSystem => localFs.getRawFileSystem
+      case _ => fs
+    }
   }
 }
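
For context: LocalFileSystem is Hadoop's checksumming wrapper around RawLocalFileSystem, and the wrapper presumably does not support the append/flush behaviour the WAL writer expects, which is why getFileSystemForPath now unwraps it for local paths. A minimal standalone sketch of the same unwrap pattern; the object name and the file:///tmp path are illustrative, not from this commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocalFileSystem, Path}

object LocalFsSketch {
  // Same unwrap as HdfsUtils.getFileSystemForPath above: for a local path,
  // return the raw (non-checksummed) file system instead of the wrapper.
  def fileSystemFor(path: Path, conf: Configuration) = path.getFileSystem(conf) match {
    case localFs: LocalFileSystem => localFs.getRawFileSystem
    case fs => fs
  }

  def main(args: Array[String]): Unit = {
    val fs = fileSystemFor(new Path("file:///tmp/wal-sketch"), new Configuration())
    // Should print RawLocalFileSystem rather than LocalFileSystem.
    println(fs.getClass.getSimpleName)
  }
}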

streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala

Lines changed: 1 addition & 0 deletions
@@ -73,6 +73,7 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura
   }

   private def flush() {
+    stream.getWrappedStream.flush()
     hadoopFlushMethod.foreach {
       _.invoke(stream)
     }
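
For context: flushing the wrapped stream matters mainly for the new local-file tests, where the FSDataOutputStream wraps a buffered local stream and there is no HDFS hflush to push bytes out. A rough sketch of how the whole flush path might look; the reflection lookup for hflush/sync is a paraphrase of the hadoopFlushMethod idea referenced above, not code copied from WriteAheadLogWriter:

import scala.util.Try
import org.apache.hadoop.fs.FSDataOutputStream

class FlushSketch(stream: FSDataOutputStream) {
  // Prefer hflush (Hadoop 2.x) and fall back to sync (older Hadoop);
  // the Option covers streams where neither call is usable.
  private val hadoopFlushMethod = {
    val cls = classOf[FSDataOutputStream]
    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
  }

  def flush(): Unit = {
    // First push buffered bytes into the underlying stream (the line added above) ...
    stream.getWrappedStream.flush()
    // ... then, if available, ask HDFS to flush the data out to the datanodes.
    hadoopFlushMethod.foreach(_.invoke(stream))
  }
}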

streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala

Lines changed: 37 additions & 51 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.util
 import java.io._
 import java.nio.ByteBuffer

-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path

 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -31,63 +31,53 @@ import WriteAheadLogSuite._
 import com.google.common.io.Files
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hdfs.MiniDFSCluster
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.apache.spark.util.Utils
 import org.scalatest.concurrent.Eventually._

-class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {

   val hadoopConf = new Configuration()
-  val dfsDir = Files.createTempDir()
-  val TEST_BUILD_DATA_KEY: String = "test.build.data"
-  val oldTestBuildDataProp = Option(System.getProperty(TEST_BUILD_DATA_KEY))
-  System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
-  val cluster = new MiniDFSCluster(new Configuration, 2, true, null)
-  val nnPort = cluster.getNameNode.getNameNodeAddress.getPort
-  val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
-  var pathForTest: String = null
-
-  override def beforeAll() {
-    cluster.waitActive()
-  }
+  var tempDir: File = null
+  var dirForTest: String = null
+  var fileForTest: String = null

   before {
-    pathForTest = hdfsUrl + getRandomString()
+    tempDir = Files.createTempDir()
+    dirForTest = "file:///" + tempDir.toString
+    fileForTest = "file:///" + new File(tempDir, getRandomString()).toString
   }

-  override def afterAll() {
-    cluster.shutdown()
-    FileUtils.deleteDirectory(dfsDir)
-    oldTestBuildDataProp.foreach(System.setProperty(TEST_BUILD_DATA_KEY, _))
+  after {
+    FileUtils.deleteDirectory(tempDir)
   }

   test("WriteAheadLogWriter - writing data") {
     val dataToWrite = generateRandomData()
-    val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
+    val writer = new WriteAheadLogWriter(fileForTest, hadoopConf)
     val segments = dataToWrite.map(data => writer.write(data))
     writer.close()
-    val writtenData = readDataManually(pathForTest, segments)
+    val writtenData = readDataManually(fileForTest, segments)
     assert(writtenData.toArray === dataToWrite.toArray)
   }

   test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
     val dataToWrite = generateRandomData()
-    val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
+    val writer = new WriteAheadLogWriter(fileForTest, hadoopConf)
     dataToWrite.foreach { data =>
-      val segment = writer.write(ByteBuffer.wrap(data.getBytes()))
-      val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf)
+      val segment = writer.write(stringToByteBuffer(data))
+      val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
       val dataRead = reader.read(segment)
-      assert(data === new String(dataRead.array()))
+      assert(data === byteBufferToString(dataRead))
     }
     writer.close()
   }

   test("WriteAheadLogReader - sequentially reading data") {
     // Write data manually for testing the sequential reader
     val writtenData = generateRandomData()
-    writeDataManually(writtenData, pathForTest)
-    val reader = new WriteAheadLogReader(pathForTest, hadoopConf)
+    writeDataManually(writtenData, fileForTest)
+    val reader = new WriteAheadLogReader(fileForTest, hadoopConf)
     val readData = reader.toSeq.map(byteBufferToString)
     assert(readData.toList === writtenData.toList)
     assert(reader.hasNext === false)
@@ -100,9 +90,9 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   test("WriteAheadLogReader - sequentially reading data written with writer") {
     // Write data manually for testing the sequential reader
     val dataToWrite = generateRandomData()
-    writeDataUsingWriter(pathForTest, dataToWrite)
+    writeDataUsingWriter(fileForTest, dataToWrite)
     val iter = dataToWrite.iterator
-    val reader = new WriteAheadLogReader(pathForTest, hadoopConf)
+    val reader = new WriteAheadLogReader(fileForTest, hadoopConf)
     reader.foreach { byteBuffer =>
       assert(byteBufferToString(byteBuffer) === iter.next())
     }
@@ -112,11 +102,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   test("WriteAheadLogRandomReader - reading data using random reader") {
     // Write data manually for testing the random reader
     val writtenData = generateRandomData()
-    val segments = writeDataManually(writtenData, pathForTest)
+    val segments = writeDataManually(writtenData, fileForTest)

     // Get a random order of these segments and read them back
     val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
-    val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf)
+    val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
     writtenDataAndSegments.foreach { case (data, segment) =>
       assert(data === byteBufferToString(reader.read(segment)))
     }
@@ -126,11 +116,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
     // Write data using writer for testing the random reader
     val data = generateRandomData()
-    val segments = writeDataUsingWriter(pathForTest, data)
+    val segments = writeDataUsingWriter(fileForTest, data)

     // Read a random sequence of segments and verify read data
     val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
-    val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf)
+    val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
     dataAndSegments.foreach { case (data, segment) =>
       assert(data === byteBufferToString(reader.read(segment)))
     }
@@ -140,62 +130,58 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   test("WriteAheadLogManager - write rotating logs") {
     // Write data using manager
     val dataToWrite = generateRandomData()
-    val dir = pathForTest
-    writeDataUsingManager(dir, dataToWrite)
+    writeDataUsingManager(dirForTest, dataToWrite)

     // Read data manually to verify the written data
-    val logFiles = getLogFilesInDirectory(dir)
+    val logFiles = getLogFilesInDirectory(dirForTest)
     assert(logFiles.size > 1)
     val writtenData = logFiles.flatMap { file => readDataManually(file)}
     assert(writtenData.toList === dataToWrite.toList)
   }

   test("WriteAheadLogManager - read rotating logs") {
     // Write data manually for testing reading through manager
-    val dir = pathForTest
     val writtenData = (1 to 10).map { i =>
       val data = generateRandomData()
-      val file = dir + s"/log-$i-$i"
+      val file = dirForTest + s"/log-$i-$i"
       writeDataManually(data, file)
       data
     }.flatten

-    val logDirectoryPath = new Path(dir)
+    val logDirectoryPath = new Path(dirForTest)
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
     assert(fileSystem.exists(logDirectoryPath) === true)

     // Read data using manager and verify
-    val readData = readDataUsingManager(dir)
+    val readData = readDataUsingManager(dirForTest)
     assert(readData.toList === writtenData.toList)
   }

   test("WriteAheadLogManager - recover past logs when creating new manager") {
     // Write data with manager, recover with new manager and verify
     val dataToWrite = generateRandomData()
-    val dir = pathForTest
-    writeDataUsingManager(dir, dataToWrite)
-    val logFiles = getLogFilesInDirectory(dir)
+    writeDataUsingManager(dirForTest, dataToWrite)
+    val logFiles = getLogFilesInDirectory(dirForTest)
     assert(logFiles.size > 1)
-    val readData = readDataUsingManager(dir)
+    val readData = readDataUsingManager(dirForTest)
     assert(dataToWrite.toList === readData.toList)
   }

   test("WriteAheadLogManager - cleanup old logs") {
     // Write data with manager, recover with new manager and verify
-    val dir = pathForTest
     val dataToWrite = generateRandomData()
     val fakeClock = new ManualClock
-    val manager = new WriteAheadLogManager(dir, hadoopConf,
+    val manager = new WriteAheadLogManager(dirForTest, hadoopConf,
       rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
     dataToWrite.foreach { item =>
       fakeClock.addToTime(500) // half second for each
       manager.writeToLog(item)
     }
-    val logFiles = getLogFilesInDirectory(dir)
+    val logFiles = getLogFilesInDirectory(dirForTest)
     assert(logFiles.size > 1)
     manager.cleanupOldLogs(fakeClock.currentTime() / 2)
     eventually(timeout(1 second), interval(10 milliseconds)) {
-      assert(getLogFilesInDirectory(dir).size < logFiles.size)
+      assert(getLogFilesInDirectory(dirForTest).size < logFiles.size)
     }
   }
   // TODO (Hari, TD): Test different failure conditions of writers and readers.
@@ -305,7 +291,7 @@ object WriteAheadLogSuite {
     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
       fileSystem.listStatus(logDirectoryPath).map {
         _.getPath.toString
-      }
+      }.sorted
     } else {
       Seq.empty
     }
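
For context: FileSystem.listStatus makes no ordering guarantee, and on the local file system entries can come back in arbitrary directory order, so tests that read the rotated log files back and compare against the original write order need the explicit .sorted. A small standalone sketch of the same listing pattern; the object name and log directory path are hypothetical:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object ListLogFilesSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val dir = new Path("file:///tmp/wal-logs") // hypothetical log directory
    val fs = dir.getFileSystem(conf)
    val logFiles =
      if (fs.exists(dir) && fs.getFileStatus(dir).isDir) {
        // Sort by full path name so files come back in a deterministic order,
        // mirroring the .sorted added to getLogFilesInDirectory above.
        fs.listStatus(dir).map(_.getPath.toString).sorted.toSeq
      } else {
        Seq.empty[String]
      }
    logFiles.foreach(println)
  }
}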
