
Commit 0eb17f8

[SC-5559] Support logical delete and gc of commit markers in DatabricksAtomicCommitProtocol
## What changes were proposed in this pull request?

This implements the newly added `deleteWithJob()` hook in `DatabricksAtomicCommitProtocol`. It can be flag-enabled independently of the commit protocol choice. I also implemented support for eventual GC of the commit markers, using the mechanism we discussed of marking them as deleted in another commit marker.

### Config flags:

`com.databricks.sql.enableLogicalDelete` -- whether to enable atomic overwrites (default true)

## How was this patch tested?

Existing unit tests.

Author: Eric Liang <[email protected]>
Author: Eric Liang <[email protected]>

Closes apache#167 from ericl/overwrite.
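As a usage sketch (not part of the commit), the flag described above can be toggled per session before triggering an overwrite; the app name and output path below are illustrative:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("logical-delete-demo").getOrCreate()

// Logical delete is enabled by default; disabling it falls back to immediate physical deletes.
spark.conf.set("com.databricks.sql.enableLogicalDelete", "true")

// Overwriting an existing directory is the operation that exercises deleteWithJob().
// "/tmp/demo_table" is an illustrative path.
spark.range(10).write.mode(SaveMode.Overwrite).parquet("/tmp/demo_table")
```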
1 parent e40daf8 · commit 0eb17f8

File tree: 6 files changed, +313 -37 lines changed


core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala

Lines changed: 10 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.internal.io
 
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.util.Utils
@@ -112,6 +113,15 @@ abstract class FileCommitProtocol {
    * just crashes (or killed) before it can call abort.
    */
   def abortTask(taskContext: TaskAttemptContext): Unit
+
+  /**
+   * Specifies that a file should be deleted with the commit of this job. The default
+   * implementation deletes the file immediately, but this may be overridden to delay the physical
+   * deletion of the file until commit time.
+   */
+  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
+    fs.delete(path, recursive)
+  }
 }
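For intuition about the new hook, here is a minimal, self-contained sketch (class and member names are illustrative, not the Databricks implementation) of how an overriding committer could stage deletions and perform the physical deletes only at job commit, which is the behavior this hook enables:

```scala
import java.io.IOException
import scala.collection.mutable
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative sketch only: a helper that mirrors the deleteWithJob() contract by
// recording requested deletions and deferring the physical fs.delete() to commit time.
class DeferredDeleter {
  private val staged = mutable.ListBuffer[(FileSystem, Path, Boolean)]()

  // Record the deletion instead of performing it now; return true to signal it was handled.
  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
    if (!recursive && fs.getFileStatus(path).isDirectory) {
      throw new IOException(s"Cannot delete directory $path unless recursive=true.")
    }
    staged += ((fs, path, recursive))
    true
  }

  // Physical deletion only happens once the "job" commits.
  def commitJob(): Unit = staged.foreach { case (fs, path, recursive) =>
    fs.delete(path, recursive)
  }
}
```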
sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala

Lines changed: 74 additions & 2 deletions
@@ -48,7 +48,7 @@ class DatabricksAtomicCommitProtocol(jobId: String, path: String)
   import DatabricksAtomicCommitProtocol._
 
   // Globally unique alphanumeric string. We decouple this from jobId for possible future use.
-  private val txnId: TxnId = math.abs(scala.util.Random.nextLong).toString
+  private val txnId: TxnId = newTxnId()
 
   // The list of files staged by this committer. These are collected to the driver on task commit.
   private val stagedFiles = mutable.Set[String]()
@@ -79,6 +79,42 @@
     finalPath.toString
   }
 
+  override def deleteWithJob(_fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
+    val fs = testingFs.getOrElse(_fs)
+    val sparkSession = SparkSession.getActiveSession.get
+    if (!sparkSession.sqlContext.getConf(
+        "com.databricks.sql.enableLogicalDelete", "true").toBoolean) {
+      return super.deleteWithJob(fs, path, recursive)
+    }
+    if (recursive && fs.getFileStatus(path).isFile) {
+      // In this case Spark is attempting to delete a file to make room for a directory.
+      // We cannot stage this sort of deletion, so just perform it immediately.
+      logWarning(s"Deleting $path immediately since it is a file, not a directory.")
+      return super.deleteWithJob(fs, path, true)
+    }
+    if (recursive) {
+      val (dirs, initialFiles) = fs.listStatus(path).partition(_.isDirectory)
+      val resolvedFiles = filterDirectoryListing(fs, path, initialFiles)
+      stagedDeletions ++= resolvedFiles.map(_.getPath).filter { path =>
+        path.getName match {
+          // Don't allow our metadata markers to be deleted with this API. That can result in
+          // unexpected results if e.g. a start marker is deleted in the middle of a job.
+          case STARTED_MARKER(_) | COMMITTED_MARKER(_) => false
+          case _ => true
+        }
+      }.toList
+      dirs.foreach { dir =>
+        deleteWithJob(fs, dir.getPath, true)
+      }
+    } else {
+      if (fs.getFileStatus(path).isDirectory) {
+        throw new IOException(s"Cannot delete directory $path unless recursive=true.")
+      }
+      stagedDeletions += path
+    }
+    true
+  }
+
   private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
@@ -172,7 +208,7 @@ object DatabricksAtomicCommitProtocol extends Logging {
    * @return the list of deleted files
    */
   def vacuum(path: Path, horizon: Long): List[Path] = {
-    val fs = path.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+    val fs = testingFs.getOrElse(path.getFileSystem(sparkSession.sparkContext.hadoopConfiguration))
     val (dirs, initialFiles) = fs.listStatus(path).partition(_.isDirectory)
 
     def checkPositive(time: Long): Long = { assert(time > 0); time }
@@ -203,6 +239,7 @@ object DatabricksAtomicCommitProtocol extends Logging {
             s"(${state.getStartTime(txnId)} < $horizon).")
           delete(file.getPath)
 
+        // always safe to delete since the commit marker is present
         case STARTED_MARKER(txnId) if state.isCommitted(txnId) &&
             checkPositive(file.getModificationTime) < horizon =>
           logInfo(s"Garbage collecting start marker ${file.getPath} of committed job.")
@@ -212,6 +249,39 @@ object DatabricksAtomicCommitProtocol extends Logging {
       }
     }
 
+    // Queue up stale markers for deletion. We do this by writing out a _committed file that
+    // will cause them to be garbage collected in the next cycle.
+    var deleteLater: List[Path] = Nil
+    for (file <- resolvedFiles) {
+      file.getPath.getName match {
+        case name @ COMMITTED_MARKER(txnId) if state.getDeletionTime(name) == 0 &&
+            checkPositive(file.getModificationTime) < horizon =>
+          val startMarker = new Path(file.getPath.getParent, s"_started_$txnId")
+          if (fs.exists(startMarker)) {
+            delete(startMarker)  // make sure we delete it just in case
+          }
+          deleteLater ::= file.getPath
+
+        // the data files were deleted above, but we need to delay marker deletion
+        case STARTED_MARKER(txnId) if !state.isCommitted(txnId) &&
+            checkPositive(file.getModificationTime) < horizon =>
+          deleteLater ::= file.getPath
+
+        case _ =>
+      }
+    }
+
+    if (deleteLater.nonEmpty) {
+      val vacuumCommitMarker = new Path(path, "_committed_vacuum" + newTxnId())
+      val output = fs.create(vacuumCommitMarker)
+      deleteLater ::= vacuumCommitMarker  // it's self-deleting!
+      try {
+        serializeFileChanges(Nil, deleteLater.map(_.getName), output)
+      } finally {
+        output.close()
+      }
+    }
+
     // recurse
     for (d <- dirs) {
       deletedPaths :::= vacuum(d.getPath, horizon)
@@ -223,4 +293,6 @@
 
     deletedPaths
   }
+
+  private def newTxnId(): String = math.abs(scala.util.Random.nextLong).toString
 }

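The vacuum logic above queues stale markers into a `_committed_vacuum<txnId>` marker that lists itself among the removed files, so the whole set is garbage collected on the next cycle. Below is a hedged, standalone sketch of that idea: the real code serializes deletions with the protocol's internal `serializeFileChanges()`, whereas this sketch writes a plain newline-separated list, and `VacuumMarkerSketch`/`queueForNextCycle` are illustrative names only.

```scala
import java.nio.charset.StandardCharsets
import org.apache.hadoop.fs.{FileSystem, Path}

object VacuumMarkerSketch {
  // Reproduced from the diff above for illustration.
  private def newTxnId(): String = math.abs(scala.util.Random.nextLong).toString

  // Write a _committed_vacuum<txnId> marker that records the stale markers as removed,
  // including the vacuum marker itself, so the next vacuum cycle can GC all of them.
  def queueForNextCycle(fs: FileSystem, dir: Path, staleMarkers: List[Path]): Path = {
    val vacuumMarker = new Path(dir, "_committed_vacuum" + newTxnId())
    val removed = vacuumMarker :: staleMarkers // the marker is self-deleting
    val out = fs.create(vacuumMarker)
    try {
      out.write(removed.map(_.getName).mkString("\n").getBytes(StandardCharsets.UTF_8))
    } finally {
      out.close()
    }
    vacuumMarker
  }
}
```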
sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala

Lines changed: 12 additions & 5 deletions
@@ -8,7 +8,7 @@
 
 package org.apache.spark.sql.transaction
 
-import java.io.{File, InputStream, InputStreamReader, IOException, OutputStream}
+import java.io.{File, FileNotFoundException, InputStream, InputStreamReader, IOException, OutputStream}
 import java.nio.charset.StandardCharsets
 
 import scala.collection.mutable
@@ -60,10 +60,10 @@ object DatabricksAtomicReadProtocol extends Logging {
       val name = f.getPath.getName
       name match {
         case _ if state.getDeletionTime(name) > 0 =>
-          logInfo(s"Ignoring ${f.getPath} since it is marked as deleted.")
+          logDebug(s"Ignoring ${f.getPath} since it is marked as deleted.")
           false
         case FILE_WITH_TXN_ID(txnId) if !state.isFileCommitted(txnId, name) =>
-          logInfo(s"Ignoring ${f.getPath} since it is not marked as committed.")
+          logDebug(s"Ignoring ${f.getPath} since it is not marked as committed.")
           false
         case _ =>
           true
@@ -155,7 +155,7 @@ object DatabricksAtomicReadProtocol extends Logging {
 
     if ((state.missingMarkers.nonEmpty || state.missingDataFiles.nonEmpty) &&
         state.lastModified > clock.getTimeMillis - horizonMillis) {
-      logDebug("Repeating list request since some files are suspected to be missing.")
+      logInfo("Repeating list request since some files are suspected to be missing.")
       val newlyCommitted = mutable.Set[TxnId]()
       val extraStatuses = fs.listStatus(dir).filter { f =>
         f.isFile && (f.getPath.getName match {
@@ -199,6 +199,9 @@ object DatabricksAtomicReadProtocol extends Logging {
         (state, initialFiles)
       }
     } else {
+      logDebug("List request was not repeated since " + state.missingMarkers.nonEmpty + " " +
+        state.missingDataFiles.nonEmpty + " " + state.lastModified + " " + clock.getTimeMillis +
+        " " + horizonMillis)
       (state, initialFiles)
     }
   }
@@ -232,12 +235,16 @@ object DatabricksAtomicReadProtocol extends Logging {
         }
         commitMarkers(txnId) = filesAdded.toSet
       } catch {
+        case e: FileNotFoundException =>
+          logWarning("Job commit marker disappeared before we could read it: " + stat)
+          corruptCommitMarkers.add(txnId)
+
         case NonFatal(e) =>
           // we use SparkEnv for this escape-hatch flag since this may be called on executors
           if (SparkEnv.get.conf.getBoolean(
               "spark.databricks.sql.ignoreCorruptCommitMarkers", false)) {
             logWarning("Failed to read job commit marker: " + stat, e)
-            corruptCommitMarkers += txnId
+            corruptCommitMarkers.add(txnId)
          } else {
            throw new IOException("Failed to read job commit marker: " + stat, e)
          }

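The `spark.databricks.sql.ignoreCorruptCommitMarkers` escape hatch above is read from the Spark conf via `SparkEnv`, so it applies on executors as well; a hedged sketch of setting it when building the session (app name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: with the flag left at its default of false, an unreadable commit marker
// raises an IOException; setting it to true downgrades that failure to a warning.
val spark = SparkSession.builder()
  .appName("read-protocol-demo")
  .config("spark.databricks.sql.ignoreCorruptCommitMarkers", "true")
  .getOrCreate()
```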
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 14 additions & 11 deletions
@@ -75,11 +75,20 @@ case class InsertIntoHadoopFsRelationCommand(
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
 
     val pathExists = fs.exists(qualifiedOutputPath)
+    // If we are appending data to an existing dir.
+    val isAppend = pathExists && (mode == SaveMode.Append)
+
+    val committer = FileCommitProtocol.instantiate(
+      sparkSession.sessionState.conf.fileCommitProtocolClass,
+      jobId = java.util.UUID.randomUUID().toString,
+      outputPath = outputPath.toString,
+      isAppend = isAppend)
+
     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
-        deleteMatchingPartitions(fs, qualifiedOutputPath)
+        deleteMatchingPartitions(fs, qualifiedOutputPath, committer)
         true
       case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
         true
@@ -88,15 +97,8 @@
       case (s, exists) =>
         throw new IllegalStateException(s"unsupported save mode $s ($exists)")
     }
-    // If we are appending data to an existing dir.
-    val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
-      val committer = FileCommitProtocol.instantiate(
-        sparkSession.sessionState.conf.fileCommitProtocolClass,
-        jobId = java.util.UUID.randomUUID().toString,
-        outputPath = outputPath.toString,
-        isAppend = isAppend)
 
       FileFormatWriter.write(
         sparkSession = sparkSession,
@@ -121,7 +123,8 @@
    * Deletes all partition files that match the specified static prefix. Partitions with custom
    * locations are also cleared based on the custom locations map given to this class.
    */
-  private def deleteMatchingPartitions(fs: FileSystem, qualifiedOutputPath: Path): Unit = {
+  private def deleteMatchingPartitions(
+      fs: FileSystem, qualifiedOutputPath: Path, committer: FileCommitProtocol): Unit = {
     val staticPartitionPrefix = if (staticPartitionKeys.nonEmpty) {
       "/" + partitionColumns.flatMap { p =>
         staticPartitionKeys.get(p.name) match {
@@ -136,7 +139,7 @@
     }
     // first clear the path determined by the static partition keys (e.g. /table/foo=1)
     val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-    if (fs.exists(staticPrefixPath) && !fs.delete(staticPrefixPath, true /* recursively */)) {
+    if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
       throw new IOException(s"Unable to clear output " +
         s"directory $staticPrefixPath prior to writing to it")
     }
@@ -146,7 +149,7 @@
       (staticPartitionKeys.toSet -- spec).isEmpty,
       "Custom partition location did not match static partitioning keys")
     val path = new Path(customLoc)
-    if (fs.exists(path) && !fs.delete(path, true)) {
+    if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
       throw new IOException(s"Unable to clear partition " +
         s"directory $path prior to writing to it")
     }

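To see the rewritten overwrite path end to end, a hedged usage sketch: overwriting an existing partitioned output directory is what routes through `deleteMatchingPartitions` and, with this patch, through `committer.deleteWithJob()` rather than `fs.delete()`. The column names and output path are illustrative:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("overwrite-demo").getOrCreate()
import spark.implicits._

// Illustrative data; "part" is a hypothetical partition column.
val df = Seq((1, "a"), (2, "b")).toDF("id", "part")

df.write
  .mode(SaveMode.Overwrite)       // clears existing data under the static prefix
  .partitionBy("part")            // via deleteMatchingPartitions -> deleteWithJob
  .parquet("/tmp/overwrite_demo") // illustrative output path
```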