
Commit 393bd94

SPARK-1729. Use LinkedBlockingQueue instead of ArrayBuffer to keep track of connections.
1 parent 120e2a1 commit 393bd94
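
The change drops the round-robin index over an Array[FlumeConnection] and instead has each receiver worker thread check a connection out of a LinkedBlockingQueue, use it, and return it in a finally block, so no two threads ever use the same connection at once. Below is a minimal, self-contained sketch of that check-out/check-in pattern; it is not Spark code, and the Connection class, worker counts, and the use of blocking take() (the receiver itself uses poll()) are illustrative assumptions.

import java.util.concurrent.{Executors, LinkedBlockingQueue}

// Hypothetical stand-in for the per-agent connection the receiver keeps.
case class Connection(id: Int)

object QueueCheckoutSketch {
  def main(args: Array[String]): Unit = {
    val connections = new LinkedBlockingQueue[Connection]()
    (1 to 3).foreach(i => connections.add(Connection(i)))

    val pool = Executors.newFixedThreadPool(3)
    (1 to 6).foreach { _ =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          // Check a connection out; take() blocks until one is free, so no two
          // workers ever hold the same connection at the same time.
          val conn = connections.take()
          try {
            println(s"${Thread.currentThread().getName} using connection ${conn.id}")
          } finally {
            // Return the connection even if the work above throws.
            connections.add(conn)
          }
        }
      })
    }
    pool.shutdown()
  }
}

Compared with indexing into a shared array by a per-thread counter, the queue makes ownership explicit: a batch's getEventBatch/ack/nack sequence runs against a connection that no other thread can touch until it is added back.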

3 files changed: +52 -49 lines changed

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala

Lines changed: 47 additions & 43 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.flume
 import java.io.{ObjectOutput, ObjectInput, Externalizable}
 import java.net.InetSocketAddress
 import java.nio.ByteBuffer
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
 
 import org.apache.spark.flume.sink.SparkSinkUtils
 
@@ -33,7 +33,7 @@ import org.apache.avro.ipc.specific.SpecificRequestor
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
 
 import org.apache.spark.Logging
-import org.apache.spark.flume.{EventBatch, SparkSinkEvent, SparkFlumeProtocol}
+import org.apache.spark.flume.{SparkSinkEvent, SparkFlumeProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
@@ -83,60 +83,64 @@ private[streaming] class FlumePollingReceiver(
   lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
     new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
 
-  private var connections = Array.empty[FlumeConnection] // temporarily empty, filled in later
+  private val connections = new LinkedBlockingQueue[FlumeConnection]()
 
   override def onStart(): Unit = {
     // Create the connections to each Flume agent.
-    connections = addresses.map(host => {
+    addresses.foreach(host => {
       val transceiver = new NettyTransceiver(host, channelFactory)
       val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
-      new FlumeConnection(transceiver, client)
-    }).toArray
-
+      connections.add(new FlumeConnection(transceiver, client))
+    })
     for (i <- 0 until parallelism) {
       logInfo("Starting Flume Polling Receiver worker threads starting..")
       // Threads that pull data from Flume.
       receiverExecutor.submit(new Runnable {
        override def run(): Unit = {
-          var counter = i
           while (true) {
-            counter = counter % (connections.length)
-            val client = connections(counter).client
-            counter += 1
-            val eventBatch = client.getEventBatch(maxBatchSize)
-            if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
-              // No error, proceed with processing data
-              val seq = eventBatch.getSequenceNumber
-              val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
-              logDebug(
-                "Received batch of " + events.size() + " events with sequence number: " + seq)
-              try {
-                // Convert each Flume event to a serializable SparkPollingEvent
-                var j = 0
-                while (j < events.size()) {
-                  store(SparkFlumePollingEvent.fromSparkSinkEvent(events(j)))
-                  logDebug("Stored events with seq:" + seq)
-                  j += 1
-                }
-                logInfo("Sending ack for: " +seq)
-                // Send an ack to Flume so that Flume discards the events from its channels.
-                client.ack(seq)
-                logDebug("Ack sent for sequence number: " + seq)
-              } catch {
-                case e: Exception =>
-                  try {
-                    // Let Flume know that the events need to be pushed back into the channel.
-                    client.nack(seq) // If the agent is down, even this could fail and throw
-                  } catch {
-                    case e: Exception => logError(
-                      "Sending Nack also failed. A Flume agent is down.")
+            val connection = connections.poll()
+            val client = connection.client
+            try {
+              val eventBatch = client.getEventBatch(maxBatchSize)
+              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+                // No error, proceed with processing data
+                val seq = eventBatch.getSequenceNumber
+                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
+                logDebug(
+                  "Received batch of " + events.size() + " events with sequence number: " + seq)
+                try {
+                  // Convert each Flume event to a serializable SparkPollingEvent
+                  var j = 0
+                  while (j < events.size()) {
+                    store(SparkFlumePollingEvent.fromSparkSinkEvent(events(j)))
+                    logDebug("Stored events with seq:" + seq)
+                    j += 1
                   }
-                  TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
-                  logWarning("Error while attempting to store events", e)
+                  logDebug("Sending ack for: " +seq)
+                  // Send an ack to Flume so that Flume discards the events from its channels.
+                  client.ack(seq)
+                  logDebug("Ack sent for sequence number: " + seq)
+                } catch {
+                  case e: Exception =>
+                    try {
+                      // Let Flume know that the events need to be pushed back into the channel.
+                      client.nack(seq) // If the agent is down, even this could fail and throw
+                    } catch {
+                      case e: Exception => logError(
+                        "Sending Nack also failed. A Flume agent is down.")
+                    }
+                    TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
+                    logWarning("Error while attempting to store events", e)
+                }
+              } else {
+                logWarning("Did not receive events from Flume agent due to error on the Flume " +
+                  "agent: " + eventBatch.getErrorMsg)
              }
-            } else {
-              logWarning("Did not receive events from Flume agent due to error on the Flume " +
-                "agent: " + eventBatch.getErrorMsg)
+            } catch {
+              case e: Exception =>
+                logWarning("Error while reading data from Flume", e)
+            } finally {
+              connections.add(connection)
             }
           }
         }
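
For reference, the receiver above is driven through FlumeUtils.createPollingStream, as exercised by the test suite below. The following is a usage sketch only, assuming the argument order shown in that test (addresses, maxBatchSize, parallelism, storage level); the local master, application name, and port 9999 are placeholders, not values from this commit.

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object PollingStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("FlumePollingSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Poll the Spark sink running inside the Flume agent; 100 and 5 mirror the
    // maxBatchSize and parallelism values used in the updated test below.
    val stream = FlumeUtils.createPollingStream(
      ssc, Seq(new InetSocketAddress("localhost", 9999)), 100, 5,
      StorageLevel.MEMORY_AND_DISK)

    stream.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}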

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala

Lines changed: 5 additions & 5 deletions
@@ -66,7 +66,7 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon
     sink.start()
     ssc.start()
     writeAndVerify(Seq(channel), ssc, outputBuffer)
-    assertQueuesAreEmpty(channel)
+    assertChannelIsEmpty(channel)
     sink.stop()
     channel.stop()
   }
@@ -76,7 +76,7 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
       FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort),
-        new InetSocketAddress("localhost", testPort + 1)), 100, 2,
+        new InetSocketAddress("localhost", testPort + 1)), 100, 5,
         StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
       with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
@@ -109,8 +109,8 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon
     sink2.start()
     ssc.start()
     writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
-    assertQueuesAreEmpty(channel)
-    assertQueuesAreEmpty(channel2)
+    assertChannelIsEmpty(channel)
+    assertChannelIsEmpty(channel2)
     sink.stop()
     channel.stop()
   }
@@ -160,7 +160,7 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon
     assert(counter === 25 * channels.size)
   }
 
-  def assertQueuesAreEmpty(channel: MemoryChannel) = {
+  def assertChannelIsEmpty(channel: MemoryChannel) = {
     val queueRemaining = channel.getClass.getDeclaredField("queueRemaining");
     queueRemaining.setAccessible(true)
     val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")

project/SparkBuild.scala

Lines changed: 0 additions & 1 deletion
@@ -122,7 +122,6 @@ object SparkBuild extends PomBuild {
     retrieveManaged := true,
     retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
     libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq",
-      "spark-streaming-flume-sink",
       "spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter",
       "spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx",
       "spark-core").map(versionArtifact(_).get intransitive())
