Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ private class KafkaTestUtils extends Logging {
val props = new Properties()
props.put("metadata.broker.list", brokerAddress)
props.put("serializer.class", classOf[StringEncoder].getName)
// wait for all in-sync replicas to ack sends
props.put("request.required.acks", "-1")
props
}

Expand Down Expand Up @@ -229,21 +231,6 @@ private class KafkaTestUtils extends Logging {
tryAgain(1)
}

/** Wait until the leader offset for the given topic/partition equals the specified offset */
def waitUntilLeaderOffset(
topic: String,
partition: Int,
offset: Long): Unit = {
eventually(Time(10000), Time(100)) {
val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
val tp = TopicAndPartition(topic, partition)
val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
assert(
llo == offset,
s"$topic $partition $offset not reached after timeout")
}
}

private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ public void testKafkaRDD() throws InterruptedException {
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());

kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length);
kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length);

OffsetRange[] offsetRanges = {
OffsetRange.create(topic1, 0, 0, 1),
OffsetRange.create(topic2, 0, 0, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"group.id" -> s"test-consumer-${Random.nextInt}")

kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size)

val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
Expand All @@ -86,7 +84,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
// this is the "lots of messages" case
kafkaTestUtils.sendMessages(topic, sent)
val sentCount = sent.values.sum
kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount)

// rdd defined from leaders after sending messages, should get the number sent
val rdd = getRdd(kc, Set(topic))
Expand All @@ -113,7 +110,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
val sentOnlyOne = Map("d" -> 1)

kafkaTestUtils.sendMessages(topic, sentOnlyOne)
kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1)

assert(rdd2.isDefined)
assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
Expand Down
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ object MimaExcludes {
// Mima false positive (was a private[spark] class)
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.util.collection.PairIterator"),
// Removing a testing method from a private class
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"),
// SQL execution is considered private.
excludePackage("org.apache.spark.sql.execution")
)
Expand Down
5 changes: 0 additions & 5 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,6 @@ def test_kafka_stream(self):

self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))

stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
"test-streaming-consumer", {topic: 1},
Expand All @@ -631,7 +630,6 @@ def test_kafka_direct_stream(self):

self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))

stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
self._validateStreamResult(sendData, stream)
Expand All @@ -646,7 +644,6 @@ def test_kafka_direct_stream_from_offset(self):

self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))

stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
self._validateStreamResult(sendData, stream)
Expand All @@ -661,7 +658,6 @@ def test_kafka_rdd(self):

self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
self._validateRddResult(sendData, rdd)

Expand All @@ -677,7 +673,6 @@ def test_kafka_rdd_with_leaders(self):

self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
self._validateRddResult(sendData, rdd)

Expand Down