Commit 5346dcf

francis0407 authored and mccheah committed
[SPARK-27411][SQL] DataSourceV2Strategy should not eliminate subquery
In DataSourceV2Strategy, it seems we eliminate the subqueries by mistake after normalizing filters. Consider a SQL query with a scalar subquery:

```scala
val plan = spark.sql("select * from t2 where t2a > (select max(t1a) from t1)")
plan.explain(true)
```

The log info of DataSourceV2Strategy is:

```
Pushing operators to csv:examples/src/main/resources/t2.txt
Pushed Filters:
Post-Scan Filters: isnotnull(t2a#30)
Output: t2a#30, t2b#31
```

The `Post-Scan Filters` should contain the scalar subquery, but we eliminate it by mistake:

```
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('t2a > scalar-subquery#56 [])
   :  +- 'Project [unresolvedalias('max('t1a), None)]
   :     +- 'UnresolvedRelation `t1`
   +- 'UnresolvedRelation `t2`

== Analyzed Logical Plan ==
t2a: string, t2b: string
Project [t2a#30, t2b#31]
+- Filter (t2a#30 > scalar-subquery#56 [])
   :  +- Aggregate [max(t1a#13) AS max(t1a)#63]
   :     +- SubqueryAlias `t1`
   :        +- RelationV2[t1a#13, t1b#14] csv:examples/src/main/resources/t1.txt
   +- SubqueryAlias `t2`
      +- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt

== Optimized Logical Plan ==
Filter (isnotnull(t2a#30) && (t2a#30 > scalar-subquery#56 []))
:  +- Aggregate [max(t1a#13) AS max(t1a)#63]
:     +- Project [t1a#13]
:        +- RelationV2[t1a#13, t1b#14] csv:examples/src/main/resources/t1.txt
+- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt

== Physical Plan ==
*(1) Project [t2a#30, t2b#31]
+- *(1) Filter isnotnull(t2a#30)
   +- *(1) BatchScan[t2a#30, t2b#31] class org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
```

Note how the physical plan's Filter keeps only `isnotnull(t2a#30)`; the scalar subquery predicate present in the optimized logical plan has been dropped.

Tested with a new unit test.

Closes apache#24321 from francis0407/SPARK-27411.

Authored-by: francis0407 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 2cd8078 commit 5346dcf
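The shape of the fix is easy to see in isolation: partition the filter list on whether a predicate contains a subquery, normalize and push down only the subquery-free side, and re-append the subquery side to the post-scan filters. A minimal, self-contained sketch of that control flow; the `Pred` type and `pushDown` here are illustrative stand-ins, not Spark APIs:

```scala
// Stand-in predicate type; in Spark these are Catalyst Expressions and the
// test is SubqueryExpression.hasSubquery(expr).
case class Pred(sql: String, hasSubquery: Boolean)

// Stand-in for the data source's pushdown: pretend the source accepts every
// predicate it is offered and leaves nothing for post-scan evaluation.
def pushDown(preds: Seq[Pred]): (Seq[Pred], Seq[Pred]) = (preds, Seq.empty)

val filters = Seq(
  Pred("isnotnull(t2a)", hasSubquery = false),
  Pred("t2a > (select max(t1a) from t1)", hasSubquery = true))

// The essence of the fix: subquery predicates bypass normalization and
// pushdown entirely, but are re-attached to the post-scan filters.
val (withSubquery, withoutSubquery) = filters.partition(_.hasSubquery)
val (pushedFilters, postScanWithoutSubquery) = pushDown(withoutSubquery)
val postScanFilters = postScanWithoutSubquery ++ withSubquery

assert(postScanFilters.exists(_.hasSubquery)) // the scalar subquery survives
```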

2 files changed: +25 -1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 12 additions & 1 deletion
```diff
@@ -108,12 +108,23 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
       val scanBuilder = relation.newScanBuilder()
 
+<<<<<<< HEAD
       val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, relation.output)
+||||||| parent of 601fac2cb3... [SPARK-27411][SQL] DataSourceV2Strategy should not eliminate subquery
+      val normalizedFilters = DataSourceStrategy.normalizeFilters(
+        filters.filterNot(SubqueryExpression.hasSubquery), relation.output)
+=======
+      val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery)
+      val normalizedFilters = DataSourceStrategy.normalizeFilters(
+        withoutSubquery, relation.output)
+>>>>>>> 601fac2cb3... [SPARK-27411][SQL] DataSourceV2Strategy should not eliminate subquery
 
       // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
       // `postScanFilters` need to be evaluated after the scan.
       // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
-      val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, normalizedFilters)
+      val (pushedFilters, postScanFiltersWithoutSubquery) =
+        pushFilters(scanBuilder, normalizedFilters)
+      val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery
       val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters)
       logInfo(
         s"""
```

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

Lines changed: 13 additions & 0 deletions
```diff
@@ -387,6 +387,19 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       checkAnswer(df, (0 until 3).map(i => Row(i)))
     }
   }
+
+  test("SPARK-27411: DataSourceV2Strategy should not eliminate subquery") {
+    withTempView("t1") {
+      val t2 = spark.read.format(classOf[SimpleDataSourceV2].getName).load()
+      Seq(2, 3).toDF("a").createTempView("t1")
+      val df = t2.where("i < (select max(a) from t1)").select('i)
+      val subqueries = df.queryExecution.executedPlan.collect {
+        case p => p.subqueries
+      }.flatten
+      assert(subqueries.length == 1)
+      checkAnswer(df, (0 until 3).map(i => Row(i)))
+    }
+  }
 }
 
```
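The test's assertion hinges on the fact that every physical operator exposes the physical plans of the subqueries embedded in its expressions: collecting `p.subqueries` across the executed plan and flattening yields exactly one entry only if the scalar subquery survived planning (before this fix it was dropped, so the collected sequence would be empty). `checkAnswer` then confirms the predicate is actually applied: with `t1` holding 2 and 3, `max(a)` is 3, so only rows with `i < 3`, namely Row(0), Row(1), Row(2), remain. The same probe can be run by hand; a hypothetical spark-shell session, assuming `df` is built as in the test:

```scala
// Count the physical subqueries that survived planning; for the query in the
// test this prints 1 with the fix in place, and 0 without it.
val survived = df.queryExecution.executedPlan
  .collect { case p => p.subqueries }
  .flatten
  .length
println(s"physical subqueries: $survived")
```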
