Skip to content

Commit 82ce56e

Browse files
Fix file ordering issue in WALManager tests
1 parent 5ff90ee commit 82ce56e

File tree

1 file changed

+9
-10
lines changed

1 file changed

+9
-10
lines changed

streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
139139

140140
test("WriteAheadLogManager - write rotating logs") {
141141
// Write data using manager
142-
val dataToWrite = generateRandomData(10)
142+
val dataToWrite = generateRandomData()
143143
val dir = pathForTest
144144
writeDataUsingManager(dir, dataToWrite)
145145

@@ -154,7 +154,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
154154
// Write data manually for testing reading through manager
155155
val dir = pathForTest
156156
val writtenData = (1 to 10).map { i =>
157-
val data = generateRandomData(10)
157+
val data = generateRandomData()
158158
val file = dir + s"/log-$i-$i"
159159
writeDataManually(data, file)
160160
data
@@ -171,7 +171,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
171171

172172
test("WriteAheadLogManager - recover past logs when creating new manager") {
173173
// Write data with manager, recover with new manager and verify
174-
val dataToWrite = generateRandomData(100)
174+
val dataToWrite = generateRandomData()
175175
val dir = pathForTest
176176
writeDataUsingManager(dir, dataToWrite)
177177
val logFiles = getLogFilesInDirectory(dir)
@@ -183,7 +183,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
183183
test("WriteAheadLogManager - cleanup old logs") {
184184
// Write data with manager, recover with new manager and verify
185185
val dir = pathForTest
186-
val dataToWrite = generateRandomData(100)
186+
val dataToWrite = generateRandomData()
187187
val fakeClock = new ManualClock
188188
val manager = new WriteAheadLogManager(dir, hadoopConf,
189189
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
@@ -239,8 +239,10 @@ object WriteAheadLogSuite {
239239

240240
def writeDataUsingManager(logDirectory: String, data: Seq[String]) {
241241
val fakeClock = new ManualClock
242+
fakeClock.setTime(1000000)
242243
val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
243244
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
245+
// Ensure that 500 does not get sorted after 2000, so put a high base value.
244246
data.foreach { item =>
245247
fakeClock.addToTime(500)
246248
manager.writeToLog(item)
@@ -290,8 +292,8 @@ object WriteAheadLogSuite {
290292
data
291293
}
292294

293-
def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
294-
(1 to numItems).map {
295+
def generateRandomData(): Seq[String] = {
296+
(1 to 50).map {
295297
_.toString
296298
}
297299
}
@@ -300,11 +302,8 @@ object WriteAheadLogSuite {
300302
val logDirectoryPath = new Path(directory)
301303
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
302304

303-
implicit def fileStatusOrdering[A <: FileStatus]: Ordering[A] = Ordering
304-
.by(f => f.getModificationTime)
305-
306305
if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
307-
fileSystem.listStatus(logDirectoryPath).sorted.map {
306+
fileSystem.listStatus(logDirectoryPath).map {
308307
_.getPath.toString
309308
}
310309
} else {

0 commit comments

Comments
 (0)