
Commit 800ecff

harishreedharan authored and tdas committed
[STREAMING] SPARK-1729. Make Flume pull data from source, rather than the current push model

Currently Spark uses Flume's internal Avro Protocol to ingest data from Flume. If the executor running the receiver fails, it currently has to be restarted on the same node to be able to receive data. This commit adds a new Sink which can be deployed to a Flume agent. This sink can be polled by a new DStream that is also included in this commit. This model ensures that data can be pulled into Spark from Flume even if the receiver is restarted on a new node. This also allows the receiver to receive data on multiple threads for better performance.

Author: Hari Shreedharan <[email protected]>
Author: Hari Shreedharan <[email protected]>
Author: Tathagata Das <[email protected]>
Author: harishreedharan <[email protected]>

Closes apache#807 from harishreedharan/master and squashes the following commits:

e7f70a3 [Hari Shreedharan] Merge remote-tracking branch 'asf-git/master'
96cfb6f [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
e48d785 [Hari Shreedharan] Documenting flume-sink being ignored for Mima checks.
5f212ce [Hari Shreedharan] Ignore Spark Sink from mima.
981bf62 [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
7a1bc6e [Hari Shreedharan] Fix SparkBuild.scala
a082eb3 [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
1f47364 [Hari Shreedharan] Minor fixes.
73d6f6d [Hari Shreedharan] Cleaned up tests a bit. Added some docs in multiple places.
65b76b4 [Hari Shreedharan] Fixing the unit test.
e59cc20 [Hari Shreedharan] Use SparkFlumeEvent instead of the new type. Also, Flume Polling Receiver now uses the store(ArrayBuffer) method.
f3c99d1 [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
3572180 [Hari Shreedharan] Adding a license header, making Jenkins happy.
799509f [Hari Shreedharan] Fix a compile issue.
3c5194c [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
d248d22 [harishreedharan] Merge pull request #1 from tdas/flume-polling
10b6214 [Tathagata Das] Changed public API, changed sink package, and added java unit test to make sure Java API is callable from Java.
1edc806 [Hari Shreedharan] SPARK-1729. Update logging in Spark Sink.
8c00289 [Hari Shreedharan] More debug messages
393bd94 [Hari Shreedharan] SPARK-1729. Use LinkedBlockingQueue instead of ArrayBuffer to keep track of connections.
120e2a1 [Hari Shreedharan] SPARK-1729. Some test changes and changes to utils classes.
9fd0da7 [Hari Shreedharan] SPARK-1729. Use foreach instead of map for all Options.
8136aa6 [Hari Shreedharan] Adding TransactionProcessor to map on returning batch of data
86aa274 [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
205034d [Hari Shreedharan] Merging master in
4b0c7fc [Hari Shreedharan] FLUME-1729. New Flume-Spark integration.
bda01fc [Hari Shreedharan] FLUME-1729. Flume-Spark integration.
0d69604 [Hari Shreedharan] FLUME-1729. Better Flume-Spark integration.
3c23c18 [Hari Shreedharan] SPARK-1729. New Spark-Flume integration.
70bcc2a [Hari Shreedharan] SPARK-1729. New Flume-Spark integration.
d6fa3aa [Hari Shreedharan] SPARK-1729. New Flume-Spark integration.
e7da512 [Hari Shreedharan] SPARK-1729. Fixing import order
9741683 [Hari Shreedharan] SPARK-1729. Fixes based on review.
c604a3c [Hari Shreedharan] SPARK-1729. Optimize imports.
0f10788 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
87775aa [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
8df37e4 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
03d6c1c [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
08176ad [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
d24d9d4 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
6d6776a [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
1 parent fc4d057 commit 800ecff

File tree

18 files changed: +1524 −13 lines
Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import java.net.InetSocketAddress

/**
 * Produces a count of events received from Flume.
 *
 * This should be used in conjunction with the Spark Sink running in a Flume agent. See
 * the Spark Streaming programming guide for more details.
 *
 * Usage: FlumePollingEventCount <host> <port>
 *   `host` is the host on which the Spark Sink is running.
 *   `port` is the port at which the Spark Sink is listening.
 *
 * To run this example:
 *   `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port]`
 */
object FlumePollingEventCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumePollingEventCount <host> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(host, IntParam(port)) = args

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream that polls the Spark Sink running in a Flume agent
    val stream = FlumeUtils.createPollingStream(ssc, host, port)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
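The example above needs a live Spark Sink to run against, but the pull model it relies on can be illustrated standalone. The sketch below is a hypothetical in-memory stand-in for an agent running the sink (the names `MockSparkSink`, `poll`, `put`, and `size` are illustrative, not the real API): events handed out to a receiver stay pending until acknowledged, so a receiver that dies mid-batch and restarts on another node can nack and re-poll without losing data.

```scala
import scala.collection.mutable

// Illustrative event/batch shapes, loosely mirroring the protocol in this commit.
case class SinkEvent(headers: Map[String, String], body: String)
case class Batch(sequenceNumber: String, events: Seq[SinkEvent])

// Hypothetical in-memory stand-in for a Flume agent running the Spark Sink.
class MockSparkSink {
  private val channel = mutable.Queue[SinkEvent]()
  private val pending = mutable.Map[String, Seq[SinkEvent]]()
  private var seq = 0

  def put(e: SinkEvent): Unit = channel.enqueue(e)

  // Hand out up to n events; they stay "pending" until acked or nacked.
  def poll(n: Int): Batch = {
    val events = (1 to math.min(n, channel.size)).map(_ => channel.dequeue())
    seq += 1
    val batch = Batch(seq.toString, events)
    pending(batch.sequenceNumber) = events
    batch
  }

  // Receiver processed the batch successfully: commit the transaction.
  def ack(sequenceNumber: String): Unit = pending.remove(sequenceNumber)

  // Receiver failed (e.g. was restarted): roll the events back into the channel.
  def nack(sequenceNumber: String): Unit =
    pending.remove(sequenceNumber).foreach(_.foreach(channel.enqueue(_)))

  def size: Int = channel.size
}
```

A receiver loop would then poll, store the batch in Spark, and ack only after the store succeeds; on any failure it nacks so the same events are offered again.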

external/flume-sink/pom.xml

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent</artifactId>
    <version>1.1.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <artifactId>spark-streaming-flume-sink_2.10</artifactId>
  <properties>
    <sbt.project.name>streaming-flume-sink</sbt.project.name>
  </properties>

  <packaging>jar</packaging>
  <name>Spark Project External Flume Sink</name>
  <url>http://spark.apache.org/</url>
  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.4.0</version>
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.thrift</groupId>
          <artifactId>libthrift</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.4.0</version>
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.thrift</groupId>
          <artifactId>libthrift</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.4</version>
    </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    <plugins>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.7.3</version>
        <configuration>
          <!-- Generate the output in the same directory as the sbt-avro-plugin -->
          <outputDirectory>${project.basedir}/target/scala-${scala.binary.version}/src_managed/main/compiled_avro</outputDirectory>
        </configuration>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>idl-protocol</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
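The jar this POM builds is meant to be dropped onto a Flume agent's classpath and wired up as a sink in the agent's configuration. A sketch of what such a configuration might look like follows; the property names and the `SparkSink` class name are assumptions based on this commit's package layout, so consult the Spark Streaming programming guide for the authoritative settings.

```properties
# Hypothetical Flume agent config wiring a channel to the Spark Sink
# (property names and class name are assumptions; see the Spark Streaming guide)
agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = localhost
agent.sinks.spark.port = 9999
agent.sinks.spark.channel = memoryChannel
```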
Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

@namespace("org.apache.spark.streaming.flume.sink")

protocol SparkFlumeProtocol {

  record SparkSinkEvent {
    map<string> headers;
    bytes body;
  }

  record EventBatch {
    string errorMsg = ""; // If this is empty it is a valid message, else it represents an error
    string sequenceNumber;
    array<SparkSinkEvent> events;
  }

  EventBatch getEventBatch (int n);

  void ack (string sequenceNumber);

  void nack (string sequenceNumber);
}
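The IDL above encodes batch validity in-band: an empty `errorMsg` marks a valid batch, anything else reports an error. That convention can be mirrored in plain Scala as a quick sketch (the real classes are Avro-generated from this IDL, and `isValid` is a hypothetical helper, not part of the protocol):

```scala
// Plain-Scala mirror of the Avro records above (the real ones are generated).
case class SparkSinkEvent(headers: Map[String, String], body: Array[Byte])
case class EventBatch(errorMsg: String = "",
                      sequenceNumber: String,
                      events: Seq[SparkSinkEvent])

// Hypothetical helper: per the protocol, an empty errorMsg means a valid batch.
def isValid(batch: EventBatch): Boolean = batch.errorMsg.isEmpty
```

A receiver would call `getEventBatch`, check validity, and then `ack` or `nack` with the batch's `sequenceNumber`.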
Lines changed: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.flume.sink

import org.slf4j.{Logger, LoggerFactory}

/**
 * Copy of org.apache.spark.Logging, for use in the Spark Sink.
 * org.apache.spark.Logging is not used so that all of Spark is not brought
 * in as a dependency.
 */
private[sink] trait Logging {
  // Make the log field transient so that objects with Logging can
  // be serialized and used on another machine
  @transient private var log_ : Logger = null

  // Method to get or create the logger for this object
  protected def log: Logger = {
    if (log_ == null) {
      initializeIfNecessary()
      var className = this.getClass.getName
      // Ignore trailing $'s in the class names for Scala objects
      if (className.endsWith("$")) {
        className = className.substring(0, className.length - 1)
      }
      log_ = LoggerFactory.getLogger(className)
    }
    log_
  }

  // Log methods that take only a String
  protected def logInfo(msg: => String) {
    if (log.isInfoEnabled) log.info(msg)
  }

  protected def logDebug(msg: => String) {
    if (log.isDebugEnabled) log.debug(msg)
  }

  protected def logTrace(msg: => String) {
    if (log.isTraceEnabled) log.trace(msg)
  }

  protected def logWarning(msg: => String) {
    if (log.isWarnEnabled) log.warn(msg)
  }

  protected def logError(msg: => String) {
    if (log.isErrorEnabled) log.error(msg)
  }

  // Log methods that take Throwables (Exceptions/Errors) too
  protected def logInfo(msg: => String, throwable: Throwable) {
    if (log.isInfoEnabled) log.info(msg, throwable)
  }

  protected def logDebug(msg: => String, throwable: Throwable) {
    if (log.isDebugEnabled) log.debug(msg, throwable)
  }

  protected def logTrace(msg: => String, throwable: Throwable) {
    if (log.isTraceEnabled) log.trace(msg, throwable)
  }

  protected def logWarning(msg: => String, throwable: Throwable) {
    if (log.isWarnEnabled) log.warn(msg, throwable)
  }

  protected def logError(msg: => String, throwable: Throwable) {
    if (log.isErrorEnabled) log.error(msg, throwable)
  }

  protected def isTraceEnabled(): Boolean = {
    log.isTraceEnabled
  }

  private def initializeIfNecessary() {
    if (!Logging.initialized) {
      Logging.initLock.synchronized {
        if (!Logging.initialized) {
          initializeLogging()
        }
      }
    }
  }

  private def initializeLogging() {
    Logging.initialized = true

    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
    log
  }
}

private[sink] object Logging {
  @volatile private var initialized = false
  val initLock = new Object()
  try {
    // We use reflection here to handle the case where users remove the
    // slf4j-to-jul bridge in order to route their logs to JUL.
    val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
    bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
    val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
    if (!installed) {
      bridgeClass.getMethod("install").invoke(null)
    }
  } catch {
    case e: ClassNotFoundException => // can't log anything yet so just fail silently
  }
}
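The `initializeIfNecessary` method above is the classic double-checked-locking idiom: a lock-free fast path guarded by a `@volatile` flag, with the flag re-checked inside the lock so the one-time setup runs exactly once even under concurrent first use. A minimal self-contained sketch of the same pattern (no slf4j; a counter stands in for the one-time `initializeLogging` work, and the names are illustrative):

```scala
// Minimal sketch of the double-checked-locking initialization used above.
object OnceInit {
  @volatile private var initialized = false
  private val initLock = new Object()
  var initCount = 0  // how many times the expensive init actually ran

  def ensureInitialized(): Unit = {
    if (!initialized) {              // cheap, lock-free fast path
      initLock.synchronized {
        if (!initialized) {          // re-check under the lock
          initCount += 1             // stands in for initializeLogging()
          initialized = true
        }
      }
    }
  }
}
```

Without the `@volatile` flag or the second check under the lock, two threads racing through the fast path could both run the initialization.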
