
Commit 065f173

Udbhav30 authored and dongjoon-hyun committed
[SPARK-32481][CORE][SQL] Support truncate table to move data to trash
### What changes were proposed in this pull request?
Instead of deleting the data outright, we can move it to trash. Based on the configuration provided by the user, it is then deleted permanently from the trash after the retention interval.

### Why are the changes needed?
Instead of directly deleting the data, this gives users the flexibility to move it to trash first and have it deleted permanently later.

### Does this PR introduce _any_ user-facing change?
Yes. After `TRUNCATE TABLE`, the data is no longer deleted permanently right away; it is first moved to trash and only deleted permanently after the configured interval.

### How was this patch tested?
New UTs added.

Closes #29552 from Udbhav30/truncate.

Authored-by: Udbhav30 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Parent: cfe012a · commit 065f173
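In user-facing terms, the new behavior is opt-in and driven by two settings: the new Spark conf introduced below and Hadoop's `fs.trash.interval`. A minimal usage sketch, assuming a local Spark 3.1+ build (the app name, master, and table name are illustrative, not from the commit):

import org.apache.spark.sql.SparkSession

object TruncateToTrashExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("truncate-to-trash-example") // illustrative name
      .master("local[*]")
      .getOrCreate()

    // Opt in to the new behavior; the default remains false (hard delete).
    spark.conf.set("spark.sql.truncate.trash.enabled", "true")
    // Trash retention (minutes) is a Hadoop setting; a non-positive value
    // disables trash entirely, making the truncate a plain delete.
    spark.sparkContext.hadoopConfiguration.set("fs.trash.interval", "5")

    spark.sql("CREATE TABLE tab1 (col INT) USING parquet")
    spark.sql("INSERT INTO tab1 SELECT 1")
    // With the conf enabled, the table's files move under <home>/.Trash/Current
    // instead of being deleted outright.
    spark.sql("TRUNCATE TABLE tab1")

    spark.stop()
  }
}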

File tree (4 files changed: 116 additions, 2 deletions)

  • core/src/main/scala/org/apache/spark/util/Utils.scala
  • sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
  • sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
  • sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 24 additions & 1 deletion
@@ -50,7 +50,7 @@ import com.google.common.net.InetAddresses
 import org.apache.commons.codec.binary.Hex
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path, Trash}
 import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -269,6 +269,29 @@ private[spark] object Utils extends Logging {
     file.setExecutable(true, true)
   }

+  /**
+   * Move data to trash if 'spark.sql.truncate.trash.enabled' is true, else
+   * delete the data permanently. If move data to trash failed fallback to hard deletion.
+   */
+  def moveToTrashOrDelete(
+      fs: FileSystem,
+      partitionPath: Path,
+      isTrashEnabled: Boolean,
+      hadoopConf: Configuration): Boolean = {
+    if (isTrashEnabled) {
+      logDebug(s"Try to move data ${partitionPath.toString} to trash")
+      val isSuccess = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
+      if (!isSuccess) {
+        logWarning(s"Failed to move data ${partitionPath.toString} to trash. " +
+          "Fallback to hard deletion")
+        return fs.delete(partitionPath, true)
+      }
+      isSuccess
+    } else {
+      fs.delete(partitionPath, true)
+    }
+  }
+
   /**
    * Create a directory given the abstract pathname
    * @return true, if the directory is successfully created; otherwise, return false.
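The helper leans on Hadoop's `Trash.moveToAppropriateTrash`, which returns false when trash is disabled (e.g. `fs.trash.interval` is non-positive), triggering the hard-delete fallback above. A standalone sketch of the same pattern against a plain Hadoop `FileSystem` (the path and interval here are illustrative assumptions, not part of the commit):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, Trash}

object TrashFallbackSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("fs.trash.interval", "5")          // retention in minutes; "0" would disable trash
    val path = new Path("/tmp/some-table-dir")  // hypothetical directory to remove
    val fs = path.getFileSystem(conf)

    // Try the trash first; fall back to recursive hard deletion if the move
    // did not happen (mirrors the fallback in Utils.moveToTrashOrDelete above).
    val moved = Trash.moveToAppropriateTrash(fs, path, conf)
    if (!moved) fs.delete(path, true)
  }
}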

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 14 additions & 0 deletions
@@ -2732,6 +2732,18 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val TRUNCATE_TRASH_ENABLED =
+    buildConf("spark.sql.truncate.trash.enabled")
+      .doc("This configuration decides when truncating table, whether data files will be moved " +
+        "to trash directory or deleted permanently. The trash retention time is controlled by " +
+        "'fs.trash.interval', and in default, the server side configuration value takes " +
+        "precedence over the client-side one. Note that if 'fs.trash.interval' is non-positive, " +
+        "this will be a no-op and log a warning message. If the data fails to be moved to " +
+        "trash, Spark will turn to delete it permanently.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3350,6 +3362,8 @@ class SQLConf extends Serializable with Logging {

   def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)

+  def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */

   /** Set Spark SQL configuration properties. */
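Since this is a regular runtime SQL conf, it can also be toggled per session without a restart; a small sketch (assuming an active `SparkSession` named `spark`):

// Assumes an active SparkSession named `spark`.
spark.sql("SET spark.sql.truncate.trash.enabled=true")
// The same value is what the new typed accessor, SQLConf.get.truncateTrashEnabled,
// reports inside Spark's command execution.
assert(spark.conf.get("spark.sql.truncate.trash.enabled").toBoolean)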

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 3 additions & 1 deletion
@@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.util.Utils

 /**
  * A command to create a table with the same definition of the given existing table.
@@ -489,6 +490,7 @@ case class TruncateTableCommand(
     }
     val hadoopConf = spark.sessionState.newHadoopConf()
     val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
+    val isTrashEnabled = SQLConf.get.truncateTrashEnabled
     locations.foreach { location =>
       if (location.isDefined) {
         val path = new Path(location.get)
@@ -513,7 +515,7 @@
           }
         }

-        fs.delete(path, true)
+        Utils.moveToTrashOrDelete(fs, path, isTrashEnabled, hadoopConf)

         // We should keep original permission/acl of the path.
         // For owner/group, only super-user can set it, for example on HDFS. Because
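With this wiring in place, a truncated table's files should surface under the user's trash checkpoint rather than vanish. A hedged way to locate them, mirroring the path arithmetic the new tests below use (assumes an active `SparkSession` named `spark` and an existing table `tab1`):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.TableIdentifier

// Assumes an active SparkSession `spark` and a table named `tab1`.
val hadoopConf = spark.sessionState.newHadoopConf()
val tablePath = new Path(spark.sessionState.catalog
  .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
val fs = tablePath.getFileSystem(hadoopConf)
// Hadoop trash keeps not-yet-checkpointed files at <home>/.Trash/Current/<original path>.
val trashPath = Path.mergePaths(new Path(fs.getHomeDirectory, ".Trash/Current"), tablePath)
println(s"After TRUNCATE TABLE tab1, data would appear at: $trashPath")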

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 75 additions & 0 deletions
@@ -3101,6 +3101,81 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       assert(spark.sessionState.catalog.isRegisteredFunction(rand))
     }
   }
+
+  test("SPARK-32481 Move data to trash on truncate table if enabled") {
+    val trashIntervalKey = "fs.trash.interval"
+    withTable("tab1") {
+      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
+        sql("CREATE TABLE tab1 (col INT) USING parquet")
+        sql("INSERT INTO tab1 SELECT 1")
+        // scalastyle:off hadoopconfiguration
+        val hadoopConf = spark.sparkContext.hadoopConfiguration
+        // scalastyle:on hadoopconfiguration
+        val originalValue = hadoopConf.get(trashIntervalKey, "0")
+        val tablePath = new Path(spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
+
+        val fs = tablePath.getFileSystem(hadoopConf)
+        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
+        val trashPath = Path.mergePaths(trashCurrent, tablePath)
+        assert(!fs.exists(trashPath))
+        try {
+          hadoopConf.set(trashIntervalKey, "5")
+          sql("TRUNCATE TABLE tab1")
+        } finally {
+          hadoopConf.set(trashIntervalKey, originalValue)
+        }
+        assert(fs.exists(trashPath))
+        fs.delete(trashPath, true)
+      }
+    }
+  }
+
+  test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") {
+    val trashIntervalKey = "fs.trash.interval"
+    withTable("tab1") {
+      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
+        sql("CREATE TABLE tab1 (col INT) USING parquet")
+        sql("INSERT INTO tab1 SELECT 1")
+        // scalastyle:off hadoopconfiguration
+        val hadoopConf = spark.sparkContext.hadoopConfiguration
+        // scalastyle:on hadoopconfiguration
+        val originalValue = hadoopConf.get(trashIntervalKey, "0")
+        val tablePath = new Path(spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
+
+        val fs = tablePath.getFileSystem(hadoopConf)
+        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
+        val trashPath = Path.mergePaths(trashCurrent, tablePath)
+        assert(!fs.exists(trashPath))
+        try {
+          hadoopConf.set(trashIntervalKey, "0")
+          sql("TRUNCATE TABLE tab1")
+        } finally {
+          hadoopConf.set(trashIntervalKey, originalValue)
+        }
+        assert(!fs.exists(trashPath))
+      }
+    }
+  }
+
+  test("SPARK-32481 Do not move data to trash on truncate table if disabled") {
+    withTable("tab1") {
+      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") {
+        sql("CREATE TABLE tab1 (col INT) USING parquet")
+        sql("INSERT INTO tab1 SELECT 1")
+        val hadoopConf = spark.sessionState.newHadoopConf()
+        val tablePath = new Path(spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
+
+        val fs = tablePath.getFileSystem(hadoopConf)
+        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
+        val trashPath = Path.mergePaths(trashCurrent, tablePath)
+        sql("TRUNCATE TABLE tab1")
+        assert(!fs.exists(trashPath))
+      }
+    }
+  }
 }

 object FakeLocalFsFileSystem {
