Skip to content

Commit 6cf1cd4

Browse files
committed
Made KinesisReceiver.onStart non-blocking
1 parent 6b18cdc commit 6cf1cd4

File tree

1 file changed

+25
-5
lines changed

1 file changed

+25
-5
lines changed

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package org.apache.spark.streaming.kinesis
1818

1919
import java.util.UUID
2020

21+
import scala.util.control.NonFatal
22+
2123
import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
2224
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory}
2325
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
@@ -98,6 +100,9 @@ private[kinesis] class KinesisReceiver(
98100
*/
99101
private var worker: Worker = null
100102

103+
/** Thread running the worker */
104+
private var workerThread: Thread = null
105+
101106
/**
102107
* This is called when the KinesisReceiver starts and must be non-blocking.
103108
* The KCL creates and manages the receiving/processing thread pool through Worker.run().
@@ -126,8 +131,19 @@ private[kinesis] class KinesisReceiver(
126131
}
127132

128133
worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
129-
worker.run()
130-
134+
workerThread = new Thread() {
135+
override def run(): Unit = {
136+
try {
137+
worker.run()
138+
} catch {
139+
case NonFatal(e) =>
140+
restart("Error running the KCL worker in Receiver", e)
141+
}
142+
}
143+
}
144+
workerThread.setName("Kinesis Receiver")
145+
workerThread.setDaemon(true)
146+
workerThread.start()
131147
logInfo(s"Started receiver with workerId $workerId")
132148
}
133149

@@ -137,10 +153,14 @@ private[kinesis] class KinesisReceiver(
137153
* The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
138154
*/
139155
override def onStop() {
140-
if (worker != null) {
141-
worker.shutdown()
156+
if (workerThread != null) {
157+
if (worker != null) {
158+
worker.shutdown()
159+
worker = null
160+
}
161+
workerThread.join()
162+
workerThread = null
142163
logInfo(s"Stopped receiver for workerId $workerId")
143-
worker = null
144164
}
145165
workerId = null
146166
}

0 commit comments

Comments
 (0)