104 changes: 63 additions & 41 deletions spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
@@ -24,12 +24,13 @@ import scala.concurrent.duration._

// scalastyle:off import.ordering.noEmptyLine
import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions, UsageRecord}
import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
import org.apache.spark.sql.delta.coordinatedcommits.CatalogOwnedTestBaseSuite
import org.apache.spark.sql.delta.deletionvectors.DeletionVectorsSuite
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LocalLogStore
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider
import org.apache.spark.sql.delta.util.FileNames
@@ -50,7 +51,8 @@ class CheckpointsSuite
with SharedSparkSession
with DeltaCheckpointTestUtils
with DeltaSQLCommandTest
with CoordinatedCommitsBaseSuite {
with DeltaSQLTestUtils
with CatalogOwnedTestBaseSuite {

def testDifferentV2Checkpoints(testName: String)(f: => Unit): Unit = {
for (checkpointFormat <- Seq(V2Checkpoint.Format.JSON.name, V2Checkpoint.Format.PARQUET.name)) {
@@ -65,6 +67,15 @@ class CheckpointsSuite
}
}

def testWithV1Checkpoints(testName: String)(f: => Unit): Unit = {
test(s"$testName [Checkpoint V1]") {
withSQLConf(
DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.Classic.name) {
f
}
}
}

/** Get V2 [[CheckpointProvider]] from the underlying deltalog snapshot */
def getV2CheckpointProvider(
deltaLog: DeltaLog,
@@ -91,11 +102,12 @@ class CheckpointsSuite
super.sparkConf.set("spark.delta.logStore.gs.impl", classOf[LocalLogStore].getName)
}

test("checkpoint metadata - checkpoint schema above the configured threshold are not" +
" written to LAST_CHECKPOINT") {
withTempDir { tempDir =>
spark.range(10).write.format("delta").save(tempDir.getAbsolutePath)
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
testWithV1Checkpoints(
"checkpoint metadata - checkpoint schema above the configured threshold are not" +
" written to LAST_CHECKPOINT") {
withTempTable(createTable = false) { tableName =>
spark.range(10).write.format("delta").saveAsTable(tableName)
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName))
deltaLog.checkpoint()
val lastCheckpointOpt = deltaLog.readLastCheckpointFile()
assert(lastCheckpointOpt.nonEmpty)
@@ -105,8 +117,8 @@ class CheckpointsSuite
assert(lastCheckpointOpt.get.checkpointSchema.get.fieldNames.toSeq ===
expectedCheckpointSchema)

spark.range(10).write.mode("append").format("delta").save(tempDir.getAbsolutePath)
withSQLConf(DeltaSQLConf.CHECKPOINT_SCHEMA_WRITE_THRESHOLD_LENGTH.key-> "10") {
spark.range(10).write.mode("append").format("delta").saveAsTable(tableName)
withSQLConf(DeltaSQLConf.CHECKPOINT_SCHEMA_WRITE_THRESHOLD_LENGTH.key -> "10") {
deltaLog.checkpoint()
val lastCheckpointOpt = deltaLog.readLastCheckpointFile()
assert(lastCheckpointOpt.nonEmpty)
@@ -262,19 +274,18 @@ class CheckpointsSuite
}
}

test("multipart checkpoints") {
withTempDir { tempDir =>
val path = tempDir.getCanonicalPath
testWithV1Checkpoints("multipart checkpoints") {
withTempTable(createTable = false) { tableName =>

withSQLConf(
DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> "10",
DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "1") {
// 1 file actions
spark.range(1).repartition(1).write.format("delta").save(path)
val deltaLog = DeltaLog.forTable(spark, path)
spark.range(1).repartition(1).write.format("delta").saveAsTable(tableName)
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName))

// 2 file actions, 1 new file
spark.range(1).repartition(1).write.format("delta").mode("append").save(path)
spark.range(1).repartition(1).write.format("delta").mode("append").saveAsTable(tableName)

verifyCheckpoint(deltaLog.readLastCheckpointFile(), 1, None)

@@ -283,15 +294,15 @@ class CheckpointsSuite
assert(new File(checkpointPath).exists())

// 11 total file actions, 9 new files
spark.range(30).repartition(9).write.format("delta").mode("append").save(path)
spark.range(30).repartition(9).write.format("delta").mode("append").saveAsTable(tableName)
verifyCheckpoint(deltaLog.readLastCheckpointFile(), 2, Some(2))

var checkpointPaths =
FileNames.checkpointFileWithParts(deltaLog.logPath, deltaLog.snapshot.version, 2)
checkpointPaths.foreach(p => assert(new File(p.toUri).exists()))

// 20 total actions, 9 new files
spark.range(100).repartition(9).write.format("delta").mode("append").save(path)
spark.range(100).repartition(9).write.format("delta").mode("append").saveAsTable(tableName)
verifyCheckpoint(deltaLog.readLastCheckpointFile(), 3, Some(2))

assert(deltaLog.snapshot.version == 3)
@@ -300,7 +311,7 @@ class CheckpointsSuite
checkpointPaths.foreach(p => assert(new File(p.toUri).exists()))

// 31 total actions, 11 new files
spark.range(100).repartition(11).write.format("delta").mode("append").save(path)
spark.range(100).repartition(11).write.format("delta").mode("append").saveAsTable(tableName)
verifyCheckpoint(deltaLog.readLastCheckpointFile(), 4, Some(4))

assert(deltaLog.snapshot.version == 4)
@@ -311,16 +322,17 @@ class CheckpointsSuite

// Increase max actions
withSQLConf(DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> "100") {
val deltaLog = DeltaLog.forTable(spark, path)
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName))
// 100 total actions, 69 new files
spark.range(1000).repartition(69).write.format("delta").mode("append").save(path)
spark.range(1000)
.repartition(69).write.format("delta").mode("append").saveAsTable(tableName)
verifyCheckpoint(deltaLog.readLastCheckpointFile(), 5, None)
val checkpointPath =
FileNames.checkpointFileSingular(deltaLog.logPath, deltaLog.snapshot.version).toUri
assert(new File(checkpointPath).exists())

// 101 total actions, 1 new file
spark.range(1).repartition(1).write.format("delta").mode("append").save(path)
spark.range(1).repartition(1).write.format("delta").mode("append").saveAsTable(tableName)
verifyCheckpoint(deltaLog.readLastCheckpointFile(), 6, Some(2))
var checkpointPaths =
FileNames.checkpointFileWithParts(deltaLog.logPath, deltaLog.snapshot.version, 2)
@@ -415,9 +427,11 @@ class CheckpointsSuite
assert(deltaFileContent.map(Action.fromJson).exists(_.isInstanceOf[AddCDCFile]))
assert(deltaLog.snapshot.stateDS.collect().forall { sa => sa.cdc == null })
deltaLog.checkpoint()
val checkpointFile = FileNames.checkpointFileSingular(deltaLog.logPath, 1)
val checkpointSchema = spark.read.format("parquet").load(checkpointFile.toString).schema
val expectedCheckpointSchema =
val checkpointPathStr = DeltaLog.forTableWithSnapshot(spark, tempDir.getAbsolutePath)._2
.checkpointProvider.topLevelFiles.head.getPath.toString
val checkpointFormat = checkpointPathStr.substring(checkpointPathStr.lastIndexOf('.') + 1)
val checkpointSchema = spark.read.format(checkpointFormat).load(checkpointPathStr).schema
var expectedCheckpointSchema =
Seq(
"txn",
"add",
@@ -503,7 +517,7 @@ class CheckpointsSuite
}
}

test("checkpoint does not contain remove.tags and remove.numRecords") {
testWithV1Checkpoints("checkpoint does not contain remove.tags and remove.numRecords") {
withTempDir { tempDir =>
val expectedRemoveFileSchema = Seq(
"path",
@@ -707,12 +721,10 @@ class CheckpointsSuite
DeltaSQLConf.LAST_CHECKPOINT_SIDECARS_THRESHOLD.key -> s"$sidecarActionThreshold"
) {
val addFiles = (1 to adds).map(_ =>
AddFile(
path = java.util.UUID.randomUUID.toString,
createTestAddFile(
encodedPath = java.util.UUID.randomUUID.toString,
partitionValues = Map(),
size = 128L,
modificationTime = 1L,
dataChange = true
size = 128L
))
deltaLog.startTransaction().commit(addFiles, DeltaOperations.ManualUpdate)
deltaLog.checkpoint()
@@ -726,16 +738,24 @@ class CheckpointsSuite
val lc1 = writeCheckpoint(adds = 1, nonFileActionThreshold = 10, sidecarActionThreshold = 10)
assert(lc1.v2Checkpoint.nonEmpty)
// 3 non file actions - protocol/metadata/checkpointMetadata, 1 sidecar
assert(lc1.v2Checkpoint.get.nonFileActions.get.size === 3)
assert(
lc1.v2Checkpoint.get.nonFileActions.get.size === 3
)
assert(lc1.v2Checkpoint.get.sidecarFiles.get.size === 1)

// Append 1 SetTxn, 8 more AddFiles [SetTxn-1, AddFile-10]
deltaLog.startTransaction()
.commit(Seq(SetTransaction("app-1", 2, None)), DeltaOperations.ManualUpdate)
val lc2 = writeCheckpoint(adds = 8, nonFileActionThreshold = 4, sidecarActionThreshold = 10)
val lc2 = writeCheckpoint(
adds = 8,
sidecarActionThreshold = 10,
nonFileActionThreshold = 4
)
assert(lc2.v2Checkpoint.nonEmpty)
// 4 non file actions - protocol/metadata/checkpointMetadata/setTxn, 1 sidecar
assert(lc2.v2Checkpoint.get.nonFileActions.get.size === 4)
assert(
lc2.v2Checkpoint.get.nonFileActions.get.size === 4
)
assert(lc2.v2Checkpoint.get.sidecarFiles.get.size === 1)

// Append 10 more AddFiles [SetTxn-1, AddFile-20]
@@ -763,7 +783,9 @@ class CheckpointsSuite
assert(lc5.v2Checkpoint.nonEmpty)
// 4 non file actions - protocol/metadata/checkpointMetadata/setTxn
// total 30 file actions, across 15 sidecar files (2 actions per file)
assert(lc5.v2Checkpoint.get.nonFileActions.get.size === 4)
assert(
lc5.v2Checkpoint.get.nonFileActions.get.size === 4
)
assert(lc5.v2Checkpoint.get.sidecarFiles.isEmpty)
}
}
@@ -1106,15 +1128,15 @@ class FakeGCSFileSystemValidatingCommits extends FakeGCSFileSystemValidatingChec
override protected def shouldValidateFilePattern(f: Path): Boolean = f.getName.contains(".json")
}

class CheckpointsWithCoordinatedCommitsBatch1Suite extends CheckpointsSuite {
override val coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1)
class CheckpointsWithCatalogOwnedBatch1Suite extends CheckpointsSuite {
override def catalogOwnedCoordinatorBackfillBatchSize: Option[Int] = Some(1)
}

class CheckpointsWithCoordinatedCommitsBatch2Suite extends CheckpointsSuite {
override val coordinatedCommitsBackfillBatchSize: Option[Int] = Some(2)
class CheckpointsWithCatalogOwnedBatch2Suite extends CheckpointsSuite {
override def catalogOwnedCoordinatorBackfillBatchSize: Option[Int] = Some(2)
}

class CheckpointsWithCoordinatedCommitsBatch100Suite extends CheckpointsSuite {
override val coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
class CheckpointsWithCatalogOwnedBatch100Suite extends CheckpointsSuite {
override def catalogOwnedCoordinatorBackfillBatchSize: Option[Int] = Some(100)
}
