Skip to content

Commit 67f6a3f

Browse files
committed
[SPARK-55043][SQL] Fix time travel with subquery containing table references
### What changes were proposed in this pull request?

This PR fixes an issue where `TIMESTAMP AS OF (subquery)` fails when the subquery references a table. Before this fix, queries like:

```sql
SELECT * FROM t TIMESTAMP AS OF (SELECT MIN(ts) FROM t)
```

would fail with:

```
assertion failed: No plan for SubqueryAlias testcat.t
```

The fix changes `EvalSubqueriesForTimeTravel` to wrap the scalar subquery in a `Project` over `OneRowRelation` and execute it through the normal query execution path (`sessionState.executePlan`), which properly handles table references, including V2 tables.

### Why are the changes needed?

The `EvalSubqueriesForTimeTravel` analyzer rule was directly calling `QueryExecution.prepareExecutedPlan` on the subquery's inner plan, which failed to properly plan V2 table relations.

### Does this PR introduce _any_ user-facing change?

Yes. Users can now use subqueries with table references in `TIMESTAMP AS OF` expressions.

### How was this patch tested?

Added a new test case in `DataSourceV2SQLSuite` that verifies time travel with a subquery containing a table reference.

### Was this patch authored or co-authored using generative AI tooling?

Yes.

Closes #53811 from cloud-fan/udf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 70a3ab5 commit 67f6a3f

File tree

2 files changed

+23
-24
lines changed

2 files changed

+23
-24
lines changed

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/EvalSubqueriesForTimeTravel.scala

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@
1717

1818
package org.apache.spark.sql.catalyst.analysis
1919

20-
import org.apache.spark.sql.catalyst.expressions.{Literal, ScalarSubquery, SubqueryExpression}
21-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
20+
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, ScalarSubquery, SubqueryExpression}
21+
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
2222
import org.apache.spark.sql.catalyst.rules.Rule
2323
import org.apache.spark.sql.catalyst.trees.TreePattern.RELATION_TIME_TRAVEL
2424
import org.apache.spark.sql.classic.SparkSession
25-
import org.apache.spark.sql.execution.{QueryExecution, ScalarSubquery => ScalarSubqueryExec, SubqueryExec}
2625

2726
class EvalSubqueriesForTimeTravel extends Rule[LogicalPlan] {
2827
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
@@ -35,26 +34,15 @@ class EvalSubqueriesForTimeTravel extends Rule[LogicalPlan] {
3534
// outer references and should not be correlated.
3635
assert(!s.isCorrelated, "Correlated subquery should not appear in " +
3736
classOf[EvalSubqueriesForTimeTravel].getSimpleName)
38-
SimpleAnalyzer.checkSubqueryExpression(r, s)
39-
val executedPlan = QueryExecution.prepareExecutedPlan(SparkSession.active, s.plan)
40-
val physicalSubquery = ScalarSubqueryExec(
41-
SubqueryExec.createForScalarSubquery(
42-
s"scalar-subquery#${s.exprId.id}", executedPlan),
43-
s.exprId)
44-
evalSubqueries(physicalSubquery)
45-
Literal(physicalSubquery.eval(), s.dataType)
37+
// Wrap the scalar subquery in a Project over OneRowRelation to execute it
38+
// through the normal query execution path. This properly handles table
39+
// references in the subquery (e.g., V2 tables).
40+
val wrappedPlan = Project(Seq(Alias(s, "result")()), OneRowRelation())
41+
val spark = SparkSession.active
42+
val qe = spark.sessionState.executePlan(wrappedPlan)
43+
val result = qe.executedPlan.executeCollect().head.get(0, s.dataType)
44+
Literal(result, s.dataType)
4645
}
4746
r.copy(timestamp = Some(subqueryEvaluated))
4847
}
49-
50-
// Evaluate subqueries in a bottom-up way.
51-
private def evalSubqueries(subquery: ScalarSubqueryExec): Unit = {
52-
subquery.plan.foreachUp { plan =>
53-
plan.expressions.foreach(_.foreachUp {
54-
case s: ScalarSubqueryExec => evalSubqueries(s)
55-
case _ =>
56-
})
57-
}
58-
subquery.updateResult()
59-
}
6048
}

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3241,13 +3241,16 @@ class DataSourceV2SQLSuiteV1Filter
32413241
DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
32423242
val ts1InSeconds = MICROSECONDS.toSeconds(ts1).toString
32433243
val ts2InSeconds = MICROSECONDS.toSeconds(ts2).toString
3244+
3245+
val t = "testcat.t"
32443246
val t3 = s"testcat.t$ts1"
32453247
val t4 = s"testcat.t$ts2"
3246-
3247-
withTable(t3, t4) {
3248+
withTable(t, t3, t4) {
3249+
sql(s"CREATE TABLE $t (ts STRING) USING foo")
32483250
sql(s"CREATE TABLE $t3 (id int) USING foo")
32493251
sql(s"CREATE TABLE $t4 (id int) USING foo")
32503252

3253+
sql(s"INSERT INTO $t VALUES ('2019-01-29 00:37:58')")
32513254
sql(s"INSERT INTO $t3 VALUES (5)")
32523255
sql(s"INSERT INTO $t3 VALUES (6)")
32533256
sql(s"INSERT INTO $t4 VALUES (7)")
@@ -3282,6 +3285,9 @@ class DataSourceV2SQLSuiteV1Filter
32823285
val res10 = sql("SELECT * FROM t TIMESTAMP AS OF (SELECT (SELECT make_date(2021, 1, 29)))")
32833286
.collect()
32843287
assert(res10 === Array(Row(7), Row(8)))
3288+
// Subquery with table reference
3289+
val res11 = sql("SELECT * FROM t TIMESTAMP AS OF (SELECT MIN(ts) FROM t)").collect()
3290+
assert(res11 === Array(Row(5), Row(6)))
32853291

32863292
checkError(
32873293
exception = intercept[AnalysisException] {
@@ -3307,6 +3313,11 @@ class DataSourceV2SQLSuiteV1Filter
33073313
condition = "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT",
33083314
parameters = Map("expr" -> "\"abc\""))
33093315

3316+
checkError(
3317+
exception = analysisException(s"SELECT * FROM $t TIMESTAMP AS OF NULL"),
3318+
condition = "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT",
3319+
parameters = Map("expr" -> "\"NULL\""))
3320+
33103321
checkError(
33113322
exception = intercept[AnalysisException] {
33123323
spark.read.option("timestampAsOf", "abc").table("t").collect()

0 commit comments

Comments (0)