
Commit 70bcc2a

SPARK-1729. New Flume-Spark integration.
Renamed SparkPollingEvent to SparkFlumePollingEvent.
1 parent d6fa3aa commit 70bcc2a

File tree

3 files changed (+20, -20 lines)


external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala

Lines changed: 9 additions & 9 deletions
@@ -53,13 +53,13 @@ class FlumePollingInputDStream[T: ClassTag](
     val maxBatchSize: Int,
     val parallelism: Int,
     storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[SparkPollingEvent](ssc_) {
+  ) extends ReceiverInputDStream[SparkFlumePollingEvent](ssc_) {
   /**
    * Gets the receiver object that will be sent to the worker nodes
    * to receive data. This method needs to defined by any specific implementation
    * of a NetworkInputDStream.
    */
-  override def getReceiver(): Receiver[SparkPollingEvent] = {
+  override def getReceiver(): Receiver[SparkFlumePollingEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
 }
@@ -69,7 +69,7 @@ private[streaming] class FlumePollingReceiver(
     maxBatchSize: Int,
     parallelism: Int,
     storageLevel: StorageLevel
-  ) extends Receiver[SparkPollingEvent](storageLevel) with Logging {
+  ) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging {
 
   lazy val channelFactoryExecutor =
     Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
@@ -105,7 +105,7 @@ private[streaming] class FlumePollingReceiver(
         logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
         try {
           // Convert each Flume event to a serializable SparkPollingEvent
-          events.foreach(event => store(SparkPollingEvent.fromSparkSinkEvent(event)))
+          events.foreach(event => store(SparkFlumePollingEvent.fromSparkSinkEvent(event)))
           // Send an ack to Flume so that Flume discards the events from its channels.
           client.ack(seq)
         } catch {
@@ -129,7 +129,7 @@ private[streaming] class FlumePollingReceiver(
     }
   }
 
-  override def store(dataItem: SparkPollingEvent) {
+  override def store(dataItem: SparkFlumePollingEvent) {
     // Not entirely sure store is thread-safe for all storage levels - so wrap it in synchronized
     // This takes a performance hit, since the parallelism is useful only for pulling data now.
     this.synchronized {
@@ -155,9 +155,9 @@ private[streaming] class FlumePollingReceiver(
 private class FlumeConnection(val transceiver: NettyTransceiver,
   val client: SparkFlumeProtocol.Callback)
 
-private[streaming] object SparkPollingEvent {
-  def fromSparkSinkEvent(in: SparkSinkEvent): SparkPollingEvent = {
-    val event = new SparkPollingEvent()
+private[streaming] object SparkFlumePollingEvent {
+  def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = {
+    val event = new SparkFlumePollingEvent()
     event.event = in
     event
   }
@@ -167,7 +167,7 @@ private[streaming] object SparkPollingEvent {
  * SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
  * around that to make it externalizable.
  */
-class SparkPollingEvent() extends Externalizable with Logging {
+class SparkFlumePollingEvent() extends Externalizable with Logging {
   var event : SparkSinkEvent = new SparkSinkEvent()
 
   /* De-serialize from bytes. */
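The doc comment in the last hunk explains why this wrapper class exists at all: the Avro-generated SparkSinkEvent mirrors AvroFlumeEvent and is not Java-serializable, so events need an Externalizable envelope before Spark can store and replicate them across nodes. The method bodies are cut off in this diff; the following is only a sketch of how such a wrapper might implement the two Externalizable methods, assuming Avro's binary encoding, an AvroEventWrapper stand-in name, and the sink package path shown in the import (none of these are confirmed by the commit):

import java.io.{ByteArrayOutputStream, Externalizable, ObjectInput, ObjectOutput}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}
import org.apache.spark.streaming.flume.sink.SparkSinkEvent  // assumed package for the generated class

// Hypothetical stand-in for SparkFlumePollingEvent's serialization logic;
// the real method bodies are not shown in this diff.
class AvroEventWrapper extends Externalizable {
  var event: SparkSinkEvent = new SparkSinkEvent()

  // Encode the Avro record to bytes, then write them length-prefixed.
  override def writeExternal(out: ObjectOutput): Unit = {
    val baos = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(baos, null)
    new SpecificDatumWriter[SparkSinkEvent](classOf[SparkSinkEvent]).write(event, encoder)
    encoder.flush()
    val bytes = baos.toByteArray
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  // Read the length-prefixed bytes back and decode the Avro record.
  override def readExternal(in: ObjectInput): Unit = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    event = new SpecificDatumReader[SparkSinkEvent](classOf[SparkSinkEvent]).read(null, decoder)
  }
}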

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

Lines changed: 4 additions & 4 deletions
@@ -89,8 +89,8 @@ object FlumeUtils {
       maxBatchSize: Int = 100,
       parallelism: Int = 5,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[SparkPollingEvent] = {
-    new FlumePollingInputDStream[SparkPollingEvent](ssc, addresses, maxBatchSize,
+    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
+    new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
 
@@ -111,8 +111,8 @@ object FlumeUtils {
       maxBatchSize: Int = 100,
       parallelism: Int = 5,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): JavaReceiverInputDStream[SparkPollingEvent] = {
-    new FlumePollingInputDStream[SparkPollingEvent](ssc, addresses, maxBatchSize,
+    ): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
+    new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
 }
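For callers, only the stream's element type changes. Below is a minimal usage sketch against the Scala createPollingStream overload above; the port, app name, and the event.getBody accessor are illustrative assumptions (getBody presumes SparkSinkEvent exposes the same ByteBuffer body as AvroFlumeEvent, as the comment in FlumePollingInputDStream.scala suggests):

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumePollingEvent}

object FlumePollingExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePollingExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Element type after this commit; maxBatchSize, parallelism and
    // storageLevel fall back to the defaults shown in the diff above.
    val stream: ReceiverInputDStream[SparkFlumePollingEvent] =
      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", 9999)))

    // getBody returning a ByteBuffer is an assumption: SparkSinkEvent is
    // described as identical to AvroFlumeEvent, which exposes its body that way.
    stream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()

    ssc.start()
    ssc.awaitTermination()
  }
}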

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala

Lines changed: 7 additions & 7 deletions
@@ -42,11 +42,11 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
   test("flume polling test") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
+    val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
       FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), 100, 5,
         StorageLevel.MEMORY_AND_DISK)
-    val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
-      with SynchronizedBuffer[Seq[SparkPollingEvent]]
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()
 
@@ -73,12 +73,12 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
   test("flume polling test multiple hosts") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
+    val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
       FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort),
         new InetSocketAddress("localhost", testPort + 1)), 100, 5,
         StorageLevel.MEMORY_AND_DISK)
-    val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
-      with SynchronizedBuffer[Seq[SparkPollingEvent]]
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()
 
@@ -114,7 +114,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
   }
 
   def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
-    outputBuffer: ArrayBuffer[Seq[SparkPollingEvent]]) {
+    outputBuffer: ArrayBuffer[Seq[SparkFlumePollingEvent]]) {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val executor = Executors.newCachedThreadPool()
     val executorCompletion = new ExecutorCompletionService[Void](executor)
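One detail worth noting in this suite: TestOutputStream appends one Seq of events per completed micro-batch from the streaming thread while the test thread reads the buffer, which is why the buffer mixes in SynchronizedBuffer (standard in the Scala 2.10-era collections used here, deprecated later). A stripped-down sketch of the pattern, with String standing in for the event type:

import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

object BufferPatternSketch extends App {
  // One Seq per micro-batch; the SynchronizedBuffer mixin locks every
  // mutator, so the streaming thread can append while the test thread reads.
  val outputBuffer = new ArrayBuffer[Seq[String]]
    with SynchronizedBuffer[Seq[String]]

  outputBuffer += Seq("e1", "e2")        // what TestOutputStream does per batch
  println(outputBuffer.flatten.size)     // what writeAndVerify asserts against
}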
