
Commit b127ff8

koeninger authored and srowen committed
[SPARK-2808] [STREAMING] [KAFKA] cleanup tests from
see if requiring producer acks eliminates the need for waitUntilLeaderOffset calls in tests

Author: cody koeninger <[email protected]>

Closes apache#5921 from koeninger/kafka-0.8.2-test-cleanup and squashes the following commits:

1e89dc8 [cody koeninger] Merge branch 'master' into kafka-0.8.2-test-cleanup
4662828 [cody koeninger] [Streaming][Kafka] filter mima issue for removal of method from private test class
af1e083 [cody koeninger] Merge branch 'master' into kafka-0.8.2-test-cleanup
4298ac2 [cody koeninger] [Streaming][Kafka] update comment to trigger jenkins attempt
1274afb [cody koeninger] [Streaming][Kafka] see if requiring producer acks eliminates the need for waitUntilLeaderOffset calls in tests
1 parent e84815d commit b127ff8
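
The gist of the change: with request.required.acks set to -1, the test producer's send() does not return until all in-sync replicas have acknowledged the write, so the leader offset is guaranteed to cover the sent messages and a separate polling step becomes redundant. As a minimal sketch of that configuration against the old-style kafka.producer API (the broker address and topic name are placeholders, not values from this commit):

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder

object AckedProducerSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder address; the real suite points this at its embedded broker.
    val brokerAddress = "localhost:9092"

    val props = new Properties()
    props.put("metadata.broker.list", brokerAddress)
    props.put("serializer.class", classOf[StringEncoder].getName)
    // -1 = wait for acks from all in-sync replicas, so a completed send()
    // implies the leader's log already contains the messages.
    props.put("request.required.acks", "-1")

    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("test-topic", "hello"))
    producer.close()
  }
}

With full-ISR acks required, a successful send doubles as the synchronization point the deleted waitUntilLeaderOffset helper used to provide.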

File tree

5 files changed (+5, -27 lines)


external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala

Lines changed: 2 additions & 15 deletions
@@ -195,6 +195,8 @@ private class KafkaTestUtils extends Logging {
     val props = new Properties()
     props.put("metadata.broker.list", brokerAddress)
     props.put("serializer.class", classOf[StringEncoder].getName)
+    // wait for all in-sync replicas to ack sends
+    props.put("request.required.acks", "-1")
     props
   }

@@ -229,21 +231,6 @@ private class KafkaTestUtils extends Logging {
     tryAgain(1)
   }

-  /** Wait until the leader offset for the given topic/partition equals the specified offset */
-  def waitUntilLeaderOffset(
-      topic: String,
-      partition: Int,
-      offset: Long): Unit = {
-    eventually(Time(10000), Time(100)) {
-      val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
-      val tp = TopicAndPartition(topic, partition)
-      val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
-      assert(
-        llo == offset,
-        s"$topic $partition $offset not reached after timeout")
-    }
-  }
-
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
     def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
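
For reference, the deleted helper was just a timed polling loop around an offset check. A rough self-contained sketch of that pattern (the eventually signature, the timeouts, and waitForOffset below are illustrative stand-ins, not the test utility's actual API):

object PollingSketch {
  // Simplified stand-in for the eventually(Time(10000), Time(100)) { ... }
  // call in the removed method: retry a block until it stops throwing or a
  // deadline passes, rethrowing the last failure on timeout.
  def eventually[T](timeoutMs: Long, intervalMs: Long)(body: => T): T = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(body)
      } catch {
        case e: Throwable =>
          if (System.currentTimeMillis() >= deadline) throw e
          Thread.sleep(intervalMs)
      }
    }
    result.get
  }

  // Hypothetical usage mirroring waitUntilLeaderOffset: check an offset
  // lookup every 100 ms for up to 10 s.
  def waitForOffset(fetchOffset: () => Long, expected: Long): Unit =
    eventually(10000L, 100L) {
      assert(fetchOffset() == expected, s"offset $expected not reached")
    }
}

Requiring full-ISR acks at send time makes this kind of loop unnecessary, which is why the test suites below can simply drop their waitUntilLeaderOffset calls.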

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java

Lines changed: 0 additions & 3 deletions
@@ -72,9 +72,6 @@ public void testKafkaRDD() throws InterruptedException {
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());

-    kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length);
-    kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length);
-
     OffsetRange[] offsetRanges = {
       OffsetRange.create(topic1, 0, 0, 1),
       OffsetRange.create(topic2, 0, 0, 1)

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala

Lines changed: 0 additions & 4 deletions
@@ -61,8 +61,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
       "group.id" -> s"test-consumer-${Random.nextInt}")

-    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size)
-
     val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))

     val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](

@@ -86,7 +84,6 @@
     // this is the "lots of messages" case
     kafkaTestUtils.sendMessages(topic, sent)
     val sentCount = sent.values.sum
-    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount)

     // rdd defined from leaders after sending messages, should get the number sent
     val rdd = getRdd(kc, Set(topic))

@@ -113,7 +110,6 @@
     val sentOnlyOne = Map("d" -> 1)

     kafkaTestUtils.sendMessages(topic, sentOnlyOne)
-    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1)

     assert(rdd2.isDefined)
     assert(rdd2.get.count === 0, "got messages when there shouldn't be any")

project/MimaExcludes.scala

Lines changed: 3 additions & 0 deletions
@@ -47,6 +47,9 @@ object MimaExcludes {
       // Mima false positive (was a private[spark] class)
       ProblemFilters.exclude[MissingClassProblem](
         "org.apache.spark.util.collection.PairIterator"),
+      // Removing a testing method from a private class
+      ProblemFilters.exclude[MissingMethodProblem](
+        "org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"),
       // SQL execution is considered private.
       excludePackage("org.apache.spark.sql.execution")
     )
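
MiMa compares the compiled classes of consecutive releases, so it flags the removal of waitUntilLeaderOffset even though KafkaTestUtils is a private test class; the new filter suppresses exactly that report. For illustration only, the general shape of such exclusions (both names below are made up, not part of this commit):

import com.typesafe.tools.mima.core._

// Each filter silences one category of binary-compatibility report for one
// fully qualified name; hypothetical example names.
val exampleExcludes = Seq(
  // A class that was never meant to be public API.
  ProblemFilters.exclude[MissingClassProblem]("org.example.InternalHelper"),
  // A method deleted from a class MiMa cannot tell is private.
  ProblemFilters.exclude[MissingMethodProblem]("org.example.Util.oldMethod")
)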

python/pyspark/streaming/tests.py

Lines changed: 0 additions & 5 deletions
@@ -615,7 +615,6 @@ def test_kafka_stream(self):
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))

         stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                          "test-streaming-consumer", {topic: 1},

@@ -631,7 +630,6 @@ def test_kafka_direct_stream(self):
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))

         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
         self._validateStreamResult(sendData, stream)

@@ -646,7 +644,6 @@ def test_kafka_direct_stream_from_offset(self):
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))

         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
         self._validateStreamResult(sendData, stream)

@@ -661,7 +658,6 @@ def test_kafka_rdd(self):
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self._validateRddResult(sendData, rdd)

@@ -677,7 +673,6 @@ def test_kafka_rdd_with_leaders(self):
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
         self._validateRddResult(sendData, rdd)
