Commit eadde56

Transferred HDFSBackedBlockRDD for the driver-ha-working branch
1 parent 6a40a76 commit eadde56

3 files changed: +248 -0 lines changed

core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala

Lines changed: 4 additions & 0 deletions
@@ -84,5 +84,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
         "Attempted to use %s after its blocks have been removed!".format(toString))
     }
   }
+
+  protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
+    locations_
+  }
 }
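The new protected getBlockIdLocations() accessor exposes the block-location map that BlockRDD already tracks, so subclasses can consult it when computing preferred locations. A minimal sketch of that pattern, assuming a hypothetical subclass and fallback function (the real consumer is the HDFSBackedBlockRDD added below, which falls back to HDFS block locations):

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.storage.BlockId

// Hypothetical subclass for illustration only; not part of this commit.
private[spark] class FallbackLocationBlockRDD[T: ClassTag](
    sc: SparkContext,
    ids: Array[BlockId],
    fallback: BlockId => Seq[String])   // e.g. locations recovered from files on HDFS
  extends BlockRDD[T](sc, ids) {

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val blockId = ids(split.index)
    // Prefer locations the block manager knows about; otherwise ask the fallback.
    getBlockIdLocations().getOrElse(blockId, fallback(blockId))
  }
}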

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.rdd

import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader}
import org.apache.spark._

private[streaming]
class HDFSBackedBlockRDDPartition(
    val blockId: BlockId, idx: Int, val segment: WriteAheadLogFileSegment) extends Partition {
  val index = idx
}

private[streaming]
class HDFSBackedBlockRDD[T: ClassTag](
    @transient sc: SparkContext,
    @transient hadoopConfiguration: Configuration,
    @transient override val blockIds: Array[BlockId],
    @transient val segments: Array[WriteAheadLogFileSegment],
    val storeInBlockManager: Boolean,
    val storageLevel: StorageLevel
  ) extends BlockRDD[T](sc, blockIds) {

  if (blockIds.length != segments.length) {
    throw new IllegalStateException("Number of block ids must be the same as number of segments!")
  }

  // The Hadoop Configuration is not serializable, so wrap it in a SerializableWritable and broadcast it.
  val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration))
    .asInstanceOf[Broadcast[SerializableWritable[Configuration]]]

  override def getPartitions: Array[Partition] = {
    assertValid()
    (0 until blockIds.size).map { i =>
      new HDFSBackedBlockRDDPartition(blockIds(i), i, segments(i))
    }.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    assertValid()
    val hadoopConf = broadcastedHadoopConf.value.value
    val blockManager = SparkEnv.get.blockManager
    val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
    val blockId = partition.blockId
    blockManager.get(blockId) match {
      // Data is in the block manager, grab it from there.
      case Some(block) =>
        block.data.asInstanceOf[Iterator[T]]
      // Data not found in the block manager, read it from the write ahead log on HDFS.
      case None =>
        logInfo("Reading partition data from write ahead log " + partition.segment.path)
        val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
        val dataRead = reader.read(partition.segment)
        reader.close()
        // Currently, we support storing the data in the block manager only in serialized form,
        // not in deserialized form.
        if (storeInBlockManager) {
          blockManager.putBytes(blockId, dataRead, storageLevel)
        }
        dataRead.rewind()
        blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
    }
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
    val locations = getBlockIdLocations()
    locations.getOrElse(partition.blockId,
      HdfsUtils.getBlockLocations(partition.segment.path, hadoopConfiguration)
        .getOrElse(new Array[String](0)).toSeq)
  }
}
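Taken together, compute() serves a partition from the local BlockManager when the block is still present, and otherwise re-reads it from its write ahead log segment, optionally re-inserting it into the BlockManager. A minimal construction sketch in the spirit of the test suite below; the block ids, offsets, and log path are illustrative placeholders, not values from this commit:

package org.apache.spark.streaming.rdd

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.WriteAheadLogFileSegment

// Sketch only: the ids, offsets, and log path below are placeholders.
object HDFSBackedBlockRDDExample {
  def recoverBlocks(sc: SparkContext, hadoopConf: Configuration): Array[String] = {
    // One block id per partition, paired with the WAL segment that holds that block's data.
    val blockIds: Array[BlockId] = Array(StreamBlockId(0, 0L), StreamBlockId(0, 1L))
    val segments = Array(
      new WriteAheadLogFileSegment("hdfs:///wal/log-0", 0L, 1024),
      new WriteAheadLogFileSegment("hdfs:///wal/log-0", 1024L, 1024))

    val rdd = new HDFSBackedBlockRDD[String](
      sc, hadoopConf, blockIds, segments,
      storeInBlockManager = true,          // re-cache blocks that had to be read from the log
      storageLevel = StorageLevel.MEMORY_ONLY_SER)

    // Blocks still in the local BlockManager are served from memory;
    // the rest are re-read from their write ahead log segments.
    rdd.collect()
  }
}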
Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.rdd

import java.io.File
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.ArrayBuffer

import org.scalatest.{BeforeAndAfter, FunSuite}

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration

import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
import org.apache.spark.{SparkConf, SparkContext}

class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
  val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName(this.getClass.getSimpleName)
  val sparkContext = new SparkContext(conf)
  val hadoopConf = new Configuration()
  val blockManager = sparkContext.env.blockManager
  // Since the same BM is reused in all tests, use an atomic int to generate ids
  val idGenerator = new AtomicInteger(0)
  var file: File = null
  var dir: File = null

  before {
    dir = Files.createTempDir()
    file = new File(dir, "BlockManagerWrite")
  }

  after {
    file.delete()
    dir.delete()
  }

  test("Data available in BM and HDFS") {
    doTestHDFSBackedRDD(5, 5, 20, 5)
  }

  test("Data available in BM but not in HDFS") {
    doTestHDFSBackedRDD(5, 0, 20, 5)
  }

  test("Data available in HDFS and not in BM") {
    doTestHDFSBackedRDD(0, 5, 20, 5)
  }

  test("Data partially available in BM, and the rest in HDFS") {
    doTestHDFSBackedRDD(3, 2, 20, 5)
  }

  /**
   * Write a bunch of events into the HDFS-backed block RDD. Put a subset of the blocks into the
   * BlockManager, so that not all reads have to go to HDFS.
   * @param writeToBMCount - Number of blocks to write to the BlockManager
   * @param writeToHDFSCount - Number of blocks to write to HDFS
   * @param total - Total number of Strings to write
   * @param blockCount - Number of blocks to write (therefore, total # of events per block =
   *                     total/blockCount)
   */
  private def doTestHDFSBackedRDD(
      writeToBMCount: Int,
      writeToHDFSCount: Int,
      total: Int,
      blockCount: Int
    ) {
    val countPerBlock = total / blockCount
    val blockIds = (0 until blockCount).map { i =>
      StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet())
    }

    val writtenStrings = generateData(total, countPerBlock)

    if (writeToBMCount != 0) {
      (0 until writeToBMCount).foreach { i =>
        blockManager
          .putIterator(blockIds(i), writtenStrings(i).iterator, StorageLevel.MEMORY_ONLY_SER)
      }
    }

    val segments = new ArrayBuffer[WriteAheadLogFileSegment]
    if (writeToHDFSCount != 0) {
      // Generate some fake segments for the blocks in BM so the RDD does not complain
      segments ++= generateFakeSegments(writeToBMCount)
      segments ++= writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
        blockIds.slice(writeToBMCount, blockCount))
    } else {
      segments ++= generateFakeSegments(blockCount)
    }
    val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
      segments.toArray, false, StorageLevel.MEMORY_ONLY)

    val dataFromRDD = rdd.collect()
    // Verify that the collected data equals the data that was written, partition by partition.
    assert(writtenStrings.flatten === dataFromRDD)
  }

  /**
   * Generate random strings and group them into blocks, one Seq per block.
   * @param count - Number of Strings to generate
   * @param countPerBlock - Number of Strings per block
   * @return - Seq of Seqs, each inner Seq representing the data that goes into one block
   */
  private def generateData(
      count: Int,
      countPerBlock: Int
    ): Seq[Seq[String]] = {
    val strings = (0 until count).map { _ => scala.util.Random.nextString(50) }
    strings.grouped(countPerBlock).toSeq
  }

  private def writeDataToHDFS(
      blockData: Seq[Seq[String]],
      blockIds: Seq[BlockId]
    ): Seq[WriteAheadLogFileSegment] = {
    assert(blockData.size === blockIds.size)
    val segments = new ArrayBuffer[WriteAheadLogFileSegment]()
    val writer = new WriteAheadLogWriter(file.toString, hadoopConf)
    blockData.zip(blockIds).foreach { case (data, id) =>
      segments += writer.write(blockManager.dataSerialize(id, data.iterator))
    }
    writer.close()
    segments
  }

  private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = {
    (0 until count).map { _ => new WriteAheadLogFileSegment("random", 0L, 0) }
  }
}
