
Commit 25611d6

Minor changes before submitting PR
1 parent: 7ae0a7f · commit: 25611d6

4 files changed, +15 −15 lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 2 additions & 2 deletions
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.rdd.HDFSBackedBlockRDD
+import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.{Receiver, WriteAheadLogBasedStoreResult}
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
 
@@ -67,7 +67,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
       val logSegments = blockStoreResults.map {
         _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
       }.toArray
-      new HDFSBackedBlockRDD[T](ssc.sparkContext, ssc.sparkContext.hadoopConfiguration,
+      new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
         blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
     } else {
       new BlockRDD[T](ssc.sc, blockIds)
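
Taken together with the RDD change in the next file, the call above no longer threads a Hadoop configuration through the DStream: only the SparkContext is passed, and the RDD resolves the configuration from it. A minimal sketch of the new call shape, restating the arguments already visible in the hunk above (the inline comments are an editorial gloss, not part of the source):

// Sketch of the new constructor call, using the names from the diff above.
new WriteAheadLogBackedBlockRDD[T](
  ssc.sparkContext,               // the Hadoop configuration is now derived from this
  blockIds,                       // ids of the blocks holding the received data
  logSegments,                    // write ahead log segments matching those blocks
  storeInBlockManager = false,    // do not re-insert the read data into the BlockManager
  StorageLevel.MEMORY_ONLY_SER)   // storage level to use if the data is stored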

streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala

Lines changed: 1 addition & 2 deletions
@@ -48,7 +48,6 @@ class WriteAheadLogBackedBlockRDDPartition(
  * If it does not find them, it looks up the corresponding file segment.
  *
  * @param sc SparkContext
- * @param hadoopConfig Hadoop configuration
  * @param blockIds Ids of the blocks that contains this RDD's data
  * @param segments Segments in write ahead logs that contain this RDD's data
  * @param storeInBlockManager Whether to store in the block manager after reading from the segment
@@ -58,7 +57,6 @@ class WriteAheadLogBackedBlockRDDPartition(
 private[streaming]
 class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
-    @transient hadoopConfig: Configuration,
     @transient blockIds: Array[BlockId],
     @transient segments: Array[WriteAheadLogFileSegment],
     storeInBlockManager: Boolean,
@@ -71,6 +69,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     s"the same as number of segments (${segments.length}})!")
 
   // Hadoop configuration is not serializable, so broadcast it as a serializable.
+  @transient private val hadoopConfig = sc.hadoopConfiguration
   private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
 
   override def getPartitions: Array[Partition] = {
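
The added line replaces the removed constructor parameter: the configuration is now taken from the SparkContext on the driver and, as the in-code comment notes, Hadoop's Configuration is not Java-serializable, so it is wrapped in Spark's SerializableWritable before being captured by the code that reads the log segments on executors. A minimal, self-contained sketch of that general pattern (the class and method names below are illustrative, not the Spark source):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext}

// Illustrative sketch of the wrap-then-unwrap pattern used above.
class HadoopConfCarrier(@transient sc: SparkContext) extends Serializable {
  // Keep the raw Configuration on the driver only; it is not serializable itself.
  @transient private val hadoopConfig: Configuration = sc.hadoopConfiguration
  // SerializableWritable serializes the Configuration through its Writable interface,
  // so this field can be captured by task closures and shipped to executors.
  private val serializableConf = new SerializableWritable(hadoopConfig)

  // On an executor, unwrap to get a usable Configuration back.
  def confForTask: Configuration = serializableConf.value
}

Keeping the raw Configuration @transient and exposing only the wrapped copy keeps the non-serializable object out of the serialized closure while still making the configuration available where the data is actually read.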

streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala

Lines changed: 10 additions & 9 deletions
@@ -17,18 +17,19 @@
 
 package org.apache.spark.streaming
 
-import org.apache.spark.streaming.StreamingContext._
-
-import org.apache.spark.rdd.{BlockRDD, RDD}
-import org.apache.spark.SparkContext._
+import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.language.existentials
+import scala.reflect.ClassTag
 
 import util.ManualClock
-import org.apache.spark.{SparkException, SparkConf}
-import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import scala.reflect.ClassTag
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
-import scala.collection.mutable
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
 
 class BasicOperationsSuite extends TestSuiteBase {
   test("map") {

streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -117,12 +117,12 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
     )
 
     // Create the RDD and verify whether the returned data is correct
-    val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
+    val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
       segments.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
     assert(rdd.collect() === data.flatten)
 
     if (testStoreInBM) {
-      val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
+      val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
        segments.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
      assert(rdd2.collect() === data.flatten)
      assert(
