@@ -18,28 +18,27 @@ package org.apache.spark.streaming.flume
 
 
 import java.net.InetSocketAddress
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, Executors}
 
-import org.apache.avro.AvroRemoteException
+import com.google.common.base.Throwables
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+import scala.util.Try
+import scala.util.control.Breaks
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.avro.ipc.NettyTransceiver
 import org.apache.avro.ipc.specific.SpecificRequestor
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.streaming.flume.sink._
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
 
-import scala.util.Try
-import scala.util.control.Breaks
 
 /**
  * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
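This hunk also pulls in `scala.util.Try` and `scala.util.control.Breaks`, which the polling loop further down depends on (`loop.break()` is how it exits on shutdown). A minimal, self-contained sketch of the Breaks construct, with a purely illustrative counter standing in for the real receive loop:

```scala
import scala.util.control.Breaks

object BreaksExample {
  def main(args: Array[String]): Unit = {
    // `loop.break()` throws a control exception that `breakable` catches,
    // which is how the receiver's polling loop exits when it is stopped.
    val loop = new Breaks
    var polled = 0
    loop.breakable {
      while (true) {
        polled += 1
        if (polled >= 3) {
          loop.break()   // leave the breakable block, like a receiver shutdown
        }
      }
    }
    println(s"stopped after $polled polls")
  }
}
```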
@@ -52,11 +51,11 @@ import scala.util.control.Breaks
  * @tparam T Class type of the object of this stream
  */
 private[streaming] class FlumePollingInputDStream[T: ClassTag](
-  @transient _ssc: StreamingContext,
-  val addresses: Seq[InetSocketAddress],
-  val maxBatchSize: Int,
-  val parallelism: Int,
-  storageLevel: StorageLevel
+    @transient _ssc: StreamingContext,
+    val addresses: Seq[InetSocketAddress],
+    val maxBatchSize: Int,
+    val parallelism: Int,
+    storageLevel: StorageLevel
   ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
 
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
@@ -65,10 +64,10 @@ private[streaming] class FlumePollingInputDStream[T: ClassTag](
   }
 
 private[streaming] class FlumePollingReceiver(
-  addresses: Seq[InetSocketAddress],
-  maxBatchSize: Int,
-  parallelism: Int,
-  storageLevel: StorageLevel
+    addresses: Seq[InetSocketAddress],
+    maxBatchSize: Int,
+    parallelism: Int,
+    storageLevel: StorageLevel
   ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val channelFactoryExecutor =
@@ -133,30 +132,27 @@ private[streaming] class FlumePollingReceiver(
             }
           }.recover {
             case e: Exception =>
-              // Is the exception an interrupted exception? If yes,
-              // check if the receiver was stopped. If the receiver was stopped,
-              // simply exit. Else send a Nack and exit.
-              if (e.isInstanceOf[InterruptedException]) {
-                if (isStopped()) {
-                  loop.break()
-                } else {
-                  sendNack(batchReceived, client, seq, e)
-                }
-              }
-              // if there is a cause do the same check as above.
-              Option(e.getCause) match {
-                case Some(exception: InterruptedException) =>
+              Throwables.getRootCause(e) match {
+                // If the cause was an InterruptedException,
+                // then check if the receiver is stopped - if yes,
+                // just break out of the loop. Else send a Nack and
+                // log a warning.
+                // In the unlikely case, the cause was not an Exception,
+                // then just throw it out and exit.
+                case interrupted: InterruptedException =>
                   if (isStopped()) {
                     loop.break()
                   } else {
-                    sendNack(batchReceived, client, seq, e)
+                    sendNack(batchReceived, client, seq)
+                    logWarning("Interrupted while receiving data from Flume", interrupted)
                   }
-                case Some(otherException: Exception) =>
-                  sendNack(batchReceived, client, seq, e)
-                case Some(majorError: Throwable) =>
-                  throw majorError // kill immediately
-                case None =>
-                  sendNack(batchReceived, client, seq, e)
+                case exception: Exception =>
+                  sendNack(batchReceived, client, seq)
+                  logWarning("Error while receiving data from Flume", exception)
+                case majorError: Throwable =>
+                  // Don't bother with Nack - exit immediately
+                  logError("Major error while receiving data from Flume.", majorError)
+                  throw majorError
               }
           }
         } catch {
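The replaced block collapses the old nested `isInstanceOf`/`getCause` checks into a single match on Guava's `Throwables.getRootCause`, which walks the cause chain down to its root before deciding how to react. A minimal sketch of the same pattern outside the receiver; `handleFailure` and the `println` calls are illustrative stand-ins, not part of the patch:

```scala
import com.google.common.base.Throwables

object RootCauseExample {
  // Hypothetical handler mirroring the match above: interrupts mean shutdown,
  // other Exceptions are recoverable, anything else is rethrown.
  def handleFailure(e: Exception): Unit = Throwables.getRootCause(e) match {
    case interrupted: InterruptedException =>
      println(s"interrupted, stopping: ${interrupted.getMessage}")
    case recoverable: Exception =>
      println(s"recoverable error, would send a Nack: ${recoverable.getMessage}")
    case fatal: Throwable =>
      throw fatal
  }

  def main(args: Array[String]): Unit = {
    // getRootCause unwraps the RuntimeException down to the InterruptedException inside it.
    handleFailure(new RuntimeException(new InterruptedException("shutdown requested")))
  }
}
```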
@@ -173,7 +169,7 @@ private[streaming] class FlumePollingReceiver(
   }
 
   private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
-    seq: CharSequence, exception: Exception): Unit = {
+    seq: CharSequence): Unit = {
     Try {
       if (batchReceived) {
         // Let Flume know that the events need to be pushed back into the channel.
@@ -183,10 +179,8 @@ private[streaming] class FlumePollingReceiver(
       }
     }.recover({
       case e: Exception => logError(
-        "Sending Nack also failed. A Flume agent is down.")
+        "Sending Nack also failed. A Flume agent is down.", e)
     })
-    TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
-    logWarning("Error while attempting to store events", exception)
   }
 
   override def onStop(): Unit = {
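`sendNack` itself wraps the RPC in `scala.util.Try` and logs inside `recover`, so a second failure while nacking cannot escape and kill the receiver thread. A self-contained sketch of that pattern under assumed names (`NackClient` stands in for `SparkFlumeProtocol.Callback`; the `println` replaces the real logging):

```scala
import scala.util.Try

object NackExample {
  // `NackClient` is a hypothetical stand-in for SparkFlumeProtocol.Callback.
  trait NackClient { def nack(seq: CharSequence): Unit }

  def sendNack(batchReceived: Boolean, client: NackClient, seq: CharSequence): Unit = {
    Try {
      if (batchReceived) {
        // Ask the agent to push the events back into its channel.
        client.nack(seq)
      }
    }.recover {
      case e: Exception =>
        // A failed Nack is logged, never rethrown, so the receiver thread survives.
        println(s"Sending Nack also failed. A Flume agent is down. (${e.getMessage})")
    }
  }

  def main(args: Array[String]): Unit = {
    val broken = new NackClient { def nack(seq: CharSequence): Unit = sys.error("agent down") }
    sendNack(batchReceived = true, broken, "seq-1")   // logs instead of throwing
  }
}
```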
0 commit comments