Skip to content

Commit 3c23c18

Browse files
SPARK-1729. New Spark-Flume integration.
Minor formatting changes.
1 parent 70bcc2a commit 3c23c18

File tree

3 files changed: +14 −7 lines changed

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.flume.{Channel, Transaction, FlumeException, Context}
3535
import org.slf4j.LoggerFactory
3636

3737
import org.apache.spark.flume.{SparkSinkEvent, EventBatch, SparkFlumeProtocol}
38+
3839
/**
3940
* A sink that uses Avro RPC to run a server that can be polled by Spark's
4041
* FlumePollingInputDStream. This sink has the following configuration parameters:

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,9 @@ private[streaming] class FlumePollingReceiver(
105105
logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
106106
try {
107107
// Convert each Flume event to a serializable SparkPollingEvent
108-
events.foreach(event => store(SparkFlumePollingEvent.fromSparkSinkEvent(event)))
108+
events.foreach(event => {
109+
store(SparkFlumePollingEvent.fromSparkSinkEvent(event))
110+
})
109111
// Send an ack to Flume so that Flume discards the events from its channels.
110112
client.ack(seq)
111113
} catch {
@@ -153,7 +155,7 @@ private[streaming] class FlumePollingReceiver(
153155
* @param client The client that the callbacks are received on.
154156
*/
155157
private class FlumeConnection(val transceiver: NettyTransceiver,
156-
val client: SparkFlumeProtocol.Callback)
158+
val client: SparkFlumeProtocol.Callback)
157159

158160
private[streaming] object SparkFlumePollingEvent {
159161
def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = {
@@ -162,13 +164,14 @@ private[streaming] object SparkFlumePollingEvent {
162164
event
163165
}
164166
}
167+
165168
/*
166169
* Unfortunately Avro does not allow including pre-compiled classes - so even though
167170
* SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
168171
* around that to make it externalizable.
169172
*/
170173
class SparkFlumePollingEvent() extends Externalizable with Logging {
171-
var event : SparkSinkEvent = new SparkSinkEvent()
174+
var event: SparkSinkEvent = new SparkSinkEvent()
172175

173176
/* De-serialize from bytes. */
174177
def readExternal(in: ObjectInput) {

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,18 +114,19 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
114114
}
115115

116116
def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
117-
outputBuffer: ArrayBuffer[Seq[SparkFlumePollingEvent]]) {
117+
outputBuffer: ArrayBuffer[Seq[SparkFlumePollingEvent]]) {
118118
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
119119
val executor = Executors.newCachedThreadPool()
120120
val executorCompletion = new ExecutorCompletionService[Void](executor)
121121
channels.map(channel => {
122122
executorCompletion.submit(new TxnSubmitter(channel, clock))
123123
})
124-
for(i <- 0 until channels.size) {
124+
for (i <- 0 until channels.size) {
125125
executorCompletion.take()
126126
}
127127
val startTime = System.currentTimeMillis()
128-
while (outputBuffer.size < 5 * channels.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
128+
while (outputBuffer.size < 5 * channels.size &&
129+
System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
129130
logInfo("output.size = " + outputBuffer.size)
130131
Thread.sleep(100)
131132
}
@@ -164,7 +165,8 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
164165
val tx = channel.getTransaction
165166
tx.begin()
166167
for (j <- 0 until 5) {
167-
channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes("utf-8"),
168+
channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes(
169+
"utf-8"),
168170
Map[String, String]("test-" + t.toString -> "header")))
169171
t += 1
170172
}
@@ -176,4 +178,5 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
176178
null
177179
}
178180
}
181+
179182
}

0 commit comments

Comments (0)