Skip to content

Commit d0a000e

Browse files
xingchaozh authored and GitHub Enterprise committed
[CARMEL-6324] Support bucket skew detection (#1130)
1 parent 9f3b826 commit d0a000e

File tree

1 file changed

+18
-1
lines changed

1 file changed

+18
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.TableParallelInfo
3838
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
3939
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
4040
import org.apache.spark.sql.catalyst.util.truncatedString
41+
import org.apache.spark.sql.execution.adaptive.SkewHandlingUtil
4142
import org.apache.spark.sql.execution.datasources._
4243
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
4344
import org.apache.spark.sql.execution.index.IndexMetaLookup
@@ -421,6 +422,18 @@ case class FileSourceScanExec(
421422
tableParallelInfo.get.partitionNumber.isEmpty))
422423
}
423424

425+
/**
 * Whether this bucketed scan reads noticeably unbalanced buckets.
 *
 * The selected files are grouped by the bucket id derived from each file name, the byte
 * lengths within each group are summed, and the largest group size is compared with the
 * median group size via `SkewHandlingUtil.isSkewed`.
 *
 * Evaluates to `false` when the scan is not bucketed or when no files are selected.
 */
lazy val isBucketSkew: Boolean = {
  bucketedScan && {
    val files = selectedPartitions.flatMap(partition => partition.files)
    val bucketFilesGroupingSize = files.map(file => (file.getPath.getName, file.getLen))
      .groupBy(file => BucketingUtils.getBucketId(file._1))
      .map(bucket => bucket._2.map(_._2).sum).toArray

    // `max` throws on an empty collection, so a scan whose selected partitions contain no
    // files must short-circuit here; with nothing to read there is nothing to be skewed.
    bucketFilesGroupingSize.nonEmpty &&
      SkewHandlingUtil.isSkewed(bucketFilesGroupingSize.max,
        Utils.median(bucketFilesGroupingSize, false), conf)
  }
}
436+
424437
override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
425438
if (bucketedScan) {
426439
// For bucketed columns:
@@ -522,7 +535,11 @@ case class FileSourceScanExec(
522535
val withSelectedBucketsCount = relation.bucketSpec.map { spec =>
523536
val bucketedKey = "Bucketed"
524537
val withBucketedScanStatus = if (bucketedScan) {
525-
metadata + (bucketedKey -> "true")
538+
if (isBucketSkew) {
539+
metadata + (bucketedKey -> s"true skewed")
540+
} else {
541+
metadata + (bucketedKey -> s"true")
542+
}
526543
} else if (!relation.sparkSession.sessionState.conf.bucketingEnabled) {
527544
metadata + (bucketedKey -> "false (disabled by configuration)")
528545
} else if (disableBucketedScan) {

0 commit comments

Comments
 (0)