
Commit 6fe8af0

Merge pull request apache#13 from harishreedharan/hdfs-wal
Fix the way we decide if hasNext is true or not in WALReader. Unit tests...
2 parents: 01d2bf7 + bbfeae1, commit 6fe8af0

File tree: 5 files changed, +226 -32 lines


streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala
Lines changed: 3 additions & 4 deletions

@@ -21,11 +21,10 @@ import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
 
 private[streaming] object HdfsUtils {
 
-  def getOutputStream(path: String): FSDataOutputStream = {
+  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
     // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
 
     val dfsPath = new Path(path)
-    val conf = new Configuration()
     val dfs =
       this.synchronized {
         dfsPath.getFileSystem(conf)
@@ -45,10 +44,10 @@ private[streaming] object HdfsUtils {
     stream
   }
 
-  def getInputStream(path: String): FSDataInputStream = {
+  def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
     val dfsPath = new Path(path)
     val dfs = this.synchronized {
-      dfsPath.getFileSystem(new Configuration())
+      dfsPath.getFileSystem(conf)
     }
     val instream = dfs.open(dfsPath)
     instream
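
For context, a minimal usage sketch after this change: the caller now owns the Hadoop Configuration and passes it into HdfsUtils instead of the helper constructing a new Configuration() on every call. The path and the fs.defaultFS setting below are illustrative assumptions, not part of the commit.

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()                      // e.g. a Spark-provided Hadoop conf
conf.set("fs.defaultFS", "hdfs://namenode:8020")    // assumed example setting

// Write a small length-prefixed record, then read the length back.
val out = HdfsUtils.getOutputStream("/wal/log-1", conf)
out.writeInt(3)
out.write(Array[Byte](1, 2, 3))
out.close()

val in = HdfsUtils.getInputStream("/wal/log-1", conf)
val length = in.readInt()                           // 3
in.close()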

streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala
Lines changed: 8 additions & 4 deletions

@@ -17,21 +17,25 @@
 package org.apache.spark.streaming.storage
 
 import java.io.Closeable
+import java.nio.ByteBuffer
 
-private[streaming] class WriteAheadLogRandomReader(path: String) extends Closeable {
+import org.apache.hadoop.conf.Configuration
 
-  private val instream = HdfsUtils.getInputStream(path)
+private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
+  extends Closeable {
+
+  private val instream = HdfsUtils.getInputStream(path, conf)
   private var closed = false
 
-  def read(segment: FileSegment): Array[Byte] = synchronized {
+  def read(segment: FileSegment): ByteBuffer = synchronized {
     assertOpen()
     instream.seek(segment.offset)
     val nextLength = instream.readInt()
     HdfsUtils.checkState(nextLength == segment.length,
       "Expected message length to be " + segment.length + ", " + "but was " + nextLength)
     val buffer = new Array[Byte](nextLength)
     instream.readFully(buffer)
-    buffer
+    ByteBuffer.wrap(buffer)
   }
 
   override def close(): Unit = synchronized {
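
A minimal sketch of how the random reader is used after this change, assuming a FileSegment obtained earlier (for example from WriteAheadLogWriter.write); the path, offset and length below are placeholders. read() seeks to segment.offset, checks the stored length against segment.length, and returns the payload wrapped in a ByteBuffer.

import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Hypothetical segment values; in practice this comes from WriteAheadLogWriter.write.
val segment = new FileSegment("/wal/log-1", 0L, 128)

val reader = new WriteAheadLogRandomReader("/wal/log-1", conf)
val record: ByteBuffer = reader.read(segment)   // seek, length check, readFully, wrap
reader.close()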

streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala
Lines changed: 18 additions & 17 deletions

@@ -16,36 +16,37 @@
  */
 package org.apache.spark.streaming.storage
 
-import java.io.Closeable
+import java.io.{EOFException, Closeable}
+import java.nio.ByteBuffer
 
-private[streaming] class WriteAheadLogReader(path: String)
-  extends Iterator[Array[Byte]] with Closeable {
+import org.apache.hadoop.conf.Configuration
 
-  private val instream = HdfsUtils.getInputStream(path)
+private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
+  extends Iterator[ByteBuffer] with Closeable {
+
+  private val instream = HdfsUtils.getInputStream(path, conf)
   private var closed = false
-  private var nextItem: Option[Array[Byte]] = None
+  private var nextItem: Option[ByteBuffer] = None
 
   override def hasNext: Boolean = synchronized {
     assertOpen()
     if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
       true
     } else {
-      val available = instream.available()
-      if (available < 4) { // Length of next block (which is an Int = 4 bytes) of data is unavailable!
-        false
-      }
-      val length = instream.readInt()
-      if (instream.available() < length) {
-        false
+      try {
+        val length = instream.readInt()
+        val buffer = new Array[Byte](length)
+        instream.readFully(buffer)
+        nextItem = Some(ByteBuffer.wrap(buffer))
+        true
+      } catch {
+        case e: EOFException => false
+        case e: Exception => throw e
       }
-      val buffer = new Array[Byte](length)
-      instream.readFully(buffer)
-      nextItem = Some(buffer)
-      true
     }
   }
 
-  override def next(): Array[Byte] = synchronized {
+  override def next(): ByteBuffer = synchronized {
     // TODO: Possible error case where there are not enough bytes in the stream
     // TODO: How to handle that?
     val data = nextItem.getOrElse {
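
This is the core of the PR. In the old hasNext, the bare false expressions inside the if blocks were never actually returned (they are not the last expression of the method), and InputStream.available() is only an estimate of what can be read without blocking, so the reader could mis-detect the end of the log. The new version simply attempts to read the next length-prefixed record and treats EOFException as end-of-log. A minimal iteration sketch with an illustrative path:

import org.apache.hadoop.conf.Configuration

// WriteAheadLogReader is an Iterator[ByteBuffer]: hasNext pre-reads the next
// length-prefixed record and caches it in nextItem; next() hands it out.
val reader = new WriteAheadLogReader("/wal/log-1", new Configuration())
try {
  reader.foreach { record =>
    println(s"read record of ${record.remaining()} bytes")
  }
} finally {
  reader.close()
}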

streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
Lines changed: 37 additions & 7 deletions

@@ -17,21 +17,38 @@
 package org.apache.spark.streaming.storage
 
 import java.io.Closeable
+import java.lang.reflect.Method
+import java.nio.ByteBuffer
 
-private[streaming] class WriteAheadLogWriter(path: String) extends Closeable {
-  private val stream = HdfsUtils.getOutputStream(path)
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FSDataOutputStream
+
+private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) extends Closeable {
+  private val stream = HdfsUtils.getOutputStream(path, conf)
   private var nextOffset = stream.getPos
   private var closed = false
+  private val hflushMethod = getHflushOrSync()
 
   // Data is always written as:
   // - Length - Long
   // - Data - of length = Length
-  def write(data: Array[Byte]): FileSegment = synchronized {
+  def write(data: ByteBuffer): FileSegment = synchronized {
     assertOpen()
-    val segment = new FileSegment(path, nextOffset, data.length)
-    stream.writeInt(data.length)
-    stream.write(data)
-    stream.hflush()
+    data.rewind() // Rewind to ensure all data in the buffer is retrieved
+    val lengthToWrite = data.remaining()
+    val segment = new FileSegment(path, nextOffset, lengthToWrite)
+    stream.writeInt(lengthToWrite)
+    if (data.hasArray) {
+      stream.write(data.array())
+    } else {
+      // If the buffer is not backed by an array we need to write the data byte by byte
+      while (data.hasRemaining) {
+        stream.write(data.get())
+      }
+    }
+    hflushOrSync()
    nextOffset = stream.getPos
     segment
   }
@@ -41,6 +58,19 @@ private[streaming] class WriteAheadLogWriter(path: String) extends Closeable {
     stream.close()
   }
 
+  private def hflushOrSync() {
+    hflushMethod.foreach(_.invoke(stream))
+  }
+
+  private def getHflushOrSync(): Option[Method] = {
+    Try {
+      Some(classOf[FSDataOutputStream].getMethod("hflush"))
+    }.recover {
+      case e: NoSuchMethodException =>
+        Some(classOf[FSDataOutputStream].getMethod("sync"))
+    }.getOrElse(None)
+  }
+
   private def assertOpen() {
     HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.")
   }
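
The writer now resolves the flush call via reflection so the same code runs on Hadoop 2.x (hflush) and older Hadoop (sync). A standalone sketch of that pattern, same idea as getHflushOrSync/hflushOrSync above; the names here are illustrative:

import java.lang.reflect.Method

import scala.util.Try

import org.apache.hadoop.fs.FSDataOutputStream

// Resolve once: prefer hflush (Hadoop 2.x), fall back to sync (older Hadoop).
def resolveFlushMethod(): Option[Method] =
  Try(classOf[FSDataOutputStream].getMethod("hflush"))
    .orElse(Try(classOf[FSDataOutputStream].getMethod("sync")))
    .toOption

// Invoke per write; a no-op if neither method exists on this Hadoop version.
def flushDurably(stream: FSDataOutputStream, flushMethod: Option[Method]): Unit =
  flushMethod.foreach(_.invoke(stream))

Flushing after every write is what makes each returned FileSegment safe to hand out: once write() returns, the record should be visible to the readers above.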
WriteAheadLogSuite.scala (new file)
Lines changed: 160 additions & 0 deletions

@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.storage
+
+import java.io.{RandomAccessFile, File}
+import java.nio.ByteBuffer
+import java.util.Random
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.streaming.TestSuiteBase
+
+class WriteAheadLogSuite extends TestSuiteBase {
+
+  val hadoopConf = new Configuration()
+  val random = new Random()
+
+  test("Test successful writes") {
+    val dir = Files.createTempDir()
+    val file = new File(dir, "TestWriter")
+    try {
+      val dataToWrite = for (i <- 1 to 50) yield generateRandomData()
+      val writer = new WriteAheadLogWriter("file:///" + file.toString, hadoopConf)
+      val segments = dataToWrite.map(writer.write)
+      writer.close()
+      val writtenData = readData(segments, file)
+      assert(writtenData.toArray === dataToWrite.toArray)
+    } finally {
+      file.delete()
+      dir.delete()
+    }
+  }
+
+  test("Test successful reads using random reader") {
+    val file = File.createTempFile("TestRandomReads", "")
+    file.deleteOnExit()
+    val writtenData = writeData(50, file)
+    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
+    writtenData.foreach {
+      x =>
+        val length = x._1.remaining()
+        assert(x._1 === reader.read(new FileSegment(file.toString, x._2, length)))
+    }
+    reader.close()
+  }
+
+  test("Test reading data using random reader written with writer") {
+    val dir = Files.createTempDir()
+    val file = new File(dir, "TestRandomReads")
+    try {
+      val dataToWrite = for (i <- 1 to 50) yield generateRandomData()
+      val segments = writeUsingWriter(file, dataToWrite)
+      val iter = dataToWrite.iterator
+      val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
+      val writtenData = segments.map { x =>
+        reader.read(x)
+      }
+      assert(dataToWrite.toArray === writtenData.toArray)
+    } finally {
+      file.delete()
+      dir.delete()
+    }
+  }
+
+  test("Test successful reads using sequential reader") {
+    val file = File.createTempFile("TestSequentialReads", "")
+    file.deleteOnExit()
+    val writtenData = writeData(50, file)
+    val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+    val iter = writtenData.iterator
+    iter.foreach { x =>
+      assert(reader.hasNext === true)
+      assert(reader.next() === x._1)
+    }
+    reader.close()
+  }
+
+  test("Test reading data using sequential reader written with writer") {
+    val dir = Files.createTempDir()
+    val file = new File(dir, "TestWriter")
+    try {
+      val dataToWrite = for (i <- 1 to 50) yield generateRandomData()
+      val segments = writeUsingWriter(file, dataToWrite)
+      val iter = dataToWrite.iterator
+      val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+      reader.foreach { x =>
+        assert(x === iter.next())
+      }
+    } finally {
+      file.delete()
+      dir.delete()
+    }
+  }
+
+  /**
+   * Writes data to the file and returns an array of the bytes written.
+   * @param count
+   * @return
+   */
+  // We don't want to be using the WAL writer to test the reader - it will be painful to figure
+  // out where the bug is. Instead generate the file by hand and see if the WAL reader can
+  // handle it.
+  def writeData(count: Int, file: File): ArrayBuffer[(ByteBuffer, Long)] = {
+    val writtenData = new ArrayBuffer[(ByteBuffer, Long)]()
+    val writer = new RandomAccessFile(file, "rw")
+    var i = 0
+    while (i < count) {
+      val data = generateRandomData()
+      writtenData += ((data, writer.getFilePointer))
+      data.rewind()
+      writer.writeInt(data.remaining())
+      writer.write(data.array())
+      i += 1
+    }
+    writer.close()
+    writtenData
+  }
+
+  def readData(segments: Seq[FileSegment], file: File): Seq[ByteBuffer] = {
+    val reader = new RandomAccessFile(file, "r")
+    segments.map { x =>
+      reader.seek(x.offset)
+      val data = new Array[Byte](x.length)
+      reader.readInt()
+      reader.readFully(data)
+      ByteBuffer.wrap(data)
+    }
+  }
+
+  def generateRandomData(): ByteBuffer = {
+    val data = new Array[Byte](random.nextInt(50))
+    random.nextBytes(data)
+    ByteBuffer.wrap(data)
+  }
+
+  def writeUsingWriter(file: File, input: Seq[ByteBuffer]): Seq[FileSegment] = {
+    val writer = new WriteAheadLogWriter(file.toString, hadoopConf)
+    val segments = input.map(writer.write)
+    writer.close()
+    segments
+  }
+}
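
For reference, both WriteAheadLogWriter and the hand-rolled writeData helper in this suite produce the same on-disk layout: a sequence of records, each a 4-byte length prefix followed by that many payload bytes. A minimal sketch of that format with plain java.io streams; the file name and payloads are illustrative only.

import java.io.{DataInputStream, DataOutputStream, EOFException, FileInputStream, FileOutputStream}

// Write two length-prefixed records by hand.
val out = new DataOutputStream(new FileOutputStream("/tmp/wal-format-demo"))
Seq("hello", "world").foreach { s =>
  val payload = s.getBytes("UTF-8")
  out.writeInt(payload.length)   // 4-byte length prefix
  out.write(payload)             // record body
}
out.close()

// Read records back until EOF, mirroring how WriteAheadLogReader.hasNext works.
val in = new DataInputStream(new FileInputStream("/tmp/wal-format-demo"))
try {
  while (true) {
    val length = in.readInt()            // throws EOFException at end of log
    val buffer = new Array[Byte](length)
    in.readFully(buffer)
    println(new String(buffer, "UTF-8"))
  }
} catch {
  case _: EOFException => ()             // end of log reached
} finally {
  in.close()
}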
