Commit d248d22

Author: harishreedharan

Merge pull request #1 from tdas/flume-polling

Bunch of changes to the new Flume stuff

2 parents: 1edc806 + 10b6214

File tree

11 files changed: +226 −136 lines changed

Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
+import java.net.InetSocketAddress
+
+/**
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with the Spark Sink running in a Flume agent. See
+ * the Spark Streaming programming guide for more details.
+ *
+ * Usage: FlumePollingEventCount <host> <port>
+ *   `host` is the host on which the Spark Sink is running.
+ *   `port` is the port at which the Spark Sink is listening.
+ *
+ * To run this example:
+ *   `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port]`
+ */
+object FlumePollingEventCount {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println(
+        "Usage: FlumePollingEventCount <host> <port>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Array(host, IntParam(port)) = args
+
+    val batchInterval = Milliseconds(2000)
+
+    // Create the context and set the batch size
+    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
+    val ssc = new StreamingContext(sparkConf, batchInterval)
+
+    // Create a flume stream that polls the Spark Sink running in a Flume agent
+    val stream = FlumeUtils.createPollingStream(ssc, host, port)
+
+    // Print out the count of events received from this server in each batch
+    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
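
The example above connects to a single Spark Sink by host and port. A minimal sketch of pointing the same pipeline at several sinks follows; it assumes FlumeUtils also exposes a createPollingStream overload that takes a Seq[InetSocketAddress] and a StorageLevel (mirroring the addresses parameter of FlumePollingInputDStream later in this commit), which is not shown in this diff. The host names and port are placeholders.

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePollingMultiSinkSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("FlumePollingMultiSinkSketch")
    val ssc = new StreamingContext(sparkConf, Milliseconds(2000))

    // Placeholder agents, each running a Spark Sink on port 9999.
    val addresses = Seq(
      new InetSocketAddress("agent1.example.com", 9999),
      new InetSocketAddress("agent2.example.com", 9999))

    // Assumed overload: a Seq of sink addresses plus an explicit storage level.
    val stream = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)

    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}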

external/flume-sink/src/main/avro/sparkflume.avdl

Lines changed: 1 addition & 2 deletions
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-@namespace("org.apache.spark.flume")
+@namespace("org.apache.spark.streaming.flume.sink")
 
 protocol SparkFlumeProtocol {
 
@@ -37,5 +37,4 @@ protocol SparkFlumeProtocol {
   void ack (string sequenceNumber);
 
   void nack (string sequenceNumber);
-
 }
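
The renamed namespace is where the Avro-generated SparkFlumeProtocol and EventBatch classes now live, shared by the sink and the receiver. A rough sketch of how a polling client is expected to drive the ack/nack calls declared above follows; the batch-fetch call and the EventBatch accessors are assumptions, since only ack and nack appear in this hunk.

package org.apache.spark.streaming.flume.sink

object PollingClientSketch {
  // Sketch only: getEventBatch and getSequenceNumber are assumed, not shown in this diff.
  def processOneBatch(client: SparkFlumeProtocol.Callback, maxBatchSize: Int): Unit = {
    val batch = client.getEventBatch(maxBatchSize)   // assumed protocol call
    val seq = batch.getSequenceNumber                // assumed accessor on EventBatch
    try {
      // hand the events to Spark here ...
      client.ack(seq)    // sink commits its Flume transaction
    } catch {
      case _: Exception =>
        client.nack(seq) // sink rolls back, so the events stay in the channel
    }
  }
}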

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/Logging.scala renamed to external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala

Lines changed: 9 additions & 17 deletions
@@ -14,12 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.flume.sink
+package org.apache.spark.streaming.flume.sink
+
 import org.apache.log4j.{LogManager, PropertyConfigurator}
 import org.slf4j.{Logger, LoggerFactory}
 import org.slf4j.impl.StaticLoggerBinder
 
-trait Logging {
+/**
+ * Copy of the org.apache.spark.Logging for being used in the Spark Sink.
+ * The org.apache.spark.Logging is not used so that all of Spark is not brought
+ * in as a dependency.
+ */
+private[sink] trait Logging {
   // Make the log field transient so that objects with Logging can
   // be serialized and used on another machine
   @transient private var log_ : Logger = null
@@ -95,20 +101,6 @@ trait Logging {
   }
 
   private def initializeLogging() {
-    // If Log4j is being used, but is not initialized, load a default properties file
-    val binder = StaticLoggerBinder.getSingleton
-    val usingLog4j = binder.getLoggerFactoryClassStr.endsWith("Log4jLoggerFactory")
-    val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
-    if (!log4jInitialized && usingLog4j) {
-      val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
-      Option(getClass.getClassLoader.getResource(defaultLogProps)) match {
-        case Some(url) =>
-          PropertyConfigurator.configure(url)
-          log.info(s"Using Spark's default log4j profile: $defaultLogProps")
-        case None =>
-          System.err.println(s"Spark was unable to load $defaultLogProps")
-      }
-    }
     Logging.initialized = true
 
     // Force a call into slf4j to initialize it. Avoids this happening from mutliple threads
@@ -117,7 +109,7 @@ trait Logging {
   }
 }
 
-private object Logging {
+private[sink] object Logging {
   @volatile private var initialized = false
   val initLock = new Object()
   try {
external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala renamed to external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala

Lines changed: 6 additions & 9 deletions
@@ -14,17 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.flume.sink
+package org.apache.spark.streaming.flume.sink
 
-import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.concurrent.atomic.AtomicLong
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-
-import org.apache.commons.lang.RandomStringUtils
 import org.apache.flume.Channel
-import org.apache.spark.flume.{EventBatch, SparkFlumeProtocol}
-import org.slf4j.LoggerFactory
+import org.apache.commons.lang.RandomStringUtils
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 
 /**
  * Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process
@@ -34,7 +31,7 @@ import org.slf4j.LoggerFactory
  * @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark
  * is rolled back.
  */
-private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
+private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
   val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
   val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
     new ThreadFactoryBuilder().setDaemon(true)
@@ -109,7 +106,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
    * @return The transaction processor for the corresponding batch. Note that this instance is no
    *         longer tracked and the caller is responsible for that txn processor.
    */
-  private[flume] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
+  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
     processorMap.remove(sequenceNumber.toString) // The toString is required!
   }
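
The class above implements the Avro SparkFlumeProtocol, so it has to be attached to an RPC server to be reachable by the receiver. A sketch of that wiring, roughly what SparkSink does when it starts (host, port and the constructor values are illustrative, not taken from this diff):

package org.apache.spark.streaming.flume.sink

import java.net.InetSocketAddress

import org.apache.avro.ipc.NettyServer
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.flume.Channel

// Sketch: serve SparkAvroCallbackHandler over Avro's Netty RPC server.
private[sink] object CallbackHandlerServerSketch {
  def start(channel: Channel): NettyServer = {
    // threads, transactionTimeout and backOffInterval values are illustrative.
    val handler = new SparkAvroCallbackHandler(10, channel, 60, 200)
    val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler)
    val server = new NettyServer(responder, new InetSocketAddress("0.0.0.0", 9999))
    server.start()
    server
  }
}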

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala renamed to external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala

Lines changed: 4 additions & 5 deletions
@@ -14,20 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.flume.sink
+package org.apache.spark.streaming.flume.sink
 
 import java.net.InetSocketAddress
 import java.util.concurrent._
 
 import org.apache.avro.ipc.NettyServer
 import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.flume.Context
 import org.apache.flume.Sink.Status
 import org.apache.flume.conf.{Configurable, ConfigurationException}
 import org.apache.flume.sink.AbstractSink
-import org.apache.flume.Context
-import org.slf4j.LoggerFactory
-
-import org.apache.spark.flume.SparkFlumeProtocol
 
 /**
  * A sink that uses Avro RPC to run a server that can be polled by Spark's
@@ -48,6 +45,7 @@ import org.apache.spark.flume.SparkFlumeProtocol
 // until an ACK or NACK comes back or the transaction times out (after the specified timeout).
 // When the response comes, the TransactionProcessor is retrieved and then unblocked,
 // at which point the transaction is committed or rolled back.
+private[flume]
 class SparkSink extends AbstractSink with Logging with Configurable {
 
   // Size of the pool to use for holding transaction processors.
@@ -130,6 +128,7 @@ class SparkSink extends AbstractSink with Logging with Configurable {
 /**
  * Configuration parameters and their defaults.
  */
+private[flume]
 object SparkSinkConfig {
   val THREADS = "threads"
   val DEFAULT_THREADS = 10
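
The comments above describe the sink handing each batch to a TransactionProcessor that blocks until Spark acks or nacks it, or until the transaction times out. A minimal sketch of that wait-for-outcome pattern with a CountDownLatch (which TransactionProcessor imports); the names here are illustrative, not the actual TransactionProcessor API:

import java.util.concurrent.{CountDownLatch, TimeUnit}

// Sketch of "block until ACK/NACK or timeout": one latch per batch, released by the
// ack/nack RPC handlers; the waiting side decides whether to commit or roll back.
private class AckWaiterSketch(transactionTimeoutMillis: Long) {
  private val latch = new CountDownLatch(1)
  @volatile private var acked = false

  def onAck(): Unit = { acked = true; latch.countDown() }  // invoked when Spark acks
  def onNack(): Unit = latch.countDown()                   // invoked when Spark nacks

  /** True if the batch was acked in time, so the Flume transaction can be committed. */
  def awaitOutcome(): Boolean = {
    val responded = latch.await(transactionTimeoutMillis, TimeUnit.MILLISECONDS)
    responded && acked
  }
}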

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSinkUtils.scala renamed to external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala

Lines changed: 2 additions & 4 deletions
@@ -14,11 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.flume.sink
+package org.apache.spark.streaming.flume.sink
 
-import org.apache.spark.flume.EventBatch
-
-object SparkSinkUtils {
+private[flume] object SparkSinkUtils {
   /**
    * This method determines if this batch represents an error or not.
    * @param batch - The batch to check
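
Only the signature area of SparkSinkUtils is visible in this hunk; the body of the method described above is not shown. A plausible shape, assuming the Avro-generated EventBatch carries an error-message field (that field and its accessor are assumptions, not part of this diff):

package org.apache.spark.streaming.flume.sink

// Sketch only: the real SparkSinkUtils implementation is not shown in this hunk.
private[flume] object SparkSinkUtilsSketch {
  def isErrorBatch(batch: EventBatch): Boolean = {
    val errorMsg = batch.getErrorMsg   // assumed accessor on EventBatch
    errorMsg != null && errorMsg.toString.nonEmpty
  }
}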

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala renamed to external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala

Lines changed: 2 additions & 5 deletions
@@ -14,18 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.flume.sink
+package org.apache.spark.streaming.flume.sink
 
 import java.nio.ByteBuffer
 import java.util
-import java.util.concurrent.{TimeUnit, CountDownLatch, Callable}
+import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}
 
 import scala.util.control.Breaks
 
 import org.apache.flume.{Transaction, Channel}
-import org.apache.spark.flume.{SparkSinkEvent, EventBatch}
-import org.slf4j.LoggerFactory
-
 
 // Flume forces transactions to be thread-local (horrible, I know!)
 // So the sink basically spawns a new thread to pull the events out within a transaction.
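
The comment above is the key constraint: Flume transactions are thread-local, so every step of a transaction has to happen on the same thread. A sketch of that idea, running the whole begin/take/commit (or rollback) cycle inside one Callable so a single executor thread owns the transaction end to end (helper names and the simplified error handling are illustrative):

import java.util.concurrent.{Callable, Executors}

import org.apache.flume.{Channel, Event}

// Sketch: the entire Flume transaction lives on one executor thread.
object ThreadLocalTxnSketch {
  private val executor = Executors.newSingleThreadExecutor()

  def takeBatch(channel: Channel, maxBatchSize: Int): java.util.List[Event] = {
    executor.submit(new Callable[java.util.List[Event]] {
      override def call(): java.util.List[Event] = {
        val events = new java.util.ArrayList[Event]()
        val txn = channel.getTransaction
        txn.begin()
        try {
          var keepTaking = true
          while (keepTaking && events.size() < maxBatchSize) {
            val event = channel.take()
            if (event == null) keepTaking = false else events.add(event)
          }
          txn.commit()
        } catch {
          case e: Throwable =>
            txn.rollback()
            throw e
        } finally {
          txn.close()
        }
        events
      }
    }).get()
  }
}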

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala

Lines changed: 22 additions & 30 deletions
@@ -22,8 +22,6 @@ import java.net.InetSocketAddress
 import java.nio.ByteBuffer
 import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
 
-import org.apache.spark.flume.sink.SparkSinkUtils
-
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
@@ -33,45 +31,44 @@ import org.apache.avro.ipc.specific.SpecificRequestor
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
 
 import org.apache.spark.Logging
-import org.apache.spark.flume.{SparkSinkEvent, SparkFlumeProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.flume.sink._
+
 
 /**
  * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
  * [[org.apache.spark.flume.sink.SparkSink]]s.
- * @param ssc_ Streaming context that will execute this input stream
+ * @param _ssc Streaming context that will execute this input stream
  * @param addresses List of addresses at which SparkSinks are listening
  * @param maxBatchSize Maximum size of a batch
 * @param parallelism Number of parallel connections to open
 * @param storageLevel The storage level to use.
 * @tparam T Class type of the object of this stream
 */
+private[streaming]
 class FlumePollingInputDStream[T: ClassTag](
-    @transient ssc_ : StreamingContext,
-    val addresses: Seq[InetSocketAddress],
-    val maxBatchSize: Int,
-    val parallelism: Int,
-    storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[SparkFlumePollingEvent](ssc_) {
-  /**
-   * Gets the receiver object that will be sent to the worker nodes
-   * to receive data. This method needs to defined by any specific implementation
-   * of a NetworkInputDStream.
-   */
+    @transient _ssc: StreamingContext,
+    val addresses: Seq[InetSocketAddress],
+    val maxBatchSize: Int,
+    val parallelism: Int,
+    storageLevel: StorageLevel
+  ) extends ReceiverInputDStream[SparkFlumePollingEvent](_ssc) {
+
   override def getReceiver(): Receiver[SparkFlumePollingEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
 }
 
-private[streaming] class FlumePollingReceiver(
-  addresses: Seq[InetSocketAddress],
-  maxBatchSize: Int,
-  parallelism: Int,
-  storageLevel: StorageLevel
-) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging {
+private[streaming]
+class FlumePollingReceiver(
+  addresses: Seq[InetSocketAddress],
+  maxBatchSize: Int,
+  parallelism: Int,
+  storageLevel: StorageLevel
+) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging {
 
   lazy val channelFactoryExecutor =
     Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
@@ -150,14 +147,6 @@ private[streaming] class FlumePollingReceiver(
     }
   }
 
-  override def store(dataItem: SparkFlumePollingEvent) {
-    // Not entirely sure store is thread-safe for all storage levels - so wrap it in synchronized
-    // This takes a performance hit, since the parallelism is useful only for pulling data now.
-    this.synchronized {
-      super.store(dataItem)
-    }
-  }
-
   override def onStop(): Unit = {
     logInfo("Shutting down Flume Polling Receiver")
     receiverExecutor.shutdownNow()
@@ -176,6 +165,9 @@ private[streaming] class FlumePollingReceiver(
 private class FlumeConnection(val transceiver: NettyTransceiver,
   val client: SparkFlumeProtocol.Callback)
 
+/**
+ * Companion object of [[SparkFlumePollingEvent]]
+ */
 private[streaming] object SparkFlumePollingEvent {
   def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = {
     val event = new SparkFlumePollingEvent()
@@ -189,7 +181,7 @@ private[streaming] object SparkFlumePollingEvent {
  * SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
  * around that to make it externalizable.
  */
-class SparkFlumePollingEvent() extends Externalizable with Logging {
+class SparkFlumePollingEvent extends Externalizable with Logging {
   var event: SparkSinkEvent = new SparkSinkEvent()
 
   /* De-serialize from bytes. */
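
The comment above notes that SparkFlumePollingEvent exists only to wrap the Avro-generated SparkSinkEvent in something Externalizable. A stripped-down sketch of that wrapper pattern, using a plain byte-array payload rather than the real headers-plus-body layout of SparkSinkEvent (the class and field names below are illustrative):

import java.io.{Externalizable, ObjectInput, ObjectOutput}

// Sketch of the Externalizable-wrapper idea: a mutable payload, a no-arg constructor,
// and explicit write/read methods so the JVM can serialize it.
class ExternalizableWrapperSketch extends Externalizable {
  var payload: Array[Byte] = Array.empty[Byte]

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeInt(payload.length)
    out.write(payload)
  }

  override def readExternal(in: ObjectInput): Unit = {
    val length = in.readInt()
    val buffer = new Array[Byte](length)
    in.readFully(buffer)
    payload = buffer
  }
}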

0 commit comments
