
Commit c068d90

zsxwing authored and andrewor14 committed
SPARK-1656: Fix potential resource leaks
JIRA: https://issues.apache.org/jira/browse/SPARK-1656

Author: zsxwing <[email protected]>

Closes #577 from zsxwing/SPARK-1656 and squashes the following commits:

c431095 [zsxwing] Add a comment and fix the code style
2de96e5 [zsxwing] Make sure file will be deleted if exception happens
28b90dc [zsxwing] Update to follow the code style
4521d6e [zsxwing] Merge branch 'master' into SPARK-1656
afc3383 [zsxwing] Update to follow the code style
071fdd1 [zsxwing] SPARK-1656: Fix potential resource leaks

(cherry picked from commit a7c7313)
Signed-off-by: Andrew Or <[email protected]>
1 parent d9cf4d0 · commit c068d90

3 files changed: +40 −15 lines

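All three fixes apply the same discipline: keep a reference to the raw stream, do the work inside `try`, and close the stream in `finally` so the file handle is released even when an intermediate step throws. As a minimal sketch (not part of this patch), that discipline can be factored into a loan-pattern helper; the name `using` is illustrative, not a Spark API:

```scala
import java.io.Closeable

// Illustrative loan-pattern helper (an assumption, not code from this patch):
// run `body` against the resource and guarantee close() even if `body` throws.
def using[C <: Closeable, A](resource: C)(body: C => A): A =
  try {
    body(resource)
  } finally {
    resource.close()
  }
```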

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 15 additions & 10 deletions
```diff
@@ -163,18 +163,23 @@ private[broadcast] object HttpBroadcast extends Logging {
 
   private def write(id: Long, value: Any) {
     val file = getFile(id)
-    val out: OutputStream = {
-      if (compress) {
-        compressionCodec.compressedOutputStream(new FileOutputStream(file))
-      } else {
-        new BufferedOutputStream(new FileOutputStream(file), bufferSize)
+    val fileOutputStream = new FileOutputStream(file)
+    try {
+      val out: OutputStream = {
+        if (compress) {
+          compressionCodec.compressedOutputStream(fileOutputStream)
+        } else {
+          new BufferedOutputStream(fileOutputStream, bufferSize)
+        }
       }
+      val ser = SparkEnv.get.serializer.newInstance()
+      val serOut = ser.serializeStream(out)
+      serOut.writeObject(value)
+      serOut.close()
+      files += file
+    } finally {
+      fileOutputStream.close()
     }
-    val ser = SparkEnv.get.serializer.newInstance()
-    val serOut = ser.serializeStream(out)
-    serOut.writeObject(value)
-    serOut.close()
-    files += file
   }
 
   private def read[T: ClassTag](id: Long): T = {
```
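Before this fix, each branch constructed `new FileOutputStream(file)` inline; if `compressionCodec.compressedOutputStream` then threw, no reference to the raw stream survived and the file handle leaked. A minimal, self-contained sketch of the fixed shape, with `GZIPOutputStream` standing in for Spark's compression codec (an assumption for illustration; this is not the patched Spark code):

```scala
import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
import java.util.zip.GZIPOutputStream

// Sketch of the fixed shape; GZIPOutputStream stands in for the codec.
def write(file: File, bytes: Array[Byte], compress: Boolean): Unit = {
  val fileOutputStream = new FileOutputStream(file)
  try {
    // If wrapping the stream throws, the finally below still closes the raw handle.
    val out: OutputStream =
      if (compress) new GZIPOutputStream(fileOutputStream)
      else new BufferedOutputStream(fileOutputStream)
    out.write(bytes)
    out.close() // flushes the wrapper; the close() in finally is then a no-op
  } finally {
    fileOutputStream.close()
  }
}
```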

core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala

Lines changed: 10 additions & 4 deletions
```diff
@@ -83,15 +83,21 @@ private[spark] class FileSystemPersistenceEngine(
     val serialized = serializer.toBinary(value)
 
     val out = new FileOutputStream(file)
-    out.write(serialized)
-    out.close()
+    try {
+      out.write(serialized)
+    } finally {
+      out.close()
+    }
   }
 
   def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
     val dis = new DataInputStream(new FileInputStream(file))
-    dis.readFully(fileData)
-    dis.close()
+    try {
+      dis.readFully(fileData)
+    } finally {
+      dis.close()
+    }
 
     val clazz = m.runtimeClass.asInstanceOf[Class[T]]
     val serializer = serialization.serializerFor(clazz)
```
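Both methods now use the plain try/finally shape. With the illustrative `using` helper sketched above, the two fixes collapse to the following (again a sketch under that assumption, not the patch itself):

```scala
import java.io.{DataInputStream, File, FileInputStream, FileOutputStream}

// Sketches built on the illustrative `using` helper defined earlier.
def serializeIntoFile(file: File, serialized: Array[Byte]): Unit =
  using(new FileOutputStream(file)) { out => out.write(serialized) }

def readFileFully(file: File): Array[Byte] = {
  val fileData = new Array[Byte](file.length().toInt)
  using(new DataInputStream(new FileInputStream(file))) { dis =>
    dis.readFully(fileData)
  }
  fileData
}
```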

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 15 additions & 1 deletion
```diff
@@ -73,7 +73,21 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
-    blockManager.dataSerializeStream(blockId, outputStream, values)
+    try {
+      try {
+        blockManager.dataSerializeStream(blockId, outputStream, values)
+      } finally {
+        // Close outputStream here because it should be closed before file is deleted.
+        outputStream.close()
+      }
+    } catch {
+      case e: Throwable =>
+        if (file.exists()) {
+          file.delete()
+        }
+        throw e
+    }
+
     val length = file.length
 
     val timeTaken = System.currentTimeMillis - startTime
```
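This fix goes one step further than a plain try/finally: on failure it also deletes the partially written block file, closing the stream first (a file with an open handle may not be deletable on some platforms) and rethrowing so the caller still observes the original error. A minimal sketch of that write-or-delete pattern, where `writeBlock` is a hypothetical stand-in for `blockManager.dataSerializeStream`:

```scala
import java.io.{File, FileOutputStream, OutputStream}

// Sketch of the write-or-delete pattern; `writeBlock` is hypothetical,
// standing in for blockManager.dataSerializeStream.
def writeOrDelete(file: File)(writeBlock: OutputStream => Unit): Unit = {
  val outputStream = new FileOutputStream(file)
  try {
    try {
      writeBlock(outputStream)
    } finally {
      // Close before deleting: some platforms refuse to delete open files.
      outputStream.close()
    }
  } catch {
    case e: Throwable =>
      if (file.exists()) {
        file.delete() // drop the partially written file
      }
      throw e // rethrow so the caller still sees the original failure
  }
}
```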
