
Commit 1f4c8e4

Kimahriman authored and cloud-fan committed
[SPARK-40775][SQL] Fix duplicate description entries for V2 file scans
### What changes were proposed in this pull request?

Remove the `description` method overrides in the V2 file sources. `FileScan` already uses all the metadata to create the description, so adding the same fields to the overridden description creates duplicates.

### Why are the changes needed?

Example parquet scan from the agg pushdown suite.

Before:

```
+- BatchScan parquet file:/...[min(_3)#814, max(_3)#815, min(_1)#816, max(_1)#817, count(*)#818L, count(_1)#819L, count(_2)#820L, count(_3)#821L] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedAggregation: [MIN(_3), MAX(_3), MIN(_1), MAX(_1), COUNT(*), COUNT(_1), COUNT(_2), COUNT(_3)], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct<min(_3):int,max(_3):int,min(_1):int,max(_1):int,count(*):bigint,count(_1):bigint,count(_2)..., PushedFilters: [], PushedAggregation: [MIN(_3), MAX(_3), MIN(_1), MAX(_1), COUNT(*), COUNT(_1), COUNT(_2), COUNT(_3)], PushedGroupBy: [] RuntimeFilters: []
```

After:

```
+- BatchScan parquet file:/...[min(_3)#814, max(_3)#815, min(_1)#816, max(_1)#817, count(*)#818L, count(_1)#819L, count(_2)#820L, count(_3)#821L] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedAggregation: [MIN(_3), MAX(_3), MIN(_1), MAX(_1), COUNT(*), COUNT(_1), COUNT(_2), COUNT(_3)], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct<min(_3):int,max(_3):int,min(_1):int,max(_1):int,count(*):bigint,count(_1):bigint,count(_2)... RuntimeFilters: []
```

### Does this PR introduce _any_ user-facing change?

Just a description change in explain output.

### How was this patch tested?

Updated a few UTs to accommodate checking the explain string.

Closes #38229 from Kimahriman/remove-file-source-description.

Authored-by: Adam Binford <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
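For readers skimming the diff, the mechanism the fix relies on can be sketched as below. This is a simplified stand-in, not Spark's actual `FileScan` (which additionally sorts, redacts, and truncates metadata values); the trait and object names are hypothetical.

```scala
// A minimal sketch of the base-class pattern this commit relies on, assuming
// simplified rendering; not Spark's actual FileScan. The base trait renders
// every getMetaData() entry into the description, so a subclass only needs to
// extend getMetaData(); also overriding description() prints the same fields twice.
trait ScanDescription {
  // Subclasses append entries such as "PushedFilters" here.
  def getMetaData(): Map[String, String] = Map.empty

  def description(): String =
    getMetaData().toSeq.sorted
      .map { case (key, value) => s"$key: $value" }
      .mkString(", ")
}

object DescriptionDemo extends App {
  val scan = new ScanDescription {
    override def getMetaData(): Map[String, String] =
      super.getMetaData() ++ Map("PushedFilters" -> "[IsNotNull(value)]")
  }
  // Each field appears exactly once in the output.
  println(scan.description()) // PushedFilters: [IsNotNull(value)]
}
```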
1 parent 1b2d700 commit 1f4c8e4

File tree

7 files changed (+9, -37 lines)


connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala

Lines changed: 0 additions & 4 deletions

```diff
@@ -70,10 +70,6 @@ case class AvroScan(

   override def hashCode(): Int = super.hashCode()

-  override def description(): String = {
-    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
-  }
-
   override def getMetaData(): Map[String, String] = {
     super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters))
   }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala

Lines changed: 0 additions & 4 deletions

```diff
@@ -91,10 +91,6 @@ case class CSVScan(

   override def hashCode(): Int = super.hashCode()

-  override def description(): String = {
-    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
-  }
-
   override def getMetaData(): Map[String, String] = {
     super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters))
   }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala

Lines changed: 2 additions & 2 deletions

Unlike the other sources, JsonScan did not previously define `getMetaData()`, so its `description()` override is converted into a `getMetaData()` override rather than simply deleted:

```diff
@@ -91,7 +91,7 @@ case class JsonScan(

   override def hashCode(): Int = super.hashCode()

-  override def description(): String = {
-    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
+  override def getMetaData(): Map[String, String] = {
+    super.getMetaData() ++ Map("PushedFilters" -> pushedFilters.mkString("[", ", ", "]"))
   }
 }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala

Lines changed: 0 additions & 6 deletions

```diff
@@ -92,12 +92,6 @@ case class OrcScan(
     ("[]", "[]")
   }

-  override def description(): String = {
-    super.description() + ", PushedFilters: " + seqToString(pushedFilters) +
-      ", PushedAggregation: " + pushedAggregationsStr +
-      ", PushedGroupBy: " + pushedGroupByStr
-  }
-
   override def getMetaData(): Map[String, String] = {
     super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters)) ++
       Map("PushedAggregation" -> pushedAggregationsStr) ++
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala

Lines changed: 0 additions & 6 deletions

```diff
@@ -127,12 +127,6 @@ case class ParquetScan(
     ("[]", "[]")
   }

-  override def description(): String = {
-    super.description() + ", PushedFilters: " + seqToString(pushedFilters) +
-      ", PushedAggregation: " + pushedAggregationsStr +
-      ", PushedGroupBy: " + pushedGroupByStr
-  }
-
   override def getMetaData(): Map[String, String] = {
     super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters)) ++
       Map("PushedAggregation" -> pushedAggregationsStr) ++
```

sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala

Lines changed: 5 additions & 14 deletions

```diff
@@ -462,27 +462,18 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
     withTempDir { dir =>
       Seq("parquet", "orc", "csv", "json").foreach { fmt =>
         val basePath = dir.getCanonicalPath + "/" + fmt
-        val pushFilterMaps = Map (
-          "parquet" ->
-            "|PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
-          "orc" ->
-            "|PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
-          "csv" ->
-            "|PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
-          "json" ->
-            "|remove_marker"
-        )
-        val expected_plan_fragment1 =
+
+        val expectedPlanFragment =
           s"""
              |\\(1\\) BatchScan $fmt file:$basePath
              |Output \\[2\\]: \\[value#x, id#x\\]
              |DataFilters: \\[isnotnull\\(value#x\\), \\(value#x > 2\\)\\]
              |Format: $fmt
              |Location: InMemoryFileIndex\\([0-9]+ paths\\)\\[.*\\]
              |PartitionFilters: \\[isnotnull\\(id#x\\), \\(id#x > 1\\)\\]
-             ${pushFilterMaps.get(fmt).get}
+             |PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]
              |ReadSchema: struct\\<value:int\\>
-             |""".stripMargin.replaceAll("\nremove_marker", "").trim
+             |""".stripMargin.trim

         spark.range(10)
           .select(col("id"), col("id").as("value"))
@@ -500,7 +491,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
           .format(fmt)
           .load(basePath).where($"id" > 1 && $"value" > 2)
         val normalizedOutput = getNormalizedExplain(df, FormattedMode)
-        assert(expected_plan_fragment1.r.findAllMatchIn(normalizedOutput).length == 1)
+        assert(expectedPlanFragment.r.findAllMatchIn(normalizedOutput).length == 1)
       }
     }
   }
```
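The assertion style used here compiles the expected fragment into a regex and requires exactly one match, which is exactly what would catch a duplicated description entry. A tiny self-contained illustration of that pattern (hypothetical strings; only the standard-library `Regex` is used):

```scala
object RegexAssertDemo extends App {
  // Expect the PushedFilters entry to occur exactly once in the explain text;
  // a duplicated description entry would yield two matches and fail.
  val expected = """PushedFilters: \[IsNotNull\(value\)\]""".r
  val explainOutput =
    """(1) BatchScan parquet file:/tmp/data
      |PushedFilters: [IsNotNull(value)]
      |ReadSchema: struct<value:int>""".stripMargin
  assert(expected.findAllMatchIn(explainOutput).length == 1)
  println("exactly one PushedFilters entry")
}
```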

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -347,7 +347,8 @@ trait FileSourceAggregatePushDownSuite
     spark.read.format(format).load(file.getCanonicalPath).createOrReplaceTempView("test")
     Seq("false", "true").foreach { enableVectorizedReader =>
       withSQLConf(aggPushDownEnabledKey -> "true",
-        vectorizedReaderEnabledKey -> enableVectorizedReader) {
+        vectorizedReaderEnabledKey -> enableVectorizedReader,
+        SQLConf.MAX_METADATA_STRING_LENGTH.key -> "1000") {

         val testMinWithAllTypes = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
           "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
```
