Skip to content

Commit 9741683

Browse files
SPARK-1729. Fixes based on review.
1 parent c604a3c commit 9741683

File tree

3 files changed

+47
-85
lines changed

3 files changed

+47
-85
lines changed

external/flume-sink/src/main/avro/sparkflume.avdl

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,7 @@ protocol SparkFlumeProtocol {
2828

2929
record EventBatch {
3030
string sequenceNumber;
31-
array<SparkSinkEvent> eventBatch;
31+
array<SparkSinkEvent> events;
3232
}
3333

3434
EventBatch getEventBatch (int n);

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala

Lines changed: 35 additions & 77 deletions
Original file line number | Diff line number | Diff line change
@@ -34,10 +34,8 @@ import org.apache.avro.ipc.NettyServer
3434
import org.apache.avro.ipc.specific.SpecificResponder
3535
import java.net.InetSocketAddress
3636

37-
class SparkSink() extends AbstractSink with Configurable {
37+
class SparkSink extends AbstractSink with Configurable {
3838
private val LOG = LoggerFactory.getLogger(this.getClass)
39-
private val lock = new ReentrantLock()
40-
private val blockingCondition = lock.newCondition()
4139

4240
// This sink will not persist sequence numbers and reuses them if it gets restarted.
4341
// So it is possible to commit a transaction which may have been meant for the sink before the
@@ -58,19 +56,20 @@ class SparkSink() extends AbstractSink with Configurable {
5856

5957
private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
6058

61-
private var processorFactory: Option[SparkHandlerFactory] = None
59+
private var processorManager: Option[TransactionProcessorManager] = None
6260
private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
6361
private var port: Int = 0
6462
private var maxThreads: Int = SparkSinkConfig.DEFAULT_MAX_THREADS
6563
private var serverOpt: Option[NettyServer] = None
66-
private var running = false
64+
65+
private val blockingLatch = new CountDownLatch(1)
6766

6867
override def start() {
6968
transactionExecutorOpt = Option(Executors.newFixedThreadPool(numProcessors,
7069
new ThreadFactoryBuilder().setDaemon(true)
7170
.setNameFormat("Spark Sink, " + getName + " Processor Thread - %d").build()))
7271

73-
processorFactory = Option(new SparkHandlerFactory(numProcessors))
72+
processorManager = Option(new TransactionProcessorManager(numProcessors))
7473

7574
val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler())
7675

@@ -80,12 +79,6 @@ class SparkSink() extends AbstractSink with Configurable {
8079
serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
8180

8281
serverOpt.map(server => server.start())
83-
lock.lock()
84-
try {
85-
running = true
86-
} finally {
87-
lock.unlock()
88-
}
8982
super.start()
9083
}
9184

@@ -95,65 +88,48 @@ class SparkSink() extends AbstractSink with Configurable {
9588
server.close()
9689
server.join()
9790
})
98-
lock.lock()
99-
try {
100-
running = false
101-
blockingCondition.signalAll()
102-
} finally {
103-
lock.unlock()
104-
}
91+
blockingLatch.countDown()
92+
super.stop()
10593
}
10694

10795
override def configure(ctx: Context) {
10896
import SparkSinkConfig._
10997
hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
110-
val portOpt = Option(ctx.getInteger(CONF_PORT))
111-
if(portOpt.isDefined) {
112-
port = portOpt.get
113-
} else {
114-
throw new ConfigurationException("The Port to bind must be specified")
115-
}
98+
port = Option(ctx.getInteger(CONF_PORT)).
99+
getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
116100
numProcessors = ctx.getInteger(PROCESSOR_COUNT, DEFAULT_PROCESSOR_COUNT)
117101
transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
118102
maxThreads = ctx.getInteger(CONF_MAX_THREADS, DEFAULT_MAX_THREADS)
119103
}
120104

121105
override def process(): Status = {
122106
// This method is called in a loop by the Flume framework - block it until the sink is
123-
// stopped to save CPU resources
124-
lock.lock()
125-
try {
126-
while(running) {
127-
blockingCondition.await()
128-
}
129-
} finally {
130-
lock.unlock()
131-
}
107+
// stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
108+
// being shut down.
109+
blockingLatch.await()
132110
Status.BACKOFF
133111
}
134112

135113

136114
// Object representing an empty batch returned by the txn processor due to some error.
137115
case object ErrorEventBatch extends EventBatch
138116

139-
private class AvroCallbackHandler() extends SparkFlumeProtocol {
117+
private class AvroCallbackHandler extends SparkFlumeProtocol {
140118

141119
override def getEventBatch(n: Int): EventBatch = {
142-
val processor = processorFactory.get.checkOut(n)
120+
val processor = processorManager.get.checkOut(n)
143121
transactionExecutorOpt.map(executor => executor.submit(processor))
144122
// Wait until a batch is available - can be null if some error was thrown
145-
val eventBatch = processor.eventQueue.take()
146-
eventBatch match {
123+
processor.eventQueue.take() match {
147124
case ErrorEventBatch => throw new FlumeException("Something went wrong. No events" +
148125
" retrieved from channel.")
149-
case events => {
150-
processorMap.put(events.getSequenceNumber, processor)
126+
case eventBatch: EventBatch =>
127+
processorMap.put(eventBatch.getSequenceNumber, processor)
151128
if (LOG.isDebugEnabled) {
152-
LOG.debug("Sent " + events.getEventBatch.size() +
153-
" events with sequence number: " + events.getSequenceNumber)
129+
LOG.debug("Sent " + eventBatch.getEvents.size() +
130+
" events with sequence number: " + eventBatch.getSequenceNumber)
154131
}
155-
events
156-
}
132+
eventBatch
157133
}
158134
}
159135

@@ -214,41 +190,23 @@ class SparkSink() extends AbstractSink with Configurable {
214190
tx.begin()
215191
try {
216192
eventBatch.setSequenceNumber(seqBase + seqNum.incrementAndGet())
217-
val events = eventBatch.getEventBatch
193+
val events = eventBatch.getEvents
218194
events.clear()
219195
val loop = new Breaks
220196
var gotEventsInThisTxn = false
221197
loop.breakable {
222-
var i = 0
223-
// Using for here causes the maxBatchSize change to be ineffective as the Range gets
224-
// pregenerated
225-
while (i < maxBatchSize) {
226-
i += 1
227-
val eventOpt = Option(getChannel.take())
228-
eventOpt.map(event => {
229-
events.add(new SparkSinkEvent(toCharSequenceMap(event
230-
.getHeaders),
231-
ByteBuffer.wrap(event.getBody)))
232-
gotEventsInThisTxn = true
233-
})
234-
if (eventOpt.isEmpty) {
235-
if (!gotEventsInThisTxn) {
236-
// To avoid sending empty batches, we wait till events are available backing off
237-
// between attempts to get events. Each attempt to get an event though causes one
238-
// iteration to be lost. To ensure that we still send back maxBatchSize number of
239-
// events, we cheat and increase the maxBatchSize by 1 to account for the lost
240-
// iteration. Even throwing an exception is expensive as Avro will serialize it
241-
// and send it over the wire, which is useless. Before incrementing though,
242-
// ensure that we are not anywhere near INT_MAX.
243-
if (maxBatchSize >= Int.MaxValue / 2) {
244-
// Random sanity check
245-
throw new RuntimeException("Safety exception - polled too many times, no events!")
198+
while (events.size() < maxBatchSize) {
199+
Option(getChannel.take()) match {
200+
case Some(event) =>
201+
events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
202+
ByteBuffer.wrap(event.getBody)))
203+
gotEventsInThisTxn = true
204+
case None =>
205+
if (!gotEventsInThisTxn) {
206+
Thread.sleep(500)
207+
} else {
208+
loop.break()
246209
}
247-
maxBatchSize += 1
248-
Thread.sleep(500)
249-
} else {
250-
loop.break()
251-
}
252210
}
253211
}
254212
}
@@ -284,7 +242,7 @@ class SparkSink() extends AbstractSink with Configurable {
284242
} finally {
285243
resultQueueUpdateLock.unlock()
286244
}
287-
eventBatch.getEventBatch.clear()
245+
eventBatch.getEvents.clear()
288246
// If the batch failed on spark side, throw a FlumeException
289247
maybeResult.map(success =>
290248
if (!success) {
@@ -315,7 +273,7 @@ class SparkSink() extends AbstractSink with Configurable {
315273
// remove the event from the map and then clear the value
316274
resultQueue.clear()
317275
processorMap.remove(eventBatch.getSequenceNumber)
318-
processorFactory.get.checkIn(this)
276+
processorManager.get.checkIn(this)
319277
tx.close()
320278
}
321279
}
@@ -328,7 +286,7 @@ class SparkSink() extends AbstractSink with Configurable {
328286
}
329287
}
330288

331-
private class SparkHandlerFactory(val maxInstances: Int) {
289+
private class TransactionProcessorManager(val maxInstances: Int) {
332290
val queue = new scala.collection.mutable.Queue[TransactionProcessor]
333291
val queueModificationLock = new ReentrantLock()
334292
var currentSize = 0

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -77,13 +77,13 @@ private[streaming] class FlumePollingReceiver(
7777
private var connections = Array.empty[FlumeConnection] // temporarily empty, filled in later
7878

7979
override def onStart(): Unit = {
80-
val connectionBuilder = new mutable.ArrayBuilder.ofRef[FlumeConnection]()
81-
addresses.map(host => {
80+
// Create the connections to each Flume agent.
81+
connections = addresses.map(host => {
8282
val transceiver = new NettyTransceiver(host, channelFactory)
8383
val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
84-
connectionBuilder += new FlumeConnection(transceiver, client)
85-
})
86-
connections = connectionBuilder.result()
84+
new FlumeConnection(transceiver, client)
85+
}).toArray
86+
8787
val dataReceiver = new Runnable {
8888
override def run(): Unit = {
8989
var counter = 0
@@ -93,14 +93,18 @@ private[streaming] class FlumePollingReceiver(
9393
counter += 1
9494
val batch = client.getEventBatch(maxBatchSize)
9595
val seq = batch.getSequenceNumber
96-
val events: java.util.List[SparkSinkEvent] = batch.getEventBatch
96+
val events: java.util.List[SparkSinkEvent] = batch.getEvents
9797
logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
9898
try {
9999
events.foreach(event => store(SparkPollingEvent.fromSparkSinkEvent(event)))
100100
client.ack(seq)
101101
} catch {
102102
case e: Throwable =>
103-
client.nack(seq)
103+
try {
104+
client.nack(seq) // If the agent is down, even this could fail and throw
105+
} catch {
106+
case e: Throwable => logError("Sending Nack also failed. A Flume agent is down.")
107+
}
104108
TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
105109
logWarning("Error while attempting to store events", e)
106110
}

0 commit comments

Comments
 (0)