
Commit e59cc20

Use SparkFlumeEvent instead of the new SparkFlumePollingEvent type. Also, the Flume Polling Receiver now uses the store(ArrayBuffer) method.
1 parent f3c99d1 commit e59cc20

File tree

5 files changed (+76 / -67 lines)

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

Lines changed: 35 additions & 8 deletions
@@ -30,9 +30,6 @@ import org.apache.flume.source.avro.AvroFlumeEvent
 import org.apache.flume.source.avro.Status
 import org.apache.avro.ipc.specific.SpecificResponder
 import org.apache.avro.ipc.NettyServer
-
-import org.apache.spark.util.Utils
-
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
@@ -42,11 +39,8 @@ import org.apache.spark.streaming.receiver.Receiver
 
 import org.jboss.netty.channel.ChannelPipelineFactory
 import org.jboss.netty.channel.Channels
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.ChannelFactory
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
 import org.jboss.netty.handler.codec.compression._
-import org.jboss.netty.handler.execution.ExecutionHandler
 
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
@@ -73,14 +67,47 @@ class SparkFlumeEvent() extends Externalizable {
 
   /* De-serialize from bytes. */
   def readExternal(in: ObjectInput) {
-    val (headers, bodyBuff) = EventTransformer.readExternal(in)
+    val bodyLength = in.readInt()
+    val bodyBuff = new Array[Byte](bodyLength)
+    in.readFully(bodyBuff)
+
+    val numHeaders = in.readInt()
+    val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+    for (i <- 0 until numHeaders) {
+      val keyLength = in.readInt()
+      val keyBuff = new Array[Byte](keyLength)
+      in.readFully(keyBuff)
+      val key: String = Utils.deserialize(keyBuff)
+
+      val valLength = in.readInt()
+      val valBuff = new Array[Byte](valLength)
+      in.readFully(valBuff)
+      val value: String = Utils.deserialize(valBuff)
+
+      headers.put(key, value)
+    }
+
     event.setBody(ByteBuffer.wrap(bodyBuff))
     event.setHeaders(headers)
   }
 
   /* Serialize to bytes. */
   def writeExternal(out: ObjectOutput) {
-    EventTransformer.writeExternal(out, event.getHeaders, event.getBody.array())
+    val body = event.getBody.array()
+    out.writeInt(body.length)
+    out.write(body)
+
+    val numHeaders = event.getHeaders.size()
+    out.writeInt(numHeaders)
+    for ((k, v) <- event.getHeaders) {
+      val keyBuff = Utils.serialize(k.toString)
+      out.writeInt(keyBuff.length)
+      out.write(keyBuff)
+      val valBuff = Utils.serialize(v.toString)
+      out.writeInt(valBuff.length)
+      out.write(valBuff)
+    }
   }
 }
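Note on the serialization change above: instead of delegating to the EventTransformer helpers, SparkFlumeEvent now writes a length-prefixed layout by hand, i.e. the body as an int length plus raw bytes, then the header count, then each key and value as a length-prefixed serialized string. The sketch below round-trips an event through the same kind of layout; it is illustrative only, SimpleEvent and the helper names are invented for this note, and it uses plain UTF-8 bytes where the patch uses Utils.serialize.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

object LengthPrefixedRoundTrip {
  // Hypothetical stand-in for the wrapped Flume event: a body plus string headers.
  case class SimpleEvent(body: Array[Byte], headers: Map[String, String])

  def writeEvent(event: SimpleEvent, out: DataOutputStream): Unit = {
    // Body: length prefix followed by the raw bytes.
    out.writeInt(event.body.length)
    out.write(event.body)
    // Headers: count, then length-prefixed key and value bytes for each pair.
    out.writeInt(event.headers.size)
    for ((k, v) <- event.headers) {
      val key = k.getBytes(StandardCharsets.UTF_8)
      out.writeInt(key.length)
      out.write(key)
      val value = v.getBytes(StandardCharsets.UTF_8)
      out.writeInt(value.length)
      out.write(value)
    }
  }

  def readEvent(in: DataInputStream): SimpleEvent = {
    // Read back in the same order: body, header count, then each key/value pair.
    val body = new Array[Byte](in.readInt())
    in.readFully(body)
    val headers = (0 until in.readInt()).map { _ =>
      val key = new Array[Byte](in.readInt())
      in.readFully(key)
      val value = new Array[Byte](in.readInt())
      in.readFully(value)
      new String(key, StandardCharsets.UTF_8) -> new String(value, StandardCharsets.UTF_8)
    }.toMap
    SimpleEvent(body, headers)
  }

  def main(args: Array[String]): Unit = {
    // Round trip: serialize to a byte array and read it back.
    val bytes = new ByteArrayOutputStream()
    writeEvent(SimpleEvent("payload".getBytes(StandardCharsets.UTF_8), Map("k" -> "v")),
      new DataOutputStream(bytes))
    val copy = readEvent(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
    println(new String(copy.body, StandardCharsets.UTF_8) + " " + copy.headers)
  }
}
```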

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala

Lines changed: 19 additions & 38 deletions
@@ -17,12 +17,11 @@
 package org.apache.spark.streaming.flume
 
 
-import java.io.{ObjectOutput, ObjectInput, Externalizable}
 import java.net.InetSocketAddress
-import java.nio.ByteBuffer
 import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -53,9 +52,9 @@ private[streaming] class FlumePollingInputDStream[T: ClassTag](
     val maxBatchSize: Int,
     val parallelism: Int,
     storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[SparkFlumePollingEvent](_ssc) {
+  ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
 
-  override def getReceiver(): Receiver[SparkFlumePollingEvent] = {
+  override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
 }
@@ -65,7 +64,7 @@ private[streaming] class FlumePollingReceiver(
     maxBatchSize: Int,
     parallelism: Int,
     storageLevel: StorageLevel
-  ) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging {
+  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val channelFactoryExecutor =
     Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
@@ -104,12 +103,13 @@ private[streaming] class FlumePollingReceiver(
               "Received batch of " + events.size() + " events with sequence number: " + seq)
             try {
               // Convert each Flume event to a serializable SparkPollingEvent
+              val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
               var j = 0
               while (j < events.size()) {
-                store(SparkFlumePollingEvent.fromSparkSinkEvent(events(j)))
-                logDebug("Stored events with seq:" + seq)
+                buffer += sparkSinkEventToSparkFlumeEvent(events(j))
                 j += 1
               }
+              store(buffer)
               logDebug("Sending ack for sequence number: " + seq)
               // Send an ack to Flume so that Flume discards the events from its channels.
               client.ack(seq)
@@ -152,6 +152,18 @@ private[streaming] class FlumePollingReceiver(
     })
     channelFactory.releaseExternalResources()
   }
+
+  /**
+   * Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]]
+   * @param event - Event to convert to SparkFlumeEvent
+   * @return - The SparkFlumeEvent generated from the given SparkSinkEvent
+   */
+  private def sparkSinkEventToSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = {
+    val sparkFlumeEvent = new SparkFlumeEvent()
+    sparkFlumeEvent.event.setBody(event.getBody)
+    sparkFlumeEvent.event.setHeaders(event.getHeaders)
+    sparkFlumeEvent
+  }
 }
 
 /**
@@ -162,36 +174,5 @@ private[streaming] class FlumePollingReceiver(
 private class FlumeConnection(val transceiver: NettyTransceiver,
   val client: SparkFlumeProtocol.Callback)
 
-/**
- * Companion object of [[SparkFlumePollingEvent]]
- */
-private[streaming] object SparkFlumePollingEvent {
-  def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = {
-    val event = new SparkFlumePollingEvent()
-    event.event = in
-    event
-  }
-}
-
-/*
- * Unfortunately Avro does not allow including pre-compiled classes - so even though
- * SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
- * around that to make it externalizable.
- */
-class SparkFlumePollingEvent extends Externalizable with Logging {
-  var event: SparkSinkEvent = new SparkSinkEvent()
-
-  /* De-serialize from bytes. */
-  def readExternal(in: ObjectInput) {
-    val (headers, bodyBuff) = EventTransformer.readExternal(in)
-    event.setBody(ByteBuffer.wrap(bodyBuff))
-    event.setHeaders(headers)
-  }
-
-  /* Serialize to bytes. */
-  def writeExternal(out: ObjectOutput) {
-    EventTransformer.writeExternal(out, event.getHeaders, event.getBody.array())
-  }
-}
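The receiver change above amounts to a batch-then-ack pattern: rather than calling store() once per event, the polling receiver collects the whole pulled batch into an ArrayBuffer, hands it to store(ArrayBuffer) in one call, and only then acks the sequence number to the Spark Sink, so the ack is not sent until Spark has accepted the entire batch. A rough standalone sketch of that pattern is below; BatchThenAckReceiver, pullBatch, and ack are placeholders invented for this note, not part of the patch or of the SparkFlumeProtocol API.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative only: a toy receiver that follows the same batch-then-ack pattern.
// `pullBatch` and `ack` stand in for the sink client calls used by the real receiver.
class BatchThenAckReceiver(pullBatch: () => Seq[String], ack: () => Unit)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    new Thread("batch-then-ack-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          val events = pullBatch()
          val buffer = new ArrayBuffer[String](events.size)
          buffer ++= events
          // One store() call for the whole batch: the ack below is only sent
          // after Spark has accepted every event pulled in this iteration.
          store(buffer)
          ack()
        }
      }
    }.start()
  }

  def onStop(): Unit = {}
}
```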

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

Lines changed: 9 additions & 9 deletions
@@ -117,7 +117,7 @@ object FlumeUtils {
    * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
    * This stream will poll the sink for data and will pull events as they are available.
    * This stream will use a batch size of 1000 events and run 5 threads to pull data.
-   * @param host Address of the host on which the Spark Sink is running
+   * @param hostname Address of the host on which the Spark Sink is running
    * @param port Port of the host at which the Spark Sink is listening
    * @param storageLevel Storage level to use for storing the received objects
    */
@@ -127,7 +127,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
   }
 
@@ -143,7 +143,7 @@ object FlumeUtils {
       ssc: StreamingContext,
       addresses: Seq[InetSocketAddress],
       storageLevel: StorageLevel
-    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(ssc, addresses, storageLevel,
       DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
   }
@@ -166,8 +166,8 @@ object FlumeUtils {
       storageLevel: StorageLevel,
       maxBatchSize: Int,
       parallelism: Int
-    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
-    new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize,
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
 
@@ -183,7 +183,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       hostname: String,
       port: Int
-    ): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
   }
 
@@ -201,7 +201,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
   }
 
@@ -217,7 +217,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       addresses: Array[InetSocketAddress],
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc, addresses, storageLevel,
       DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
   }
@@ -240,7 +240,7 @@ object FlumeUtils {
       storageLevel: StorageLevel,
       maxBatchSize: Int,
       parallelism: Int
-    ): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
   }
 }
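Since createPollingStream now returns DStreams of SparkFlumeEvent, the polling stream is consumed the same way as the existing push-based Flume stream; only the stream-creation call differs. A minimal usage sketch, assuming a Spark Sink is already listening on localhost at port 9999 (both values are placeholders chosen for this example):

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePollingExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePollingExample")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Pull events from the Spark Sink; host and port are placeholders.
    val stream = FlumeUtils.createPollingStream(
      ssc, "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Each record is a SparkFlumeEvent, the same type the push-based stream produces.
    stream.map(e => new String(e.event.getBody.array(), StandardCharsets.UTF_8))
      .count()
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```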

external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java

Lines changed: 4 additions & 4 deletions
@@ -32,13 +32,13 @@ public void testFlumeStream() {
     InetSocketAddress[] addresses = new InetSocketAddress[] {
       new InetSocketAddress("localhost", 12345)
     };
-    JavaReceiverInputDStream<SparkFlumePollingEvent> test1 =
+    JavaReceiverInputDStream<SparkFlumeEvent> test1 =
       FlumeUtils.createPollingStream(ssc, "localhost", 12345);
-    JavaReceiverInputDStream<SparkFlumePollingEvent> test2 = FlumeUtils.createPollingStream(
+    JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream(
       ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaReceiverInputDStream<SparkFlumePollingEvent> test3 = FlumeUtils.createPollingStream(
+    JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream(
       ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaReceiverInputDStream<SparkFlumePollingEvent> test4 = FlumeUtils.createPollingStream(
+    JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream(
       ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
   }
 }

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala

Lines changed: 9 additions & 8 deletions
@@ -42,10 +42,11 @@ import org.apache.spark.streaming.flume.sink._
   test("flume polling test") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
-      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), StorageLevel.MEMORY_AND_DISK, 100, 1)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
+        StorageLevel.MEMORY_AND_DISK, 100, 1)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()
 
@@ -75,10 +76,10 @@ import org.apache.spark.streaming.flume.sink._
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
-    val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
       FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK, 100, 5)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()
 
@@ -115,7 +116,7 @@ import org.apache.spark.streaming.flume.sink._
   }
 
   def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
-    outputBuffer: ArrayBuffer[Seq[SparkFlumePollingEvent]]) {
+    outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val executor = Executors.newCachedThreadPool()
     val executorCompletion = new ExecutorCompletionService[Void](executor)
