
Commit a949741

Address the comments

1 parent 16bfe78

3 files changed: +24 −128 lines

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 2 additions & 118 deletions

@@ -70,7 +70,8 @@ object KafkaUtils {
       topics: Map[String, Int],
       storageLevel: StorageLevel
     ): ReceiverInputDStream[(K, V)] = {
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, false, storageLevel)
+    val WALEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
+    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, WALEnabled, storageLevel)
   }
 
   /**
@@ -143,121 +144,4 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
-
-  /**
-   * Create an reliable input stream that pulls messages from a Kafka Broker.
-   * @param ssc          StreamingContext object
-   * @param zkQuorum     Zookeeper quorum (hostname:port,hostname:port,..)
-   * @param groupId      The group id for this consumer
-   * @param topics       Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *                     in its own thread
-   * @param storageLevel Storage level to use for storing the received objects
-   *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
-   */
-  def createReliableStream(
-      ssc: StreamingContext,
-      zkQuorum: String,
-      groupId: String,
-      topics: Map[String, Int],
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
-    : ReceiverInputDStream[(String, String)] = {
-    val kafkaParams = Map[String, String](
-      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
-      "zookeeper.connection.timeout.ms" -> "10000")
-    createReliableStream[String, String, StringDecoder, StringDecoder](
-      ssc, kafkaParams, topics, storageLevel)
-  }
-
-  /**
-   * Create an reliable input stream that pulls messages from a Kafka Broker.
-   * @param ssc          StreamingContext object
-   * @param kafkaParams  Map of kafka configuration parameters,
-   *                     see http://kafka.apache.org/08/configuration.html
-   * @param topics       Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *                     in its own thread.
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createReliableStream[
-    K: ClassTag,
-    V: ClassTag,
-    U <: Decoder[_]: ClassTag,
-    T <: Decoder[_]: ClassTag](
-      ssc: StreamingContext,
-      kafkaParams: Map[String, String],
-      topics: Map[String, Int],
-      storageLevel: StorageLevel
-    ): ReceiverInputDStream[(K, V)] = {
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, true, storageLevel)
-  }
-
-  /**
-   * Create an reliable Java input stream that pulls messages form a Kafka Broker.
-   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
-   * @param jssc     JavaStreamingContext object
-   * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
-   * @param groupId  The group id for this consumer
-   * @param topics   Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *                 in its own thread
-   */
-  def createReliableStream(
-      jssc: JavaStreamingContext,
-      zkQuorum: String,
-      groupId: String,
-      topics: JMap[String, JInt]
-    ): JavaPairReceiverInputDStream[String, String] = {
-    createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
-  }
-
-  /**
-   * Create an reliable Java input stream that pulls messages form a Kafka Broker.
-   * @param jssc         JavaStreamingContext object
-   * @param zkQuorum     Zookeeper quorum (hostname:port,hostname:port,..).
-   * @param groupId      The group id for this consumer.
-   * @param topics       Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *                     in its own thread.
-   * @param storageLevel RDD storage level.
-   */
-  def createReliableStream(
-      jssc: JavaStreamingContext,
-      zkQuorum: String,
-      groupId: String,
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[String, String] = {
-    createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
-      storageLevel)
-  }
-
-  /**
-   * Create an reliable Java input stream that pulls messages form a Kafka Broker.
-   * @param jssc              JavaStreamingContext object
-   * @param keyTypeClass      Key type of RDD
-   * @param valueTypeClass    value type of RDD
-   * @param keyDecoderClass   Type of kafka key decoder
-   * @param valueDecoderClass Type of kafka value decoder
-   * @param kafkaParams       Map of kafka configuration parameters,
-   *                          see http://kafka.apache.org/08/configuration.html
-   * @param topics            Map of (topic_name -> numPartitions) to consume. Each partition is
-   *                          consumed in its own thread
-   * @param storageLevel      RDD storage level.
-   */
-  def createReliableStream[K, V, U <: Decoder[_], T <: Decoder[_]](
-      jssc: JavaStreamingContext,
-      keyTypeClass: Class[K],
-      valueTypeClass: Class[V],
-      keyDecoderClass: Class[U],
-      valueDecoderClass: Class[T],
-      kafkaParams: JMap[String, String],
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[K, V] = {
-    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
-    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
-
-    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
-    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
-
-    createReliableStream[K, V, U, T](
-      jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
-  }
}
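With this commit the separate createReliableStream API is removed; createStream now reads spark.streaming.receiver.writeAheadLog.enable from the StreamingContext's SparkConf and passes the result to KafkaInputDStream, so the reliable (WAL-backed) receiver is selected purely by configuration. A minimal sketch of how a caller would opt in after this change; the master, app name, ZooKeeper quorum, group id, topic, and checkpoint directory below are placeholders, not values from this commit:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Turning the WAL flag on is what makes createStream build the reliable receiver.
    val conf = new SparkConf()
      .setMaster("local[2]")                       // placeholder master
      .setAppName("kafka-wal-example")             // placeholder app name
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("/tmp/kafka-wal-checkpoint")    // WAL files are written under the checkpoint directory

    // Same public createStream API as before; no reliable-specific call is needed.
    val stream = KafkaUtils.createStream(
      ssc,
      "zk-host:2181",             // placeholder ZooKeeper quorum
      "example-consumer-group",   // placeholder group id
      Map("example-topic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER_2)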

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala

Lines changed: 1 addition & 2 deletions

@@ -82,7 +82,6 @@ class ReliableKafkaReceiver[
   }
 
   override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-    // TODO. this should be replaced to reliable store after WAL is ready.
     store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])
 
     // Commit and remove the related offsets.
@@ -120,7 +119,7 @@ class ReliableKafkaReceiver[
 
     if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
       logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
-        "otherwise we cannot enable reliable offset commit mechanism")
+        "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
     }
 
val props = new Properties()
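The reworded warning matches the receiver's behaviour shown above: even if the caller asks for automatic offset commits, the reliable receiver overrides the setting when it builds the consumer Properties, so offsets are committed only after received blocks have been stored. A standalone sketch of that pattern, assuming the receiver's AUTO_OFFSET_COMMIT constant maps to Kafka's auto.commit.enable key and using placeholder connection values:

    import java.util.Properties

    // Assumed key name; the receiver refers to it through its AUTO_OFFSET_COMMIT constant.
    val AUTO_OFFSET_COMMIT = "auto.commit.enable"

    // Placeholder caller-supplied params that (incorrectly) ask for auto commit.
    val kafkaParams = Map(
      "zookeeper.connect" -> "zk-host:2181",
      "group.id" -> "example-consumer-group",
      AUTO_OFFSET_COMMIT -> "true")

    // Copy the caller's params, then force auto commit off so the receiver alone
    // decides when offsets are committed.
    val props = new Properties()
    kafkaParams.foreach { case (key, value) => props.put(key, value) }
    props.put(AUTO_OFFSET_COMMIT, "false")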

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala

Lines changed: 21 additions & 8 deletions

@@ -30,7 +30,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
   import KafkaTestUtils._
 
   test("Reliable Kafka input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val sparkConf = new SparkConf()
+      .setMaster(master)
+      .setAppName(framework)
+      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
+    val ssc = new StreamingContext(sparkConf, batchDuration)
     val topic = "test"
     val sent = Map("a" -> 1, "b" -> 1, "c" -> 1)
     createTopic(topic)
@@ -40,7 +44,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       "group.id" -> s"test-consumer-${random.nextInt(10000)}",
       "auto.offset.reset" -> "smallest")
 
-    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc,
       kafkaParams,
       Map(topic -> 1),
@@ -64,7 +68,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
   }
 
   test("Verify the offset commit") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val sparkConf = new SparkConf()
+      .setMaster(master)
+      .setAppName(framework)
+      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
+    val ssc = new StreamingContext(sparkConf, batchDuration)
     val topic = "test"
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
     createTopic(topic)
@@ -78,7 +86,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
 
     assert(getCommitOffset(groupId, topic, 0) === 0L)
 
-    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc,
       kafkaParams,
       Map(topic -> 1),
@@ -92,7 +100,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
   }
 
   test("Verify multiple topics offset commit") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val sparkConf = new SparkConf()
+      .setMaster(master)
+      .setAppName(framework)
+      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
+    val ssc = new StreamingContext(sparkConf, batchDuration)
     val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
     topics.foreach { case (t, _) =>
@@ -108,7 +120,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
 
     topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }
 
-    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc,
       kafkaParams,
       topics,
@@ -125,6 +137,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
     val sparkConf = new SparkConf()
       .setMaster(master)
       .setAppName(framework)
+      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     var ssc = new StreamingContext(
       sparkConf.clone.set("spark.streaming.blockInterval", "4000"),
       batchDuration)
@@ -141,7 +154,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       "group.id" -> groupId,
      "auto.offset.reset" -> "smallest")
 
-    KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc,
       kafkaParams,
       topics,
@@ -161,7 +174,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
 
     // Restart to see if data is consumed from last checkpoint.
     ssc = new StreamingContext(sparkConf, batchDuration)
-    KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics,
