
Commit 6f671d0

harishreedharan authored and tdas committed
[SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.
Currently a lot of errors get thrown from the Avro IPC layer when the DStream or sink is shut down. This PR cleans that up. Some refactoring is done in the receiver code to put all of the RPC code into a single Try and just recover from that. The sink code has also been cleaned up.

Author: Hari Shreedharan <[email protected]>

Closes #2065 from harishreedharan/clean-flume-shutdown and squashes the following commits:

f93a07c [Hari Shreedharan] Formatting fixes.
d7427cc [Hari Shreedharan] More fixes!
a0a8852 [Hari Shreedharan] Fix race condition, hopefully! Minor other changes.
4c9ed02 [Hari Shreedharan] Remove unneeded list in Callback handler. Other misc changes.
8fee36f [Hari Shreedharan] Scala-library is required, else maven build fails. Also catch InterruptedException in TxnProcessor.
445e700 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown
87232e0 [Hari Shreedharan] Refactor Flume Input Stream. Clean up code, better error handling.
9001d26 [Hari Shreedharan] Change log level to debug in TransactionProcessor#shutdown method
e7b8d82 [Hari Shreedharan] Incorporate review feedback
598efa7 [Hari Shreedharan] Clean up some exception handling code
e1027c6 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown
ed608c8 [Hari Shreedharan] [SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.
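The cleanup follows one pattern on the sink side and one on the receiver side: a volatile stopped flag flipped under the same lock that guards the map of in-flight transaction processors, so nothing new can be registered once shutdown begins, and a single Try wrapped around the whole RPC exchange so failures are recovered in one place. The sketch below is illustrative only; the names (CleanShutdownSketch, register, fetchOnce) are hypothetical and not the actual Spark classes:

import java.util.concurrent.ConcurrentHashMap
import scala.util.{Failure, Success, Try}

// Hypothetical stand-ins for the sink handler and the receiver's fetch loop.
object CleanShutdownSketch {
  @volatile private var stopped = false
  private val inFlight = new ConcurrentHashMap[String, Runnable]()

  // Sink side: new work is registered under the same lock that shutdown() takes,
  // so nothing can slip in after `stopped` has been set.
  def register(seq: String, work: Runnable): Option[Runnable] = inFlight.synchronized {
    if (stopped) None else { inFlight.put(seq, work); Some(work) }
  }

  def shutdown(): Unit = inFlight.synchronized {
    stopped = true
    inFlight.clear()
  }

  // Receiver side: one Try around the whole RPC exchange, recovered in one place,
  // so shutdown-time Avro IPC failures do not spray stack traces through the logs.
  def fetchOnce(rpc: () => Seq[String]): Seq[String] = Try(rpc()) match {
    case Success(events) => events
    case Failure(_) => Seq.empty // a real receiver would log and possibly nack here
  }
}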
1 parent 171a41c commit 6f671d0

5 files changed: +236 -86 lines changed


external/flume-sink/pom.xml

Lines changed: 4 additions & 0 deletions
@@ -70,6 +70,10 @@
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+    </dependency>
     <dependency>
       <!--
         Netty explicitly added in test as it has been excluded from

external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala

Lines changed: 40 additions & 16 deletions
@@ -19,6 +19,8 @@ package org.apache.spark.streaming.flume.sink
 import java.util.concurrent.{ConcurrentHashMap, Executors}
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.collection.JavaConversions._
+
 import org.apache.flume.Channel
 import org.apache.commons.lang.RandomStringUtils
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -45,7 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
     new ThreadFactoryBuilder().setDaemon(true)
       .setNameFormat("Spark Sink Processor Thread - %d").build()))
-  private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
+  private val sequenceNumberToProcessor =
+    new ConcurrentHashMap[CharSequence, TransactionProcessor]()
   // This sink will not persist sequence numbers and reuses them if it gets restarted.
   // So it is possible to commit a transaction which may have been meant for the sink before the
   // restart.
@@ -55,6 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   private val seqBase = RandomStringUtils.randomAlphanumeric(8)
   private val seqCounter = new AtomicLong(0)
 
+  @volatile private var stopped = false
+
   /**
   * Returns a bunch of events to Spark over Avro RPC.
   * @param n Maximum number of events to return in a batch
@@ -63,18 +68,33 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   override def getEventBatch(n: Int): EventBatch = {
     logDebug("Got getEventBatch call from Spark.")
     val sequenceNumber = seqBase + seqCounter.incrementAndGet()
-    val processor = new TransactionProcessor(channel, sequenceNumber,
-      n, transactionTimeout, backOffInterval, this)
-    transactionExecutorOpt.foreach(executor => {
-      executor.submit(processor)
-    })
-    // Wait until a batch is available - will be an error if error message is non-empty
-    val batch = processor.getEventBatch
-    if (!SparkSinkUtils.isErrorBatch(batch)) {
-      processorMap.put(sequenceNumber.toString, processor)
-      logDebug("Sending event batch with sequence number: " + sequenceNumber)
+    createProcessor(sequenceNumber, n) match {
+      case Some(processor) =>
+        transactionExecutorOpt.foreach(_.submit(processor))
+        // Wait until a batch is available - will be an error if error message is non-empty
+        val batch = processor.getEventBatch
+        if (SparkSinkUtils.isErrorBatch(batch)) {
+          // Remove the processor if it is an error batch since no ACK is sent.
+          removeAndGetProcessor(sequenceNumber)
+          logWarning("Received an error batch - no events were received from channel! ")
+        }
+        batch
+      case None =>
+        new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
+    }
+  }
+
+  private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = {
+    sequenceNumberToProcessor.synchronized {
+      if (!stopped) {
+        val processor = new TransactionProcessor(
+          channel, seq, n, transactionTimeout, backOffInterval, this)
+        sequenceNumberToProcessor.put(seq, processor)
+        Some(processor)
+      } else {
+        None
+      }
     }
-    batch
   }
 
   /**
@@ -116,16 +136,20 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   * longer tracked and the caller is responsible for that txn processor.
   */
   private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
-    processorMap.remove(sequenceNumber.toString) // The toString is required!
+    sequenceNumberToProcessor.synchronized {
+      sequenceNumberToProcessor.remove(sequenceNumber.toString)
+    }
   }
 
   /**
   * Shuts down the executor used to process transactions.
   */
   def shutdown() {
     logInfo("Shutting down Spark Avro Callback Handler")
-    transactionExecutorOpt.foreach(executor => {
-      executor.shutdownNow()
-    })
+    sequenceNumberToProcessor.synchronized {
+      stopped = true
+      sequenceNumberToProcessor.values().foreach(_.shutdown())
+    }
+    transactionExecutorOpt.foreach(_.shutdownNow())
   }
 }
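With the synchronized map above, getEventBatch and shutdown can no longer race: once the handler is stopped it answers with an EventBatch whose error message is non-empty instead of registering a new processor. A hedged sketch of how a caller would tell the two outcomes apart (it mirrors the new FlumeBatchFetcher further down; the helper name handleBatch and the store callback are hypothetical):

// Sketch of the caller's side of the contract.
def handleBatch(client: SparkFlumeProtocol.Callback, batch: EventBatch)
               (store: java.util.List[SparkSinkEvent] => Boolean): Unit = {
  if (SparkSinkUtils.isErrorBatch(batch)) {
    // Stopped sink or empty channel: the error message is non-empty and no ack is owed.
    println("No usable batch: " + batch.getErrorMsg)
  } else if (store(batch.getEvents)) {
    client.ack(batch.getSequenceNumber)
  } else {
    client.nack(batch.getSequenceNumber)
  }
}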

external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala

Lines changed: 15 additions & 3 deletions
@@ -60,6 +60,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
   // succeeded.
   @volatile private var batchSuccess = false
 
+  @volatile private var stopped = false
+
   // The transaction that this processor would handle
   var txOpt: Option[Transaction] = None
 
@@ -88,6 +90,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
     batchAckLatch.countDown()
   }
 
+  private[flume] def shutdown(): Unit = {
+    logDebug("Shutting down transaction processor")
+    stopped = true
+  }
+
   /**
   * Populates events into the event batch. If the batch cannot be populated,
   * this method will not set the events into the event batch, but it sets an error message.
@@ -106,7 +113,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
         var gotEventsInThisTxn = false
         var loopCounter: Int = 0
         loop.breakable {
-          while (events.size() < maxBatchSize
+          while (!stopped && events.size() < maxBatchSize
             && loopCounter < totalAttemptsToRemoveFromChannel) {
             loopCounter += 1
             Option(channel.take()) match {
@@ -115,7 +122,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
                   ByteBuffer.wrap(event.getBody)))
                 gotEventsInThisTxn = true
               case None =>
-                if (!gotEventsInThisTxn) {
+                if (!gotEventsInThisTxn && !stopped) {
                   logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
                     " the current transaction")
                   TimeUnit.MILLISECONDS.sleep(backOffInterval)
@@ -125,7 +132,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
            }
          }
        }
-        if (!gotEventsInThisTxn) {
+        if (!gotEventsInThisTxn && !stopped) {
          val msg = "Tried several times, " +
            "but did not get any events from the channel!"
          logWarning(msg)
@@ -136,6 +143,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
         }
       })
     } catch {
+      case interrupted: InterruptedException =>
+        // Don't pollute logs if the InterruptedException came from this being stopped
+        if (!stopped) {
+          logWarning("Error while processing transaction.", interrupted)
+        }
       case e: Exception =>
         logWarning("Error while processing transaction.", e)
         eventBatch.setErrorMsg(e.getMessage)
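The ordering of the new catch clause matters: InterruptedException has to be matched before the generic Exception case, otherwise an interrupt triggered by the executor's shutdownNow() would still be logged as a warning and written into the error batch. A small standalone sketch of the same pattern (the object and method names here are illustrative, not the actual TransactionProcessor members):

object InterruptAwareLoop {
  @volatile private var stopped = false

  def shutdown(): Unit = { stopped = true }

  def pollLoop(takeOne: () => Option[String]): Unit = {
    try {
      while (!stopped) {
        takeOne() match {
          case Some(event) => println(s"got $event")
          case None => Thread.sleep(100) // back off when the source is empty
        }
      }
    } catch {
      case ie: InterruptedException =>
        // Expected when an executor calls shutdownNow(); stay quiet if we were asked to stop.
        if (!stopped) println("Interrupted unexpectedly: " + ie)
      case e: Exception =>
        println("Error while processing: " + e)
    }
  }
}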
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala

Lines changed: 167 additions & 0 deletions

@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.base.Throwables
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.flume.sink._
+
+/**
+ * This class implements the core functionality of [[FlumePollingReceiver]]. When started it
+ * pulls data from Flume, stores it to Spark and then sends an Ack or Nack. This class should be
+ * run via an [[java.util.concurrent.Executor]] as this implements [[Runnable]]
+ *
+ * @param receiver The receiver that owns this instance.
+ */
+
+private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
+  Logging {
+
+  def run(): Unit = {
+    while (!receiver.isStopped()) {
+      val connection = receiver.getConnections.poll()
+      val client = connection.client
+      var batchReceived = false
+      var seq: CharSequence = null
+      try {
+        getBatch(client) match {
+          case Some(eventBatch) =>
+            batchReceived = true
+            seq = eventBatch.getSequenceNumber
+            val events = toSparkFlumeEvents(eventBatch.getEvents)
+            if (store(events)) {
+              sendAck(client, seq)
+            } else {
+              sendNack(batchReceived, client, seq)
+            }
+          case None =>
+        }
+      } catch {
+        case e: Exception =>
+          Throwables.getRootCause(e) match {
+            // If the cause was an InterruptedException, then check if the receiver is stopped -
+            // if yes, just break out of the loop. Else send a Nack and log a warning.
+            // In the unlikely case, the cause was not an Exception,
+            // then just throw it out and exit.
+            case interrupted: InterruptedException =>
+              if (!receiver.isStopped()) {
+                logWarning("Interrupted while receiving data from Flume", interrupted)
+                sendNack(batchReceived, client, seq)
+              }
+            case exception: Exception =>
+              logWarning("Error while receiving data from Flume", exception)
+              sendNack(batchReceived, client, seq)
+          }
+      } finally {
+        receiver.getConnections.add(connection)
+      }
+    }
+  }
+
+  /**
+   * Gets a batch of events from the specified client. This method does not handle any exceptions
+   * which will be propagated to the caller.
+   * @param client Client to get events from
+   * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
+   */
+  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
+    val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
+    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+      // No error, proceed with processing data
+      logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence " +
+        s"number: ${eventBatch.getSequenceNumber}")
+      Some(eventBatch)
+    } else {
+      logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
+        eventBatch.getErrorMsg)
+      None
+    }
+  }
+
+  /**
+   * Store the events in the buffer to Spark. This method will not propagate any exceptions,
+   * but will propagate any other errors.
+   * @param buffer The buffer to store
+   * @return true if the data was stored without any exception being thrown, else false
+   */
+  private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
+    try {
+      receiver.store(buffer)
+      true
+    } catch {
+      case e: Exception =>
+        logWarning("Error while attempting to store data received from Flume", e)
+        false
+    }
+  }
+
+  /**
+   * Send an ack to the client for the sequence number. This method does not handle any exceptions
+   * which will be propagated to the caller.
+   * @param client client to send the ack to
+   * @param seq sequence number of the batch to be ack-ed.
+   * @return
+   */
+  private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
+    logDebug("Sending ack for sequence number: " + seq)
+    client.ack(seq)
+    logDebug("Ack sent for sequence number: " + seq)
+  }
+
+  /**
+   * This method sends a Nack if a batch was received to the client with the given sequence
+   * number. Any exceptions thrown by the RPC call are simply thrown out as is - no effort is made
+   * to handle them.
+   * @param batchReceived true if a batch was received. If this is false, no nack is sent
+   * @param client The client to which the nack should be sent
+   * @param seq The sequence number of the batch that is being nack-ed.
+   */
+  private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
+    seq: CharSequence): Unit = {
+    if (batchReceived) {
+      // Let Flume know that the events need to be pushed back into the channel.
+      logDebug("Sending nack for sequence number: " + seq)
+      client.nack(seq) // If the agent is down, even this could fail and throw
+      logDebug("Nack sent for sequence number: " + seq)
+    }
+  }
+
+  /**
+   * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s
+   * @param events - Events to convert to SparkFlumeEvents
+   * @return - The SparkFlumeEvent generated from SparkSinkEvent
+   */
+  private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
+    ArrayBuffer[SparkFlumeEvent] = {
+    // Convert each Flume event to a serializable SparkFlumeEvent
+    val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
+    var j = 0
+    while (j < events.size()) {
+      val event = events(j)
+      val sparkFlumeEvent = new SparkFlumeEvent()
+      sparkFlumeEvent.event.setBody(event.getBody)
+      sparkFlumeEvent.event.setHeaders(event.getHeaders)
+      buffer += sparkFlumeEvent
+      j += 1
+    }
+    buffer
+  }
+}
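FlumeBatchFetcher is a Runnable, so the owning FlumePollingReceiver presumably submits one instance per Flume connection to a thread pool and lets each loop exit on its own once receiver.isStopped() returns true. A hedged sketch of that wiring (the method name, pool size and thread-name format are assumptions, not taken from the actual receiver code):

import java.util.concurrent.{ExecutorService, Executors}
import com.google.common.util.concurrent.ThreadFactoryBuilder

// Assumed wiring inside the receiver's start-up path: one fetcher per parallel connection.
def startFetchers(receiver: FlumePollingReceiver, parallelism: Int): ExecutorService = {
  val pool = Executors.newFixedThreadPool(parallelism,
    new ThreadFactoryBuilder().setDaemon(true)
      .setNameFormat("Flume Batch Fetcher - %d").build())
  (0 until parallelism).foreach { _ =>
    pool.submit(new FlumeBatchFetcher(receiver)) // each fetcher loops until isStopped()
  }
  pool // the caller shuts this pool down (shutdownNow) when the receiver stops
}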
