Skip to content

Commit b317d74

Browse files
Egor Krivokonanicolaspp
andcommitted
Adding easy access to commitable offsets (apache#628)
* Adding SQL API to write to kafka from Spark (apache#567) * Branch 2.4.3 extended kafka and examples (apache#569) * The v2 API is in its own package - the v2 api is in a different package - the old functionality is available in a separated package * v2 API examples - All the examples are using the newest API. - I have removed the old examples since they are not relevant any more and the same functionality is shown in the new examples usin the new API. * Adding easy access to commitable offsets * Adding easy access to commitable offsets Co-authored-by: Nicolas A Perez <[email protected]>
1 parent ae5ba9f commit b317d74

File tree

1 file changed

+21
-0
lines changed
  • external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010

1 file changed

+21
-0
lines changed

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ package org.apache.spark.streaming
1919

2020
import org.apache.spark.internal.config.ConfigBuilder
2121

22+
import org.apache.kafka.clients.consumer.OffsetCommitCallback
23+
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, DirectKafkaInputDStream, HasOffsetRanges}
24+
2225
/**
2326
* Spark Integration for Kafka 0.9
2427
*/
@@ -72,5 +75,23 @@ package object kafka010 { //scalastyle:ignore
7275
.booleanConf
7376
.createWithDefault(false)
7477

78+
/**
79+
* This extension provides easy access to commit offsets back to MapR-ES or Kafka
80+
*
81+
* @param directKafkaInputDStream We can only call this function on the original stream and not the transformations
82+
* @tparam K
83+
* @tparam V
84+
*/
85+
implicit class CanCommitStreamOffsets[K, V](directKafkaInputDStream: DirectKafkaInputDStream[K, V]) {
86+
def commitOffsetsAsync(): Unit = commitOffsetsAsync(null)
87+
88+
def commitOffsetsAsync(callback: OffsetCommitCallback): Unit = {
89+
directKafkaInputDStream.foreachRDD { rdd =>
90+
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
91+
92+
directKafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
93+
}
94+
}
95+
}
7596
}
7697

0 commit comments

Comments
 (0)