Commit 0f10788
SPARK-1729. Make Flume pull data from source, rather than the current push model
Added support for polling several Flume agents from a single receiver.
1 parent 87775aa commit 0f10788

3 files changed: 120 additions, 44 deletions

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala

Lines changed: 29 additions & 10 deletions

@@ -32,11 +32,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 import java.io.{ObjectOutput, ObjectInput, Externalizable}
 import java.nio.ByteBuffer
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 class FlumePollingInputDStream[T: ClassTag](
     @transient ssc_ : StreamingContext,
-    val host: String,
-    val port: Int,
+    val addresses: Seq[InetSocketAddress],
     val maxBatchSize: Int,
     val parallelism: Int,
     storageLevel: StorageLevel
@@ -47,30 +47,44 @@ class FlumePollingInputDStream[T: ClassTag](
    * of a NetworkInputDStream.
    */
   override def getReceiver(): Receiver[SparkPollingEvent] = {
-    new FlumePollingReceiver(host, port, maxBatchSize, parallelism, storageLevel)
+    new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
 }
 
 private[streaming] class FlumePollingReceiver(
-    host: String,
-    port: Int,
+    addresses: Seq[InetSocketAddress],
     maxBatchSize: Int,
     parallelism: Int,
     storageLevel: StorageLevel
   ) extends Receiver[SparkPollingEvent](storageLevel) with Logging {
 
+  lazy val channelFactoryExecutor =
+    Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
+      setNameFormat("Flume Receiver Channel Thread - %d").build())
+
   lazy val channelFactory =
-    new NioClientSocketChannelFactory(Executors.newSingleThreadExecutor(),
-      Executors.newSingleThreadExecutor())
-  lazy val transceiver = new NettyTransceiver(new InetSocketAddress(host, port), channelFactory)
-  lazy val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+    new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+
   lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
     new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
 
+  private var connections = Array.empty[FlumeConnection] // temporarily empty, filled in later
+
   override def onStart(): Unit = {
+    val connectionBuilder = new mutable.ArrayBuilder.ofRef[FlumeConnection]()
+    addresses.map(host => {
+      val transceiver = new NettyTransceiver(host, channelFactory)
+      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+      connectionBuilder += new FlumeConnection(transceiver, client)
+    })
+    connections = connectionBuilder.result()
     val dataReceiver = new Runnable {
       override def run(): Unit = {
+        var counter = 0
         while (true) {
+          counter = counter % connections.size
+          val client = connections(counter).client
+          counter += 1
           val batch = client.getEventBatch(maxBatchSize)
           val seq = batch.getSequenceNumber
           val events: java.util.List[SparkSinkEvent] = batch.getEventBatch
@@ -104,11 +118,16 @@ private[streaming] class FlumePollingReceiver(
   override def onStop(): Unit = {
     logInfo("Shutting down Flume Polling Receiver")
     receiverExecutor.shutdownNow()
-    transceiver.close()
+    connections.map(connection => {
+      connection.tranceiver.close()
+    })
     channelFactory.releaseExternalResources()
   }
 }
 
+private class FlumeConnection(val tranceiver: NettyTransceiver,
+    val client: SparkFlumeProtocol.Callback)
+
 private[streaming] object SparkPollingEvent {
   def fromSparkSinkEvent(in: SparkSinkEvent): SparkPollingEvent = {
     val event = new SparkPollingEvent()
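
For context, the receiver above opens one Avro client per configured agent and walks them in round-robin order inside run(), so each getEventBatch call goes to the next sink in the list. A minimal standalone sketch of that selection pattern (roundRobin and poll are illustrative names, not part of this patch):

// Cycle through a fixed list of clients, issuing one poll per iteration;
// mirrors the counter arithmetic in the receiver's run() loop above.
def roundRobin[A](clients: IndexedSeq[A], batches: Int)(poll: A => Unit): Unit = {
  var counter = 0
  for (_ <- 0 until batches) {
    counter = counter % clients.size // wrap around after the last client
    poll(clients(counter))           // e.g. client.getEventBatch(maxBatchSize)
    counter += 1
  }
}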

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

Lines changed: 7 additions & 10 deletions

@@ -21,6 +21,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import java.net.InetSocketAddress
 
 object FlumeUtils {
   /**
@@ -72,8 +73,7 @@ object FlumeUtils {
   /**
    * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
    * This stream will poll the sink for data and will pull events as they are available.
-   * @param host The host on which the Flume agent is running
-   * @param port The port the Spark Sink is accepting connections on
+   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
    * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
    *                     single RPC call
    * @param parallelism Number of concurrent requests this stream should send to the sink. Note
@@ -83,21 +83,19 @@ object FlumeUtils {
    */
   def createPollingStream (
       ssc: StreamingContext,
-      host: String,
-      port: Int,
+      addresses: Seq[InetSocketAddress],
       maxBatchSize: Int = 100,
       parallelism: Int = 5,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): ReceiverInputDStream[SparkPollingEvent] = {
-    new FlumePollingInputDStream[SparkPollingEvent](ssc, host, port, maxBatchSize,
+    new FlumePollingInputDStream[SparkPollingEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
 
   /**
    * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
    * This stream will poll the sink for data and will pull events as they are available.
-   * @param host The host on which the Flume agent is running
-   * @param port The port the Spark Sink is accepting connections on
+   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
    * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
    *                     single RPC call
    * @param parallelism Number of concurrent requests this stream should send to the sink. Note
@@ -107,13 +105,12 @@ object FlumeUtils {
    */
   def createJavaPollingStream (
       ssc: StreamingContext,
-      host: String,
-      port: Int,
+      addresses: Seq[InetSocketAddress],
       maxBatchSize: Int = 100,
       parallelism: Int = 5,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): JavaReceiverInputDStream[SparkPollingEvent] = {
-    new FlumePollingInputDStream[SparkPollingEvent](ssc, host, port, maxBatchSize,
+    new FlumePollingInputDStream[SparkPollingEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
 }
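
For reference, with the signature above a caller now passes a list of sink addresses rather than a single host/port pair. A minimal usage sketch based only on the parameters shown in this diff; the SparkConf named conf and the host names are illustrative, and the API may still change before merge:

import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val ssc = new StreamingContext(conf, Seconds(2))  // conf: an existing SparkConf
// One receiver polling two Spark Sinks, up to 100 events per RPC, 5 concurrent requests.
val addresses = Seq(
  new InetSocketAddress("agent1.example.com", 9999),
  new InetSocketAddress("agent2.example.com", 9999))
val stream = FlumeUtils.createPollingStream(ssc, addresses,
  maxBatchSize = 100, parallelism = 5,
  storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)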

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala

Lines changed: 84 additions & 24 deletions

@@ -29,6 +29,8 @@ import org.apache.spark.flume.sink.{SparkSinkConfig, SparkSink}
 import scala.collection.JavaConversions._
 import org.apache.flume.event.EventBuilder
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import java.net.InetSocketAddress
+import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
 
 class FlumePollingReceiverSuite extends TestSuiteBase {
 
@@ -38,7 +40,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
-      FlumeUtils.createPollingStream(ssc, "localhost", testPort, 100, 1,
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), 100, 5,
         StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
       with SynchronizedBuffer[Seq[SparkPollingEvent]]
@@ -60,42 +62,81 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     sink.setChannel(channel)
     sink.start()
     ssc.start()
+    writeAndVerify(Seq(channel), ssc, outputBuffer)
+    sink.stop()
+    channel.stop()
+  }
 
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    var t = 0
-    for (i <- 0 until 5) {
-      val tx = channel.getTransaction
-      tx.begin()
-      for (j <- 0 until 5) {
-        channel.put(EventBuilder.withBody(
-          String.valueOf(t).getBytes("utf-8"),
-          Map[String, String]("test-" + t.toString -> "header")))
-        t += 1
-      }
+  test("flume polling test multiple hosts") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort),
+        new InetSocketAddress("localhost", testPort + 1)), 100, 5,
+        StorageLevel.MEMORY_AND_DISK)
+    val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
+      with SynchronizedBuffer[Seq[SparkPollingEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
+
+    // Start the channel and sink.
+    val context = new Context()
+    context.put("capacity", "5000")
+    context.put("transactionCapacity", "1000")
+    context.put("keep-alive", "0")
+    val channel = new MemoryChannel()
+    Configurables.configure(channel, context)
+
+    val channel2 = new MemoryChannel()
+    Configurables.configure(channel2, context)
 
-      tx.commit()
-      tx.close()
-      Thread.sleep(500) // Allow some time for the events to reach
-      clock.addToTime(batchDuration.milliseconds)
+    val sink = new SparkSink()
+    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+    Configurables.configure(sink, context)
+    sink.setChannel(channel)
+    sink.start()
+
+    val sink2 = new SparkSink()
+    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort + 1))
+    Configurables.configure(sink2, context)
+    sink2.setChannel(channel2)
+    sink2.start()
+    ssc.start()
+    writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+    sink.stop()
+    channel.stop()
+
+  }
+
+  def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
+      outputBuffer: ArrayBuffer[Seq[SparkPollingEvent]]) {
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val executor = Executors.newCachedThreadPool()
+    val executorCompletion = new ExecutorCompletionService[Void](executor)
+    channels.map(channel => {
+      executorCompletion.submit(new TxnSubmitter(channel, clock))
+    })
+    for(i <- 0 until channels.size) {
+      executorCompletion.take()
     }
     val startTime = System.currentTimeMillis()
-    while (outputBuffer.size < 5 && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+    while (outputBuffer.size < 5 * channels.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
       logInfo("output.size = " + outputBuffer.size)
       Thread.sleep(100)
     }
     val timeTaken = System.currentTimeMillis() - startTime
     assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
     logInfo("Stopping context")
     ssc.stop()
-    sink.stop()
-    channel.stop()
 
     val flattenedBuffer = outputBuffer.flatten
-    assert(flattenedBuffer.size === 25)
+    assert(flattenedBuffer.size === 25 * channels.size)
     var counter = 0
-    for (i <- 0 until 25) {
-      val eventToVerify = EventBuilder.withBody(
-        String.valueOf(i).getBytes("utf-8"),
+    for (k <- 0 until channels.size; i <- 0 until 25) {
+      val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
        String.valueOf(i)).getBytes("utf-8"),
         Map[String, String]("test-" + i.toString -> "header"))
       var found = false
       var j = 0
@@ -110,7 +151,26 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
         j += 1
       }
     }
-    assert (counter === 25)
+    assert(counter === 25 * channels.size)
   }
 
+  private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
+    override def call(): Void = {
+      var t = 0
+      for (i <- 0 until 5) {
+        val tx = channel.getTransaction
+        tx.begin()
+        for (j <- 0 until 5) {
+          channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes("utf-8"),
+            Map[String, String]("test-" + t.toString -> "header")))
+          t += 1
+        }
+        tx.commit()
+        tx.close()
+        Thread.sleep(500) // Allow some time for the events to reach
+        clock.addToTime(batchDuration.milliseconds)
+      }
+      null
+    }
+  }
 }
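
The writeAndVerify helper above pushes one TxnSubmitter per channel onto a thread pool and then blocks until every submitted task has completed. The same wait-for-all pattern in isolation, a minimal sketch using only the JDK classes the test imports (the task names are illustrative):

import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}

val executor = Executors.newCachedThreadPool()
val completion = new ExecutorCompletionService[Void](executor)
val tasks = Seq("writer-1", "writer-2")  // stand-ins for the per-channel submitters
tasks.foreach { name =>
  completion.submit(new Callable[Void] {
    override def call(): Void = { println(name + " done"); null }
  })
}
for (_ <- tasks.indices) completion.take()  // take() blocks until some task finishes
executor.shutdown()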
