Commit e8d59b9

[MAPR-32290] Spark processing offsets when messages are already TTL in the first batch (apache#376)
1 parent: 4f380b4

1 file changed: +5 -12 lines changed

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/DirectKafkaInputDStream.scala

Lines changed: 5 additions & 12 deletions

@@ -76,14 +76,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   @transient val serviceConsumer: Consumer[K, V] = consumerStrategy.serviceConsumer
 
-  def consumerForAssign(): KafkaConsumer[Long, String] = this.synchronized {
-    val properties = consumerStrategy.executorKafkaParams
-    properties.put("max.poll.records", "1")
-    properties.put(ConsumerConfig.GROUP_ID_CONFIG,
-      s"${properties.get(ConsumerConfig.GROUP_ID_CONFIG)}_assignGroup")
-    new KafkaConsumer[Long, String](properties)
-  }
-
   override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
     logError("Kafka ConsumerRecord is not serializable. " +
       "Use .map to extract fields before calling .persist or .window")
@@ -288,26 +280,27 @@
 
   override def start(): Unit = {
     val c = consumer
-    val consumerAssign = consumerForAssign
     val pollTimeout = ssc.sparkContext.getConf
-      .getLong("spark.streaming.kafka.consumer.driver.poll.ms", 120000)
+      .getLong("spark.streaming.kafka.consumer.driver.poll.ms", 5000)
     paranoidPoll(c)
     if (currentOffsets.isEmpty) {
       currentOffsets = c.assignment().asScala.map { tp =>
         tp -> {
           val position = c.position(tp)
 
-          consumerAssign.assign(ju.Arrays.asList(tp))
-          val records = consumerAssign.poll(pollTimeout).iterator()
+          serviceConsumer.assign(ju.Arrays.asList(tp))
+          val records = serviceConsumer.poll(pollTimeout).iterator()
           val firstRecordOffset = if (records.hasNext) {
             records.next().offset()
           } else {
             c.endOffsets(ju.Arrays.asList(tp)).get(tp).longValue()
           }
 
           if (position < firstRecordOffset) {
+            serviceConsumer.seek(tp, firstRecordOffset)
             firstRecordOffset
           } else {
+            serviceConsumer.seek(tp, position)
             position
           }
         }

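For reference, a minimal self-contained sketch of the starting-offset resolution the patched start() now performs, assuming serviceConsumer exposes the same Consumer API the diff itself calls (assign, poll, seek, position, endOffsets). The object and method names below are illustrative, not part of the patch:

import java.{util => ju}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper, not part of the patch: decide where reading should
// start for a partition whose oldest messages may already have expired (TTL)
// before the first batch runs.
object OffsetResolutionSketch {
  def resolveStartingOffset[K, V](
      driver: Consumer[K, V],
      service: Consumer[K, V],
      tp: TopicPartition,
      pollTimeoutMs: Long): Long = {
    // The offset the driver consumer would read next (e.g. a saved position).
    val position = driver.position(tp)

    // Poll through the shared service consumer to find the earliest record
    // that still exists; if nothing survives in the partition, fall back to
    // its end offset so the stream starts at the head of new data.
    service.assign(ju.Arrays.asList(tp))
    val records = service.poll(pollTimeoutMs).iterator()
    val firstRecordOffset =
      if (records.hasNext) records.next().offset()
      else driver.endOffsets(ju.Arrays.asList(tp)).get(tp).longValue()

    // max(position, firstRecordOffset) collapses the diff's if/else: if TTL
    // removed the messages behind the saved position, jump forward to the
    // first surviving record, otherwise keep the position. The seek (the two
    // added lines in the diff) rewinds the service consumer, which the poll
    // above advanced, so it stays consistent with the returned offset.
    val start = math.max(position, firstRecordOffset)
    service.seek(tp, start)
    start
  }
}

A plausible reading of the change: because the added seek calls leave serviceConsumer in a known state after the probing poll, the dedicated consumerForAssign consumer (and its _assignGroup group id) is no longer needed, and one consumer can serve both roles.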