Skip to content

Commit 7710b71

Browse files
jerryshaotdas
authored andcommitted
[SPARK-2492][Streaming] kafkaReceiver minor changes to align with Kafka 0.8
Update the KafkaReceiver's behavior when auto.offset.reset is set. In Kafka 0.8, `auto.offset.reset` is a hint for out-range offset to seek to the beginning or end of the partition. While in the previous code `auto.offset.reset` is a enforcement to seek to the beginning or end immediately, this is different from Kafka 0.8 defined behavior. Also deleting extesting ZK metadata in Receiver when multiple consumers are launched will introduce issue as mentioned in [SPARK-2383](https://issues.apache.org/jira/browse/SPARK-2383). So Here we change to offer user to API to explicitly reset offset before create Kafka stream, while in the meantime keep the same behavior as Kafka 0.8 for parameter `auto.offset.reset`. @tdas, would you please review this PR? Thanks a lot. Author: jerryshao <[email protected]> Closes #1420 from jerryshao/kafka-fix and squashes the following commits: d6ae94d [jerryshao] Address the comment to remove the resetOffset() function de3a4c8 [jerryshao] Fix compile error 4a1c3f9 [jerryshao] Doc changes b2c1430 [jerryshao] Move offset reset to a helper function to let user explicitly delete ZK metadata by calling this API fac8fd6 [jerryshao] Changes to align with Kafka 0.8 (cherry picked from commit c8850a3) Signed-off-by: Tathagata Das <[email protected]>
1 parent fe8a1cd commit 7710b71

File tree

2 files changed

+5
-36
lines changed

2 files changed

+5
-36
lines changed

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ import java.util.concurrent.Executors
2626
import kafka.consumer._
2727
import kafka.serializer.Decoder
2828
import kafka.utils.VerifiableProperties
29-
import kafka.utils.ZKStringSerializer
30-
import org.I0Itec.zkclient._
3129

3230
import org.apache.spark.Logging
3331
import org.apache.spark.storage.StorageLevel
@@ -97,12 +95,6 @@ class KafkaReceiver[
9795
consumerConnector = Consumer.create(consumerConfig)
9896
logInfo("Connected to " + zkConnect)
9997

100-
// When auto.offset.reset is defined, it is our responsibility to try and whack the
101-
// consumer group zk node.
102-
if (kafkaParams.contains("auto.offset.reset")) {
103-
tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
104-
}
105-
10698
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
10799
.newInstance(consumerConfig.props)
108100
.asInstanceOf[Decoder[K]]
@@ -139,26 +131,4 @@ class KafkaReceiver[
139131
}
140132
}
141133
}
142-
143-
// It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
144-
// is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
145-
//
146-
// The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
147-
// from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
148-
// 'smallest'/'largest':
149-
// scalastyle:off
150-
// https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
151-
// scalastyle:on
152-
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
153-
val dir = "/consumers/" + groupId
154-
logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
155-
val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
156-
try {
157-
zk.deleteRecursive(dir)
158-
} catch {
159-
case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
160-
} finally {
161-
zk.close()
162-
}
163-
}
164134
}

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,18 @@
1717

1818
package org.apache.spark.streaming.kafka
1919

20-
import scala.reflect.ClassTag
21-
import scala.collection.JavaConversions._
22-
2320
import java.lang.{Integer => JInt}
2421
import java.util.{Map => JMap}
2522

23+
import scala.reflect.ClassTag
24+
import scala.collection.JavaConversions._
25+
2626
import kafka.serializer.{Decoder, StringDecoder}
2727

2828
import org.apache.spark.storage.StorageLevel
2929
import org.apache.spark.streaming.StreamingContext
30-
import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
31-
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
32-
30+
import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
31+
import org.apache.spark.streaming.dstream.ReceiverInputDStream
3332

3433
object KafkaUtils {
3534
/**

0 commit comments

Comments
 (0)