
Commit 281614d

[SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream
Changes
- Added example
- Added a critical unit test that verifies that offset ranges can be recovered through checkpoints

Might add more changes.

Author: Tathagata Das <[email protected]>

Closes #4384 from tdas/new-kafka-fixes and squashes the following commits:

7c931c3 [Tathagata Das] Small update
3ed9284 [Tathagata Das] updated scala doc
83d0402 [Tathagata Das] Added JavaDirectKafkaWordCount example.
26df23c [Tathagata Das] Updates based on PR comments from Cody
e4abf69 [Tathagata Das] Scala doc improvements and stuff.
bb65232 [Tathagata Das] Fixed test bug and refactored KafkaStreamSuite
50f2b56 [Tathagata Das] Added Java API and added more Scala and Java unit tests. Also updated docs.
e73589c [Tathagata Das] Minor changes.
4986784 [Tathagata Das] Added unit test to kafka offset recovery
6a91cab [Tathagata Das] Added example

(cherry picked from commit c151346)
Signed-off-by: Tathagata Das <[email protected]>
1 parent 01905c4 commit 281614d
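Background for the exactly-once claim: each batch of the direct stream carries the exact Kafka offset ranges it covers, which is what the new checkpoint-recovery unit test exercises. A minimal sketch of reading those ranges back, assuming the HasOffsetRanges trait and createDirectStream API from this patch series; the surrounding ssc, kafkaParams, and topicsSet are set up as in the examples below:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Assumes ssc, kafkaParams, and topicsSet as in the DirectKafkaWordCount
// examples added by this commit.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

stream.foreachRDD { rdd =>
  // Each RDD of the direct stream knows exactly which offsets it covers,
  // so the ranges survive checkpointing along with the DStream graph.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"${r.topic} ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
  }
}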

File tree

17 files changed, +1048 -262 lines

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.streaming;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: JavaDirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
 */

public final class JavaDirectKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

    StreamingExamples.setStreamingLogLevels();

    String brokers = args[0];
    String topics = args[1];

    // Create context with 2 second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
 */
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
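The unit test mentioned in the commit message checks that offset ranges come back intact after checkpoint recovery. A minimal sketch of the driver-side pattern such recovery relies on, assuming the standard StreamingContext.getOrCreate API; the checkpoint path is illustrative, not from the commit:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/tmp/direct-kafka-checkpoint"  // illustrative path

def createContext(): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(2))
  ssc.checkpoint(checkpointDir)
  // build the direct Kafka stream and transformations here
  ssc
}

// The first run builds the context; after a restart it is reconstructed
// from the checkpoint, including the direct stream's per-batch offset ranges.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()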

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala

Lines changed: 2 additions & 3 deletions
@@ -50,14 +50,13 @@ import org.apache.spark.streaming.dstream._
  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
  *  starting point of the stream
  * @param messageHandler function for translating each message into the desired type
- * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
  */
 private[streaming]
 class DirectKafkaInputDStream[
   K: ClassTag,
   V: ClassTag,
-  U <: Decoder[_]: ClassTag,
-  T <: Decoder[_]: ClassTag,
+  U <: Decoder[K]: ClassTag,
+  T <: Decoder[V]: ClassTag,
   R: ClassTag](
     @transient ssc_ : StreamingContext,
     val kafkaParams: Map[String, String],
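The tightened bounds (Decoder[K] and Decoder[V] instead of Decoder[_]) tie each decoder's output type to the stream's key and value types. A self-contained sketch of the difference; the Decoder trait below is a stand-in for illustration, not Kafka's:

// Stand-in trait, only to illustrate the type bound.
trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }
class StringDecoder extends Decoder[String] {
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

// Old bound: any decoder is accepted, even one unrelated to K.
def loose[K, U <: Decoder[_]](): Unit = ()
loose[Int, StringDecoder]()      // compiles, despite decoding Strings for Int keys

// New bound: the decoder must actually produce K.
def tight[K, U <: Decoder[K]](): Unit = ()
tight[String, StringDecoder]()   // compiles
// tight[Int, StringDecoder]()   // rejected at compile time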

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala

Lines changed: 3 additions & 0 deletions
@@ -332,6 +332,9 @@ object KafkaCluster {
     extends ConsumerConfig(originalProps) {
     val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
       val hpa = hp.split(":")
+      if (hpa.size == 1) {
+        throw new SparkException(s"Broker not in the correct format of <host>:<port> [$brokers]")
+      }
       (hpa(0), hpa(1).toInt)
     }
   }
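The new check fails fast when a broker entry lacks a port, instead of surfacing as an opaque ArrayIndexOutOfBoundsException at hpa(1). A standalone sketch of the same parsing logic, runnable outside Spark; parseBrokers is an illustrative name, not part of the commit:

// Mirrors the seedBrokers parsing above; uses IllegalArgumentException
// instead of SparkException so it has no Spark dependency.
def parseBrokers(brokers: String): Array[(String, Int)] =
  brokers.split(",").map { hp =>
    val hpa = hp.split(":")
    if (hpa.size == 1) {
      throw new IllegalArgumentException(
        s"Broker not in the correct format of <host>:<port> [$brokers]")
    }
    (hpa(0), hpa(1).toInt)
  }

parseBrokers("broker1:9092,broker2:9092")  // Array((broker1,9092), (broker2,9092))
// parseBrokers("broker1") throws IllegalArgumentException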

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala

Lines changed: 5 additions & 7 deletions
@@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties
  * Starting and ending offsets are specified in advance,
  * so that you can control exactly-once semantics.
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param batch Each KafkaRDDPartition in the batch corresponds to a
- * range of offsets for a given Kafka topic/partition
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
  * @param messageHandler function for translating each message into the desired type
  */
-private[spark]
+private[kafka]
 class KafkaRDD[
   K: ClassTag,
   V: ClassTag,

@@ -183,7 +181,7 @@
   }
 }

-private[spark]
+private[kafka]
 object KafkaRDD {
   import KafkaCluster.LeaderOffset
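The repeated private[spark] to private[kafka] changes narrow these internals from visible anywhere under org.apache.spark to visible only within the Kafka streaming package. A minimal sketch of how Scala's package-qualified access modifier behaves; package and class names here are illustrative:

package org.apache.spark.streaming.kafka {
  // Accessible anywhere inside ...streaming.kafka, invisible elsewhere.
  private[kafka] class Internals

  private[kafka] object SamePackage {
    val ok = new Internals // compiles: same package
  }
}

package org.apache.spark.streaming {
  object Outside {
    // val no = new kafka.Internals // would not compile: outside private[kafka]
  }
}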

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala

Lines changed: 1 addition & 22 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.Partition
  * @param host preferred kafka host, i.e. the leader at the time the rdd was created
  * @param port preferred kafka host's port
  */
-private[spark]
+private[kafka]
 class KafkaRDDPartition(
   val index: Int,
   val topic: String,

@@ -36,24 +36,3 @@
   val host: String,
   val port: Int
 ) extends Partition
-
-private[spark]
-object KafkaRDDPartition {
-  def apply(
-    index: Int,
-    topic: String,
-    partition: Int,
-    fromOffset: Long,
-    untilOffset: Long,
-    host: String,
-    port: Int
-  ): KafkaRDDPartition = new KafkaRDDPartition(
-    index,
-    topic,
-    partition,
-    fromOffset,
-    untilOffset,
-    host,
-    port
-  )
-}
