
Commit 4854ee9

Address all the comments
1 parent 96c7a1d commit 4854ee9

File tree

2 files changed: +75 -37 lines changed


external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala

Lines changed: 64 additions & 37 deletions
@@ -18,21 +18,22 @@
 package org.apache.spark.streaming.kafka

 import java.util.Properties
-import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.concurrent.ConcurrentHashMap

 import scala.collection.Map
 import scala.collection.mutable
 import scala.reflect.{classTag, ClassTag}

 import kafka.common.TopicAndPartition
-import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
 import kafka.serializer.Decoder
 import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
 import org.I0Itec.zkclient.ZkClient

 import org.apache.spark.{SparkEnv, Logging}
 import org.apache.spark.storage.{StreamBlockId, StorageLevel}
 import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
+import org.apache.spark.util.Utils

 private[streaming]
 class ReliableKafkaReceiver[
@@ -45,27 +46,33 @@ class ReliableKafkaReceiver[
     storageLevel: StorageLevel)
   extends Receiver[Any](storageLevel) with Logging {

-  /** High level consumer to connect to Kafka */
+  /** High level consumer to connect to Kafka. */
   private var consumerConnector: ConsumerConnector = null

-  /** zkClient to connect to Zookeeper to commit the offsets */
+  /** zkClient to connect to Zookeeper to commit the offsets. */
   private var zkClient: ZkClient = null

   private val groupId = kafkaParams("group.id")

-  private lazy val env = SparkEnv.get
+  private def conf() = SparkEnv.get.conf

   private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

   /** A HashMap to manage the offset for each topic/partition, this HashMap is called in
-   * synchronized block, so mutable HashMap will not meet concurrency issue */
-  private lazy val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
+   * synchronized block, so mutable HashMap will not meet concurrency issue.
+   */
+  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null

-  /** A concurrent HashMap to store the stream block id and related offset snapshot */
-  private lazy val blockOffsetMap =
-    new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]
+  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
+  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null

-  private lazy val blockGeneratorListener = new BlockGeneratorListener {
+  /** Manage the BlockGenerator in receiver itself for better managing block store and offset
+   * commit.
+   */
+  private var blockGenerator: BlockGenerator = null
+
+  /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
+  private final class OffsetCheckpointListener extends BlockGeneratorListener {
     override def onStoreData(data: Any, metadata: Any): Unit = {
       if (metadata != null) {
         val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
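The hunk above only declares the two maps; the snapshot step that links them is not visible here. As a rough, self-contained illustration of the pattern the comments describe (the mutable map is touched only inside synchronized blocks, and a per-block snapshot goes into the concurrent map keyed by block id), here is a hedged sketch; the case classes and the `rememberBlockOffsets` name are stand-ins for illustration, not part of this patch.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.{mutable, Map}

// Stand-ins for the Kafka/Spark types referenced in the patch (illustration only).
case class TopicAndPartition(topic: String, partition: Int)
case class StreamBlockId(streamId: Int, uniqueId: Long)

object OffsetSnapshotSketch {
  // Latest offset seen per topic/partition; mutated only under `synchronized`.
  private val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
  // Immutable snapshots of those offsets, keyed by the block that contains the messages.
  private val blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]

  // Called per received message: remember the latest offset for its partition.
  def updateOffset(tp: TopicAndPartition, offset: Long): Unit = synchronized {
    topicPartitionOffsetMap.put(tp, offset)
  }

  // Called when a block is cut: freeze the current offsets under the block id,
  // then reset the per-partition map for the next block.
  def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
    blockOffsetMap.put(blockId, topicPartitionOffsetMap.toMap)
    topicPartitionOffsetMap.clear()
  }
}
```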
@@ -96,10 +103,6 @@ class ReliableKafkaReceiver[
     }
   }

-  /** Manage the BlockGenerator in receiver itself for better managing block store and offset
-   * commit */
-  private var blockGenerator: BlockGenerator = null
-
   override def onStop(): Unit = {
     if (consumerConnector != null) {
       consumerConnector.shutdown()
@@ -111,13 +114,33 @@
       zkClient = null
     }

-    blockGenerator.stop()
+    if (blockGenerator != null) {
+      blockGenerator.stop()
+      blockGenerator = null
+    }
+
+    if (topicPartitionOffsetMap != null) {
+      topicPartitionOffsetMap.clear()
+      topicPartitionOffsetMap = null
+    }
+
+    if (blockOffsetMap != null) {
+      blockOffsetMap.clear()
+      blockOffsetMap = null
+    }
   }

   override def onStart(): Unit = {
     logInfo(s"Starting Kafka Consumer Stream with group: $groupId")

-    blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
+    // Initialize the topic-partition / offset hash map.
+    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
+
+    // Initialize the stream block id / offset snapshot hash map.
+    blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
+
+    // Initialize the block generator for storing Kafka message.
+    blockGenerator = new BlockGenerator(new OffsetCheckpointListener, streamId, conf())

     if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
       logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
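The reasoning behind this hunk: a Spark receiver can be stopped and restarted, so state that used to live in lazy vals would survive across restarts, and an unconditional blockGenerator.stop() could fail if onStart never ran. A minimal sketch of the lifecycle shape the patch adopts follows; the class and field names below are illustrative only, not Spark API.

```scala
// Minimal sketch of a restart-safe receiver lifecycle: every piece of mutable state
// is (re)built in onStart and torn down (and nulled) in onStop, so a restarted
// receiver never observes leftovers from a previous run.
class RestartSafeSketch {
  private var buffer: scala.collection.mutable.ArrayBuffer[String] = null

  def onStart(): Unit = {
    // Allocate fresh state on every (re)start, dependencies first, consumers last.
    buffer = new scala.collection.mutable.ArrayBuffer[String]
  }

  def onStop(): Unit = {
    // Guard with null checks: onStop may run even if onStart failed part-way through.
    if (buffer != null) {
      buffer.clear()
      buffer = null
    }
  }
}
```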
@@ -133,7 +156,7 @@ class ReliableKafkaReceiver[

     val consumerConfig = new ConsumerConfig(props)

-    assert(consumerConfig.autoCommitEnable == false)
+    assert(!consumerConfig.autoCommitEnable)

     logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
     consumerConnector = Consumer.create(consumerConfig)
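For context, the invariant that assertion protects: the receiver commits offsets itself, so Kafka's own auto-commit must be off no matter what the caller passed in. A hedged sketch of forcing that in the consumer properties, assuming `kafkaParams` is the user-supplied `Map[String, String]` from the constructor; the patch builds its `props` in a similar way a few lines above this hunk.

```scala
import java.util.Properties

// Copy the user-supplied Kafka parameters and force auto-commit off, since the
// receiver commits offsets to ZooKeeper itself only after a block is safely stored.
def consumerProps(kafkaParams: Map[String, String]): Properties = {
  val props = new Properties()
  kafkaParams.foreach { case (k, v) => props.put(k, v) }
  props.setProperty("auto.commit.enable", "false")  // override any user setting
  props
}
```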
@@ -156,41 +179,45 @@ class ReliableKafkaReceiver[
     val topicMessageStreams = consumerConnector.createMessageStreams(
       topics, keyDecoder, valueDecoder)

-    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
+    val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")

     try {
       topicMessageStreams.values.foreach { streams =>
         streams.foreach { stream =>
-          executorPool.submit(new Runnable {
-            override def run(): Unit = {
-              logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
-              try {
-                for (msgAndMetadata <- stream) {
-                  val topicAndPartition = TopicAndPartition(
-                    msgAndMetadata.topic, msgAndMetadata.partition)
-                  val metadata = (topicAndPartition, msgAndMetadata.offset)
-
-                  blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata)
-                }
-              } catch {
-                case e: Throwable => logError("Error handling message; existing", e)
-              }
-            }
-          })
+          executorPool.submit(new MessageHandler(stream))
         }
       }
     } finally {
       executorPool.shutdown()
     }
   }

+  /** A inner class to handle received Kafka message. */
+  private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
+    override def run(): Unit = {
+      logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
+      try {
+        for (msgAndMetadata <- stream) {
+          val topicAndPartition = TopicAndPartition(
+            msgAndMetadata.topic, msgAndMetadata.partition)
+          val metadata = (topicAndPartition, msgAndMetadata.offset)
+
+          blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata)
+        }
+      } catch {
+        case e: Throwable => logError("Error handling message; existing", e)
+      }
+    }
+  }
+
   /**
    * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
    * metadata schema in Zookeeper.
    */
   private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
     if (zkClient == null) {
-      logError(s"zkClient $zkClient should be initialized at started")
+      val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
+      stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
       return
     }
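`Utils.newDaemonFixedThreadPool` is a Spark-internal helper; the sketch below shows the rough equivalent it is assumed to stand for, a fixed-size pool of named daemon threads, so that stuck message-handler threads cannot keep the JVM alive on shutdown. This is an assumption about the helper's behaviour; details such as the exact thread naming may differ.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Rough equivalent of a "daemon fixed thread pool": same as Executors.newFixedThreadPool,
// but every worker thread is a daemon and carries a readable name prefix.
def daemonFixedThreadPool(nThreads: Int, prefix: String): ExecutorService = {
  val counter = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true)  // daemon threads do not block JVM exit
      t
    }
  }
  Executors.newFixedThreadPool(nThreads, factory)
}
```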

@@ -205,7 +232,7 @@
         s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t)
     }

-    logInfo(s"Committed offset ${offset} for topic ${topicAndPart.topic}, " +
+    logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
       s"partition ${topicAndPart.partition}")
   }
 }
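As its scaladoc says, commitOffset follows the Kafka 0.8.x high-level consumer layout in ZooKeeper, where each group's position is a plain znode per topic/partition; the patch imports `ZKGroupTopicDirs` and `ZkUtils` for exactly this. A small illustrative sketch of that layout (the helper name here is not part of the patch):

```scala
// Kafka 0.8.x consumer offsets live at: /consumers/<group>/offsets/<topic>/<partition>
// The znode's data is the offset rendered as a decimal string.
def consumerOffsetPath(groupId: String, topic: String, partition: Int): String =
  s"/consumers/$groupId/offsets/$topic/$partition"

// Example: consumerOffsetPath("my-group", "events", 0) == "/consumers/my-group/offsets/events/0"
```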

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala

Lines changed: 11 additions & 0 deletions
@@ -69,13 +69,17 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite
     ssc.start()
     ssc.awaitTermination(3000)

+    // A basic process verification for ReliableKafkaReceiver.
+    // Verify whether received message number is equal to the sent message number.
     assert(sent.size === result.size)
+    // Verify whether each message is the same as the data to be verified.
     sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }

     ssc.stop()
   }

   test("Verify the offset commit") {
+    // Verify the corretness of offset commit mechanism.
     val sparkConf = new SparkConf()
       .setMaster(master)
       .setAppName(framework)
@@ -97,8 +101,10 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite
       "group.id" -> groupId,
       "auto.offset.reset" -> "smallest")

+    // Verify whether the offset of this group/topic/partition is 0 before starting.
     assert(getCommitOffset(groupId, topic, 0) === 0L)

+    // Do this to consume all the message of this group/topic.
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc,
       kafkaParams,
@@ -109,6 +115,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite
     ssc.awaitTermination(3000)
     ssc.stop()

+    // Verify the offset number whether it is equal to the total message number.
     assert(getCommitOffset(groupId, topic, 0) === 29L)
   }

@@ -136,8 +143,10 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite
       "group.id" -> groupId,
       "auto.offset.reset" -> "smallest")

+    // Before started, verify all the group/topic/partition offsets are 0.
     topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }

+    // Consuming all the data sent to the broker which will potential commit the offsets internally.
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc,
       kafkaParams,
@@ -148,9 +157,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite
     ssc.awaitTermination(3000)
     ssc.stop()

+    // Verify the offset for each group/topic to see whether they are equal to the expected one.
     topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
   }

+  /** Getting partition offset from Zookeeper. */
   private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
     assert(zkClient != null, "Zookeeper client is not initialized")
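The suite's `getCommitOffset` helper is only partially visible in this hunk. A hypothetical sketch of what such a helper can look like, reading the committed offset back from the ZooKeeper path described above and defaulting to 0 when no node exists; the name and exact implementation are assumptions, not the suite's actual code.

```scala
import org.I0Itec.zkclient.ZkClient

// Read the committed offset for (groupId, topic, partition) back from ZooKeeper;
// return 0 when the consumer group has not committed anything yet.
def readCommittedOffset(zkClient: ZkClient, groupId: String, topic: String, partition: Int): Long = {
  val path = s"/consumers/$groupId/offsets/$topic/$partition"
  // readData(path, true) returns null instead of throwing when the znode is missing.
  Option(zkClient.readData[String](path, true)).map(_.toLong).getOrElse(0L)
}
```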
