Commit eb356ca
Merge pull request alteryx#18 from harishreedharan/driver-ha-wal
Fix tests to not ignore ordering and also assert all data is present
2 parents ef8db09 + 82ce56e commit eb356ca
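
Note: the substance of this merge is in the WriteAheadLogManager tests below. They previously compared written and read records as unordered sets (or had the assertion commented out entirely), which ignores record order and collapses duplicates; they now compare ordered lists, so both ordering and completeness are checked. A minimal self-contained sketch of the difference (sample data invented for illustration; the suite itself uses ScalaTest's === in place of ==):

    val dataToWrite = Seq("1", "2", "3")
    val writtenData = Seq("3", "1", "2") // same records, wrong order

    assert(writtenData.toSet == dataToWrite.toSet)      // old check: passes anyway
    // assert(writtenData.toList == dataToWrite.toList) // new check: would fail here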

File tree

1 file changed: +24 -23 lines changed

streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala

Lines changed: 24 additions & 23 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.util
 import java.io._
 import java.nio.ByteBuffer

-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}

 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -41,14 +41,14 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
   val hadoopConf = new Configuration()
   val dfsDir = Files.createTempDir()
   val TEST_BUILD_DATA_KEY: String = "test.build.data"
-  val oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY)
+  val oldTestBuildDataProp = Option(System.getProperty(TEST_BUILD_DATA_KEY))
+  System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
   val cluster = new MiniDFSCluster(new Configuration, 2, true, null)
   val nnPort = cluster.getNameNode.getNameNodeAddress.getPort
-  val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
+  val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
   var pathForTest: String = null

   override def beforeAll() {
-    System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
     cluster.waitActive()
   }

@@ -59,7 +59,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
   override def afterAll() {
     cluster.shutdown()
     FileUtils.deleteDirectory(dfsDir)
-    System.setProperty(TEST_BUILD_DATA_KEY, oldTestBuildDataProp)
+    oldTestBuildDataProp.foreach(System.setProperty(TEST_BUILD_DATA_KEY, _))
   }

   test("WriteAheadLogWriter - writing data") {
@@ -71,8 +71,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
     assert(writtenData.toArray === dataToWrite.toArray)
   }

-  test("WriteAheadLogWriter - syncing of data by writing and reading immediately using " +
-    "Minicluster") {
+  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
     val dataToWrite = generateRandomData()
     val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
     dataToWrite.foreach { data =>
@@ -98,7 +97,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
     reader.close()
   }

-  test("WriteAheadLogReader - sequentially reading data written with writer using Minicluster") {
+  test("WriteAheadLogReader - sequentially reading data written with writer") {
     // Write data manually for testing the sequential reader
     val dataToWrite = generateRandomData()
     writeDataUsingWriter(pathForTest, dataToWrite)
@@ -124,8 +123,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
     reader.close()
   }

-  test("WriteAheadLogRandomReader - reading data using random reader written with writer using " +
-    "Minicluster") {
+  test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
     // Write data using writer for testing the random reader
     val data = generateRandomData()
     val segments = writeDataUsingWriter(pathForTest, data)
@@ -141,24 +139,23 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {

   test("WriteAheadLogManager - write rotating logs") {
     // Write data using manager
-    val dataToWrite = generateRandomData(10)
+    val dataToWrite = generateRandomData()
     val dir = pathForTest
     writeDataUsingManager(dir, dataToWrite)

     // Read data manually to verify the written data
     val logFiles = getLogFilesInDirectory(dir)
     assert(logFiles.size > 1)
-    val writtenData = logFiles.flatMap { file => readDataManually(file) }
-    assert(writtenData.toSet === dataToWrite.toSet)
+    val writtenData = logFiles.flatMap { file => readDataManually(file)}
+    assert(writtenData.toList === dataToWrite.toList)
   }

-  // This one is failing right now -- commenting out for now.
   test("WriteAheadLogManager - read rotating logs") {
     // Write data manually for testing reading through manager
     val dir = pathForTest
     val writtenData = (1 to 10).map { i =>
-      val data = generateRandomData(10)
-      val file = dir + "/log-" + i
+      val data = generateRandomData()
+      val file = dir + s"/log-$i-$i"
       writeDataManually(data, file)
       data
     }.flatten
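
The rename from "/log-" + i to s"/log-$i-$i" matters for the read test: the manager-based reader only recovers files whose names look like the manager's own logs, which appear to encode a start and end timestamp as log-<start>-<end>, so a bare /log-1 would be skipped. A hedged sketch of such a name filter (the regex is my assumption about the naming scheme, not code quoted from WriteAheadLogManager):

    // Accept names shaped like "log-<startTime>-<endTime>".
    val logFilePattern = """log-(\d+)-(\d+)""".r

    def looksLikeWalFile(name: String): Boolean = name match {
      case logFilePattern(_, _) => true
      case _ => false
    }

    assert(looksLikeWalFile("log-3-3")) // new test file name: recovered
    assert(!looksLikeWalFile("log-3"))  // old test file name: skipped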
@@ -169,12 +166,12 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {

     // Read data using manager and verify
     val readData = readDataUsingManager(dir)
-    // assert(readData.toList === writtenData.toList)
+    assert(readData.toList === writtenData.toList)
   }

   test("WriteAheadLogManager - recover past logs when creating new manager") {
     // Write data with manager, recover with new manager and verify
-    val dataToWrite = generateRandomData(100)
+    val dataToWrite = generateRandomData()
     val dir = pathForTest
     writeDataUsingManager(dir, dataToWrite)
     val logFiles = getLogFilesInDirectory(dir)
@@ -186,7 +183,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
   test("WriteAheadLogManager - cleanup old logs") {
     // Write data with manager, recover with new manager and verify
     val dir = pathForTest
-    val dataToWrite = generateRandomData(100)
+    val dataToWrite = generateRandomData()
     val fakeClock = new ManualClock
     val manager = new WriteAheadLogManager(dir, hadoopConf,
       rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
@@ -201,7 +198,6 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
       assert(getLogFilesInDirectory(dir).size < logFiles.size)
     }
   }
-
   // TODO (Hari, TD): Test different failure conditions of writers and readers.
   //  - Failure while reading incomplete/corrupt file
 }
@@ -243,8 +239,10 @@ object WriteAheadLogSuite {

   def writeDataUsingManager(logDirectory: String, data: Seq[String]) {
     val fakeClock = new ManualClock
+    fakeClock.setTime(1000000)
     val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
       rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
+    // Ensure that 500 does not get sorted after 2000, so put a high base value.
     data.foreach { item =>
       fakeClock.addToTime(500)
       manager.writeToLog(item)
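
The added comment and the 1000000 base value address a string-sorting hazard: log file names embed clock timestamps, and names are compared lexicographically, so a 3-digit timestamp like 500 sorts after a 4-digit one like 2000. Starting the clock at a high base keeps every timestamp the same width for the duration of a test. A quick self-contained illustration (file names invented for the example):

    // Strings compare character by character, so "5" > "2" puts 500 last.
    val lowBase = Seq("log-500-1000", "log-1000-1500", "log-2000-2500").sorted
    // -> List(log-1000-1500, log-2000-2500, log-500-1000): 500 after 2000

    // With a high base every timestamp has the same digit count,
    // so lexicographic order matches numeric order.
    val highBase = Seq("log-1000500-1001000", "log-1001000-1001500").sorted
    // -> List(log-1000500-1001000, log-1001000-1001500)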
@@ -271,7 +269,8 @@
     val reader = HdfsUtils.getInputStream(file, hadoopConf)
     val buffer = new ArrayBuffer[String]
     try {
-      while (true) { // Read till EOF is thrown
+      while (true) {
+        // Read till EOF is thrown
         val length = reader.readInt()
         val bytes = new Array[Byte](length)
         reader.read(bytes)
@@ -293,8 +292,10 @@
     data
   }

-  def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
-    (1 to numItems).map { _.toString }
+  def generateRandomData(): Seq[String] = {
+    (1 to 50).map {
+      _.toString
+    }
   }

   def getLogFilesInDirectory(directory: String): Seq[String] = {
