
Commit 601fac2

francis0407 authored and cloud-fan committed
[SPARK-27411][SQL] DataSourceV2Strategy should not eliminate subquery
## What changes were proposed in this pull request?

In DataSourceV2Strategy, it seems we eliminate the subqueries by mistake after normalizing filters. Consider a SQL query with a scalar subquery:

```scala
val plan = spark.sql("select * from t2 where t2a > (select max(t1a) from t1)")
plan.explain(true)
```

DataSourceV2Strategy logs the following:

```
Pushing operators to csv:examples/src/main/resources/t2.txt
Pushed Filters:
Post-Scan Filters: isnotnull(t2a#30)
Output: t2a#30, t2b#31
```

The `Post-Scan Filters` should contain the scalar subquery, but we eliminate it by mistake:

```
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('t2a > scalar-subquery#56 [])
   :  +- 'Project [unresolvedalias('max('t1a), None)]
   :     +- 'UnresolvedRelation `t1`
   +- 'UnresolvedRelation `t2`

== Analyzed Logical Plan ==
t2a: string, t2b: string
Project [t2a#30, t2b#31]
+- Filter (t2a#30 > scalar-subquery#56 [])
   :  +- Aggregate [max(t1a#13) AS max(t1a)#63]
   :     +- SubqueryAlias `t1`
   :        +- RelationV2[t1a#13, t1b#14] csv:examples/src/main/resources/t1.txt
   +- SubqueryAlias `t2`
      +- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt

== Optimized Logical Plan ==
Filter (isnotnull(t2a#30) && (t2a#30 > scalar-subquery#56 []))
:  +- Aggregate [max(t1a#13) AS max(t1a)#63]
:     +- Project [t1a#13]
:        +- RelationV2[t1a#13, t1b#14] csv:examples/src/main/resources/t1.txt
+- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt

== Physical Plan ==
*(1) Project [t2a#30, t2b#31]
+- *(1) Filter isnotnull(t2a#30)
   +- *(1) BatchScan[t2a#30, t2b#31] class org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
```

Note that the physical plan has lost the scalar subquery filter entirely: only `isnotnull(t2a#30)` survives.

## How was this patch tested?

Unit test.

Closes #24321 from francis0407/SPARK-27411.

Authored-by: francis0407 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Parent: 3e4cfe9 · Commit: 601fac2

File tree: 2 files changed (+18 −2 lines)
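Before reading the diff, it helps to see the shape of the fix in isolation: set the subquery filters aside up front, normalize and push down only the subquery-free filters, then re-attach the subquery filters to the post-scan stage. The following is a minimal, self-contained sketch of that flow; `Pred`, `canTranslate`, and `planFilters` are simplified stand-ins invented for illustration, not Spark's actual `Expression`/`SubqueryExpression` machinery.

```scala
// Simplified model of the filter planning fixed by this patch.
// `Pred` and `canTranslate` are illustrative stand-ins, not Spark internals.
object Spark27411Sketch extends App {
  case class Pred(sql: String, hasSubquery: Boolean)

  // Pretend the data source can only accept simple null checks for pushdown.
  def canTranslate(p: Pred): Boolean = p.sql.startsWith("isnotnull")

  def planFilters(filters: Seq[Pred]): (Seq[Pred], Seq[Pred]) = {
    // Set subquery filters aside: they can never be pushed to a source.
    val (withSubquery, withoutSubquery) = filters.partition(_.hasSubquery)
    // Of the rest, push what the source supports; the remainder runs post-scan.
    val (pushed, postScan) = withoutSubquery.partition(canTranslate)
    // The fix: subquery filters must rejoin the post-scan filters.
    // Dropping `++ withSubquery` reproduces the SPARK-27411 bug.
    (pushed, postScan ++ withSubquery)
  }

  val filters = Seq(
    Pred("isnotnull(t2a)", hasSubquery = false),
    Pred("t2a > (select max(t1a) from t1)", hasSubquery = true))

  val (pushed, postScan) = planFilters(filters)
  assert(pushed == Seq(Pred("isnotnull(t2a)", hasSubquery = false)))
  assert(postScan.exists(_.hasSubquery)) // the subquery filter survives
  println(s"pushed: $pushed\npost-scan: $postScan")
}
```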


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 5 additions & 2 deletions

```diff
@@ -106,13 +106,16 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
       val scanBuilder = relation.newScanBuilder()
 
+      val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery)
       val normalizedFilters = DataSourceStrategy.normalizeFilters(
-        filters.filterNot(SubqueryExpression.hasSubquery), relation.output)
+        withoutSubquery, relation.output)
 
       // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
       // `postScanFilters` need to be evaluated after the scan.
       // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
-      val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, normalizedFilters)
+      val (pushedFilters, postScanFiltersWithoutSubquery) =
+        pushFilters(scanBuilder, normalizedFilters)
+      val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery
       val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters)
       logInfo(
         s"""
```
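The pivotal call in this hunk is Scala's `partition`, which splits a sequence into the elements that satisfy a predicate and all the others, in one pass and without dropping anything. A small REPL-style illustration, using plain strings as stand-ins for Catalyst filter expressions:

```scala
// partition returns (elements matching the predicate, the rest), preserving
// order within each group; plain strings stand in for filter expressions here.
val filters = Seq("t2a > (select max(t1a) from t1)", "isnotnull(t2a)")
val (withSubquery, withoutSubquery) = filters.partition(_.contains("select"))

assert(withSubquery == Seq("t2a > (select max(t1a) from t1)"))
assert(withoutSubquery == Seq("isnotnull(t2a)"))
// Every input filter lands in exactly one group, so appending withSubquery
// back onto the post-scan filters cannot lose a predicate.
```

Contrast this with the old code's `filterNot`, which computed only the subquery-free half and silently discarded the other.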

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

Lines changed: 13 additions & 0 deletions

```diff
@@ -392,6 +392,19 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-27411: DataSourceV2Strategy should not eliminate subquery") {
+    withTempView("t1") {
+      val t2 = spark.read.format(classOf[SimpleDataSourceV2].getName).load()
+      Seq(2, 3).toDF("a").createTempView("t1")
+      val df = t2.where("i < (select max(a) from t1)").select('i)
+      val subqueries = df.queryExecution.executedPlan.collect {
+        case p => p.subqueries
+      }.flatten
+      assert(subqueries.length == 1)
+      checkAnswer(df, (0 until 3).map(i => Row(i)))
+    }
+  }
 }
```
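A note on how the new test's assertion works: `collect` over the executed plan visits every physical operator, the catch-all case gathers each operator's `subqueries`, and flattening yields every subquery that survived planning. Below is a sketch of the same counting pattern against an ordinary local session; the view name and data are made up for illustration, and the pattern mirrors the test above rather than introducing any additional API.

```scala
// Counting subqueries that survive into the physical plan. Assumes a local
// SparkSession; the table and query are illustrative, mirroring the test.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
import spark.implicits._

Seq(0, 1, 2, 3).toDF("i").createOrReplaceTempView("t")
val df = spark.sql("select i from t where i < (select max(i) from t)")

// `collect` visits every physical operator; the catch-all case gathers the
// subquery plans each operator holds, exactly as the new test does.
val subqueries = df.queryExecution.executedPlan.collect {
  case plan => plan.subqueries
}.flatten

println(s"subqueries in the physical plan: ${subqueries.length}") // expect 1
spark.stop()
```

With the fix in place the count is 1, because the scalar subquery filter now rejoins the post-scan filters instead of being dropped during pushdown.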