[SPARK-8015] [flume] Remove Guava dependency from flume-sink. #6555

Closed
wants to merge 5 commits into from
39 changes: 39 additions & 0 deletions external/flume-sink/pom.xml
@@ -42,15 +42,46 @@
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<exclusions>
<!-- Guava is excluded to avoid its use in this module. -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<!--
Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
dependency.
-->
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<!-- Add Guava in test scope since flume actually needs it. -->
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Contributor

Why do you need to add this in test?

Contributor

I see, I missed the earlier conversation. But instead of randomly adding Guava back for test, it's better to remove the wrong version of Guava that is coming in from somewhere and allow Flume's Guava dependency to be used. At the least, it needs to be understood. Could you check the dependency tree to find the wrong Guava version?

Contributor Author

There's no wrong version of Guava.

I added this because I removed Guava from compile-scope dependencies, to avoid having people use it in this module. Not because of a conflict.

If you'd rather keep it, at the same version Flume uses (whatever that is), that could be done too. I kinda prefer avoiding it, though.

As I mention in the PR description, the minimal fix doesn't remove Guava at all, just removes the code to relocate it. I just thought it would be better to avoid another potential dependency mess here since it's not needed.

Contributor

Aah, I was misunderstanding the problem. It makes sense now. Can you add comments on why Guava and thrift have been excluded, explaining the problem and mentioning the JIRA?

<scope>test</scope>
</dependency>
<dependency>
<!--
Netty explicitly added in test as it has been excluded from
@@ -85,6 +116,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<!-- Disable all relocations defined in the parent pom. -->
<relocations combine.self="override" />
</configuration>
Contributor

Is it worth adding a short comment here explaining why this is necessary? Maybe this is obvious to others more versed in Maven, but it was a little mysterious to me. Then again, we don't document a lot of the intricacies of the build system. Anyway, just a suggestion; I'll leave it up to your judgment.

</plugin>
</plugins>
</build>
</project>
@@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable

-import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.flume.Channel
import org.apache.commons.lang3.RandomStringUtils

@@ -45,8 +44,7 @@ import org.apache.commons.lang3.RandomStringUtils
private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
-new ThreadFactoryBuilder().setDaemon(true)
-.setNameFormat("Spark Sink Processor Thread - %d").build()))
+new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")))
// Protected by `sequenceNumberToProcessor`
private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
// This sink will not persist sequence numbers and reuses them if it gets restarted.
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.flume.sink

import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicLong

/**
 * Thread factory that generates daemon threads with a specified name format.
 */
private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {

  private val threadId = new AtomicLong()

  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, nameFormat.format(threadId.incrementAndGet()))
    t.setDaemon(true)
    t
  }

}
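
For readers who want to try the replacement outside the Spark build, here is a minimal sketch of the same idea. `DaemonThreadFactory` and `DaemonThreadFactoryDemo` are hypothetical names introduced only for illustration; the factory body copies the logic of `SparkSinkThreadFactory` above but drops the `private[sink]` modifier so it can run standalone. It is not part of the PR.

```scala
import java.util.concurrent.{Callable, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for SparkSinkThreadFactory (same logic, public visibility).
class DaemonThreadFactory(nameFormat: String) extends ThreadFactory {

  // Counter substituted into the "%d" placeholder of nameFormat.
  private val threadId = new AtomicLong()

  override def newThread(r: Runnable): Thread = {
    // incrementAndGet() means the first thread created is numbered 1.
    val t = new Thread(r, nameFormat.format(threadId.incrementAndGet()))
    t.setDaemon(true)
    t
  }
}

// Hypothetical helper used only to observe what the factory produces.
object DaemonThreadFactoryDemo {

  // Runs one task on a single-thread pool and returns the worker
  // thread's name together with its daemon flag.
  def describeWorker(nameFormat: String): (String, Boolean) = {
    val pool = Executors.newFixedThreadPool(1, new DaemonThreadFactory(nameFormat))
    try {
      val task = new Callable[(String, Boolean)] {
        override def call(): (String, Boolean) = {
          val current = Thread.currentThread()
          (current.getName, current.isDaemon)
        }
      }
      pool.submit(task).get()
    } finally {
      pool.shutdown()
    }
  }
}
```

Calling `DaemonThreadFactoryDemo.describeWorker("Spark Sink Processor Thread - %d")` should report a worker whose name ends in `1` and whose daemon flag is set, which is the behavior the sink relies on so that idle pool threads do not keep the JVM alive.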
@@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

-import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.Context
@@ -194,9 +193,8 @@ class SparkSinkSuite extends FunSuite {
count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {

(1 to count).map(_ => {
-lazy val channelFactoryExecutor =
-Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
-setNameFormat("Flume Receiver Channel Thread - %d").build())
+lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
+new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
lazy val channelFactory =
new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
val transceiver = new NettyTransceiver(address, channelFactory)