
Commit 2a20a01

Address some comments
1 parent 9f636b3 commit 2a20a01

4 files changed: +26 -25 lines changed


external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala

Lines changed: 10 additions & 9 deletions
@@ -83,7 +83,7 @@ class ReliableKafkaReceiver[
    */
   private var blockGenerator: BlockGenerator = null
 
-  /** Threadpool running the handlers for receiving message from multiple topics and partitions. */
+  /** Thread pool running the handlers for receiving message from multiple topics and partitions. */
   private var messageHandlerThreadPool: ThreadPoolExecutor = null
 
   override def onStart(): Unit = {
@@ -142,7 +142,6 @@ class ReliableKafkaReceiver[
         messageHandlerThreadPool.submit(new MessageHandler(stream))
       }
     }
-    println("Starting")
   }
 
   override def onStop(): Unit = {
@@ -177,25 +176,28 @@ class ReliableKafkaReceiver[
     }
   }
 
-  /** Store a Kafka message and the associated metadata as a tuple */
+  /** Store a Kafka message and the associated metadata as a tuple. */
   private def storeMessageAndMetadata(
       msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
     val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
     blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
     topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
   }
 
-  /** Remember the current offsets for each topic and partition. This is called when a block is generated */
+  /**
+   * Remember the current offsets for each topic and partition. This is called when a block is
+   * generated.
+   */
   private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
-    // Get a snapshot of current offset map and store with related block id. Since this hook
-    // function is called in synchronized block, so we can get the snapshot without explicit lock.
+    // Get a snapshot of current offset map and store with related block id.
     val offsetSnapshot = topicPartitionOffsetMap.toMap
     blockOffsetMap.put(blockId, offsetSnapshot)
     topicPartitionOffsetMap.clear()
   }
 
-  /** Store the ready-to-be-stored block and commit the related offsets to zookeeper */
-  private def storeBlockAndCommitOffset(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+  /** Store the ready-to-be-stored block and commit the related offsets to zookeeper. */
+  private def storeBlockAndCommitOffset(
+      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
     store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
     Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
     blockOffsetMap.remove(blockId)
@@ -232,7 +234,6 @@ class ReliableKafkaReceiver[
   private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
     override def run(): Unit = {
       while (!isStopped) {
-        println(s"Starting message process thread ${Thread.currentThread().getId}.")
         try {
           val streamIterator = stream.iterator()
           while (streamIterator.hasNext) {
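
Note on the receiver logic touched above: offsets accumulate per topic/partition while messages are buffered, and when a block is generated the running offset map is snapshotted under that block's id so it can be committed only after the block has been stored. A minimal, self-contained Scala sketch of that snapshot-and-clear pattern (illustrative names only, not the actual Spark or Kafka classes):

import scala.collection.mutable

// Illustrative stand-ins for StreamBlockId and TopicAndPartition.
case class FakeBlockId(id: Long)
case class FakeTopicPartition(topic: String, partition: Int)

class OffsetBookkeeping {
  // Offsets seen since the last block was generated.
  private val currentOffsets = mutable.HashMap[FakeTopicPartition, Long]()
  // Offset snapshots keyed by the block they belong to.
  private val blockOffsets = mutable.HashMap[FakeBlockId, Map[FakeTopicPartition, Long]]()

  /** Record the latest offset for a partition; called once per received message. */
  def recordOffset(tp: FakeTopicPartition, offset: Long): Unit = synchronized {
    currentOffsets.put(tp, offset)
  }

  /** When a block is generated, snapshot the running offsets and reset them. */
  def rememberBlockOffsets(blockId: FakeBlockId): Unit = synchronized {
    blockOffsets.put(blockId, currentOffsets.toMap)
    currentOffsets.clear()
  }

  /** After the block is stored, commit its offsets and forget them. */
  def storeBlockAndCommit(blockId: FakeBlockId)(commit: Map[FakeTopicPartition, Long] => Unit): Unit = {
    blockOffsets.remove(blockId).foreach(commit)
  }
}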

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 11 additions & 11 deletions
@@ -33,8 +33,6 @@
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -43,23 +41,25 @@
 import org.junit.After;
 import org.junit.Before;
 
-public class JavaKafkaStreamSuite extends KafkaStreamSuiteBase implements Serializable {
-  private transient JavaStreamingContext ssc = null;
-  private Random random = new Random();
+public class JavaKafkaStreamSuite implements Serializable {
+  private transient JavaStreamingContext ssc = null;
+  private Random random = new Random();
+  private transient KafkaStreamSuiteBase suiteBase = null;
 
   @Before
   public void setUp() {
-    beforeFunction();
+    suiteBase = new KafkaStreamSuiteBase() { };
+    suiteBase.beforeFunction();
     System.clearProperty("spark.driver.port");
-    ssc = new JavaStreamingContext(sparkConf(), batchDuration());
+    ssc = new JavaStreamingContext(suiteBase.sparkConf(), suiteBase.batchDuration());
   }
 
   @After
   public void tearDown() {
     ssc.stop();
     ssc = null;
     System.clearProperty("spark.driver.port");
-    afterFunction();
+    suiteBase.afterFunction();
   }
 
   @Test
@@ -73,14 +73,14 @@ public void testKafkaStream() throws InterruptedException {
     sent.put("b", 3);
     sent.put("c", 10);
 
-    createTopic(topic);
+    suiteBase.createTopic(topic);
     HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
-    produceAndSendMessage(topic,
+    suiteBase.produceAndSendMessage(topic,
        JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
            Predef.<Tuple2<String, Object>>conforms()));
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
-    kafkaParams.put("zookeeper.connect", zkAddress());
+    kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
     kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
     kafkaParams.put("auto.offset.reset", "smallest");
 
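
The Java suite above now delegates to a suiteBase field instead of extending the Scala test base; because KafkaStreamSuiteBase is abstract, it is instantiated through an anonymous empty subclass (new KafkaStreamSuiteBase() { }) in setUp(). A small sketch of that delegation idiom in Scala, with illustrative names only:

// Abstract helper holder, analogous in shape (not in content) to KafkaStreamSuiteBase.
abstract class SuiteHelpers {
  def beforeFunction(): Unit = println("start embedded ZooKeeper/Kafka")
  def afterFunction(): Unit = println("stop embedded ZooKeeper/Kafka")
}

// A test class that composes the helpers instead of inheriting them; the
// `new SuiteHelpers() {}` expression is the same anonymous-subclass trick the Java code uses.
class ComposedSuite {
  private val helpers: SuiteHelpers = new SuiteHelpers() {}

  def setUp(): Unit = helpers.beforeFunction()
  def tearDown(): Unit = helpers.afterFunction()
}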

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -42,7 +42,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.util.Utils
 
-abstract class KafkaStreamSuiteBase extends FunSuite with BeforeAndAfter with Logging {
+abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
   import KafkaTestUtils._
 
   val sparkConf = new SparkConf()
@@ -154,7 +154,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with BeforeAndAfter with Lo
   }
 }
 
-class KafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
+class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
 
   before { beforeFunction() }
   after { afterFunction() }
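
With this change the abstract base only supplies the setup/teardown helpers, and each concrete suite mixes in BeforeAndAfter and wires the hooks itself, as the before { beforeFunction() } / after { afterFunction() } lines show. A minimal standalone sketch of that arrangement (illustrative class names, assuming ScalaTest's FunSuite and BeforeAndAfter):

import org.scalatest.{BeforeAndAfter, FunSuite}

// The base provides helpers but deliberately does not register any hooks.
abstract class HelpersBase extends FunSuite {
  def beforeFunction(): Unit = { /* start test fixtures */ }
  def afterFunction(): Unit = { /* tear test fixtures down */ }
}

// Each concrete suite decides how and when the helpers are invoked.
class ExampleSuite extends HelpersBase with BeforeAndAfter {
  before { beforeFunction() }
  after { afterFunction() }

  test("example") {
    assert(1 + 1 === 2)
  }
}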

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -26,13 +26,14 @@ import scala.util.Random
 
 import kafka.serializer.StringDecoder
 import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
+import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.util.Utils
 
-class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
+class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
   val topic = "topic"
   val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
   var groupId: String = _
@@ -85,7 +86,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
     }
     ssc.stop()
   }
-  /*
   test("Verify the offset commit") {
     // Verify the correctness of offset commit mechanism.
     sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
@@ -147,7 +147,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
     }
     ssc.stop()
   }
-  */
+
   /** Getting partition offset from Zookeeper. */
   private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
     assert(zkClient != null, "Zookeeper client is not initialized")
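
The change above re-enables the "Verify the offset commit" test by removing the surrounding comment markers. A test of this kind cannot assert immediately, because the receiver commits offsets to ZooKeeper only after a block has been stored, so suites like this mix in ScalaTest's Eventually and poll. A standalone sketch of that polling idiom (illustrative values, not the actual test body):

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Seconds, Span}

object EventuallyDemo extends App {
  // A value that only becomes correct after some asynchronous work finishes,
  // standing in for an offset the receiver eventually commits to ZooKeeper.
  @volatile var committedOffset = 0L
  new Thread(new Runnable {
    override def run(): Unit = { Thread.sleep(200); committedOffset = 10L }
  }).start()

  // Retry the assertion until it passes or the timeout expires.
  eventually(timeout(Span(10, Seconds))) {
    assert(committedOffset == 10L)
  }
  println(s"observed committed offset: $committedOffset")
}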
