SPARK-1656: Fix potential resource leaks #577

Closed · wants to merge 6 commits

25 changes: 15 additions & 10 deletions core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -162,18 +162,23 @@ private[broadcast] object HttpBroadcast extends Logging {

   private def write(id: Long, value: Any) {
     val file = getFile(id)
-    val out: OutputStream = {
-      if (compress) {
-        compressionCodec.compressedOutputStream(new FileOutputStream(file))
-      } else {
-        new BufferedOutputStream(new FileOutputStream(file), bufferSize)
+    val fileOutputStream = new FileOutputStream(file)
+    try {
+      val out: OutputStream = {
+        if (compress) {
+          compressionCodec.compressedOutputStream(fileOutputStream)
+        } else {
+          new BufferedOutputStream(fileOutputStream, bufferSize)
+        }
       }
+      val ser = SparkEnv.get.serializer.newInstance()
+      val serOut = ser.serializeStream(out)
+      serOut.writeObject(value)
+      serOut.close()
+      files += file
+    } finally {
+      fileOutputStream.close()
     }
-    val ser = SparkEnv.get.serializer.newInstance()
-    val serOut = ser.serializeStream(out)
-    serOut.writeObject(value)
-    serOut.close()
-    files += file
   }

   private def read[T: ClassTag](id: Long): T = {
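
These fixes hand-roll the same close-in-finally pattern at each call site. Spark targeted Scala 2.10 at the time, which has no scala.util.Using (that arrived in Scala 2.13), so a small loan-pattern helper is the usual way to factor it out. A minimal sketch, with a hypothetical name withCloseable that is not part of this PR:

import java.io.Closeable

// Run f on the resource and close it no matter how f exits. Note that if
// both f and close() throw, the close() exception wins; the same caveat
// comes up in the DiskStore discussion below.
def withCloseable[C <: Closeable, T](resource: C)(f: C => T): T = {
  try {
    f(resource)
  } finally {
    resource.close()
  }
}

With such a helper, the FileOutputStream write in FileSystemPersistenceEngine below could be written as withCloseable(new FileOutputStream(file)) { out => out.write(serialized) }.
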
@@ -83,15 +83,21 @@ private[spark] class FileSystemPersistenceEngine(
     val serialized = serializer.toBinary(value)

     val out = new FileOutputStream(file)
-    out.write(serialized)
-    out.close()
+    try {
+      out.write(serialized)
+    } finally {
+      out.close()
+    }
   }

   def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
     val dis = new DataInputStream(new FileInputStream(file))
-    dis.readFully(fileData)
-    dis.close()
+    try {
+      dis.readFully(fileData)
+    } finally {
+      dis.close()
+    }

     val clazz = m.runtimeClass.asInstanceOf[Class[T]]
     val serializer = serialization.serializerFor(clazz)

17 changes: 16 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -73,7 +73,22 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
-    blockManager.dataSerializeStream(blockId, outputStream, values)
+    try {
+      try {
+        blockManager.dataSerializeStream(blockId, outputStream, values)
+      } finally {
+        // Close outputStream here because it should be closed before the file is deleted.
+        outputStream.close()
+      }
+    } catch {
+      case e: Throwable => {
+        if (file.exists()) {
+          file.delete()
+        }
+        throw e
+      }
+    }

Contributor:

If you actually want to handle this case, you should delete the file in case the write failed.

Contributor:

I think what you had before was actually clearer; nested trys are a little more complicated. If you were worried about the one duplicated line, maybe you can just add a comment explaining that "outputStream should be closed before the file is deleted" or something.

Member Author:

I noticed that outputStream.close may also throw an IOException, so I changed to this approach to make sure that if any exception is thrown, we always try to delete the file.

Contributor:

Hm, I'm still not so sure about having the nested trys. Doesn't this argument also apply to the other changes in this PR? I think it might be OK to just keep the close in both the try and the catch and have only one level of try-catch. It's already strictly better than before (maybe others disagree).

Member Author:

The previous code was:

1   try {
2     blockManager.dataSerializeStream(blockId, outputStream, values)
3     outputStream.close()
4   } catch {
5     case e: Throwable => {
6       outputStream.close()
7       if (file.exists()) {
8         file.delete()
9       }
10      throw e
11    }
12  }

My concern is that if outputStream.close() on line 6 throws an exception, if (file.exists()) { file.delete() } won't be executed. So I changed to the nested trys.
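
To make the concern concrete, here is a minimal, runnable sketch (illustrative only, not code from the PR): when the close() inside the catch throws, the delete after it is skipped and the close exception replaces the write exception.

import java.io.{Closeable, IOException}

object CloseThrowsDemo extends App {
  // A stand-in stream whose close() always fails.
  val out: Closeable = new Closeable {
    override def close(): Unit = throw new IOException("close failed")
  }
  try {
    try {
      throw new RuntimeException("write failed")
    } catch {
      case e: Throwable =>
        out.close()                  // throws IOException here...
        println("deleting file")     // ...so this cleanup never runs
        throw e
    }
  } catch {
    case e: Throwable =>
      println(e)                     // prints the IOException, not "write failed"
  }
}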

Member Author:

@andrewor14 do you agree with my concern?

Contributor:

Ok, that seems fine.

Contributor:

Why not do the full try-catch-finally here, and put outputStream.close() in the finally?

Member Author:

I want to close the stream before deleting the file.
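
For reference, the suggested shape would look roughly like this (a sketch of the reviewer's suggestion, not code from the PR). Because the finally block runs only after the catch completes, the delete would happen while the stream is still open:

try {
  blockManager.dataSerializeStream(blockId, outputStream, values)
} catch {
  case e: Throwable =>
    if (file.exists()) {
      file.delete()    // runs while outputStream is still open
    }
    throw e
} finally {
  outputStream.close() // runs after the catch has already attempted the delete
}

which is exactly the ordering the author wants to avoid.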

Contributor:

On Unix, at least, that's not necessary: it's possible to delete a file that has open fds, and the inodes will be released when you close the fd.

I don't know whether Windows follows the same semantics, though.
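
A minimal sketch of the Unix behavior described above (assumes a POSIX filesystem; on Windows the delete would typically fail while the stream is open):

import java.io.{File, FileOutputStream}

object UnlinkOpenFileDemo extends App {
  val file = File.createTempFile("unlink-demo", ".tmp")
  val out = new FileOutputStream(file)
  println(file.delete())  // true on Unix: the directory entry is unlinked
  out.write(42)           // the open fd still works
  out.close()             // the inode is reclaimed once the last fd closes
}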

     val length = file.length

     val timeTaken = System.currentTimeMillis - startTime