[SPARK-12961][Core] Prevent snappy-java memory leak #10875

Closed · wants to merge 1 commit
20 changes: 14 additions & 6 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -149,12 +149,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
  */
 @DeveloperApi
 class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
-
-  try {
-    Snappy.getNativeLibraryVersion
-  } catch {
-    case e: Error => throw new IllegalArgumentException(e)
-  }
+  val version = SnappyCompressionCodec.version

   override def compressedOutputStream(s: OutputStream): OutputStream = {
Contributor commented:

This was added by @ksakellis in #3119; the original motivation related to error reporting. I suppose there's a chance this patch changes that behavior slightly, but that might not be a huge deal. Let's see.
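
For context, the behavioral difference should be small: `val version = SnappyCompressionCodec.version` still forces the check during codec construction, and a Scala lazy val whose initializer throws is retried on the next access rather than caching the failure. A minimal, self-contained sketch of that language behavior (plain Scala semantics, not Spark code; the names here are made up):

object LazyFailureDemo {
  private var attempts = 0

  // A lazy val initializer that throws leaves the field uninitialized,
  // so the next access re-runs it instead of caching the failure.
  lazy val risky: String = {
    attempts += 1
    if (attempts < 3) throw new IllegalArgumentException(s"attempt $attempts failed")
    "ok"
  }

  def main(args: Array[String]): Unit = {
    for (_ <- 1 to 2) {
      try { risky } catch { case e: IllegalArgumentException => println(e.getMessage) }
    }
    println(risky)  // third attempt succeeds; the result is cached from here on
  }
}

So a codec constructed while native snappy is missing should still fail with an IllegalArgumentException, just as before; the main change is that a successful check is no longer repeated per instance.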

Contributor commented:

Also, I guess getNativeLibraryVersion might be kind of expensive due to the IO that it does, so maybe calling it less frequently will give us a perf. boost?

Member (author) commented:

Yeah, I think so.
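
To illustrate the caching the patch introduces, a sketch with hypothetical names (not the actual snappy-java internals): moving the side-effecting call behind a lazy val in a shared object means the underlying I/O happens at most once per JVM, no matter how many codec instances are created.

object VersionCacheDemo {
  private var lookups = 0

  // Stand-in for Snappy.getNativeLibraryVersion, which does I/O on each
  // call (per the review comment above).
  private def expensiveLookup(): String = { lookups += 1; "1.1.2" }

  // The patch's pattern: memoize the result so the lookup runs once per JVM.
  lazy val version: String = expensiveLookup()

  def main(args: Array[String]): Unit = {
    Seq.fill(1000)(version)                  // simulate many codec constructions
    println(s"lookup ran $lookups time(s)")  // prints: lookup ran 1 time(s)
  }
}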

val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
@@ -164,6 +159,19 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
   override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
 }

+/**
+ * This object guards against the memory-leak bug in the snappy-java library
+ * (https://github.com/xerial/snappy-java/issues/131). Until a fixed version
+ * of the library is available, we call the method only once and cache the result.
+ */
+private final object SnappyCompressionCodec {
+  private lazy val version: String = try {
+    Snappy.getNativeLibraryVersion
+  } catch {
+    case e: Error => throw new IllegalArgumentException(e)
+  }
+}

/**
* Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
* issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
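
Taken together, a sketch of how the patched codec behaves from a caller's perspective (hypothetical driver code, assuming only the public constructor shown in the diff and a working bundled snappy native library):

import org.apache.spark.SparkConf
import org.apache.spark.io.SnappyCompressionCodec

object CodecConstructionDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Each constructor call forces SnappyCompressionCodec.version, but the
    // lazy val means the native-library check runs at most once per JVM.
    val codecs = Seq.fill(100)(new SnappyCompressionCodec(conf))
    println(s"created ${codecs.size} codec instances")
  }
}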