[SPARK-27393][SQL] Show ReusedSubquery in the plan when the subquery is reused #24258
Changes to the physical subquery machinery (`ExecSubqueryExpression`, `ScalarSubquery`, and the `ReuseSubquery` rule):

```diff
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, InSet, Literal, PlanExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, Expression, ExprId, InSet, Literal, PlanExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
```
```diff
@@ -31,11 +31,16 @@ import org.apache.spark.sql.types.{BooleanType, DataType, StructType}
 /**
  * The base class for subquery that is used in SparkPlan.
  */
-abstract class ExecSubqueryExpression extends PlanExpression[SubqueryExec] {
+abstract class ExecSubqueryExpression extends PlanExpression[BaseSubqueryExec] {
   /**
    * Fill the expression with collected result from executed plan.
    */
   def updateResult(): Unit
+
+  override def canonicalize(attrs: AttributeSeq): ExecSubqueryExpression = {
+    withNewPlan(plan.canonicalized.asInstanceOf[BaseSubqueryExec])
+      .asInstanceOf[ExecSubqueryExpression]
+  }
 }

 object ExecSubqueryExpression {
```
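The new `canonicalize` override matters because `ReuseSubquery` (below) detects duplicates via `sameResult`, which compares canonicalized plans. A minimal, self-contained sketch of the idea, using a hypothetical `Plan` type rather than Spark's real plan classes:

```scala
// Hypothetical stand-in for a physical plan node (not Spark's API):
// canonicalization erases non-semantic details such as expression IDs,
// so two plans that compute the same result compare equal.
case class Plan(op: String, exprId: Long) {
  def canonicalized: Plan = copy(exprId = 0L) // drop cosmetic state
  def sameResult(other: Plan): Boolean = canonicalized == other.canonicalized
}

object CanonicalizeDemo extends App {
  val a = Plan("avg(key)", exprId = 7)
  val b = Plan("avg(key)", exprId = 42)
  println(a == b)          // false: raw plans differ in exprId
  println(a.sameResult(b)) // true: canonical forms match
}
```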
```diff
@@ -56,15 +61,15 @@ object ExecSubqueryExpression {
  * This is the physical copy of ScalarSubquery to be used inside SparkPlan.
  */
 case class ScalarSubquery(
-    plan: SubqueryExec,
+    plan: BaseSubqueryExec,
     exprId: ExprId)
   extends ExecSubqueryExpression {

   override def dataType: DataType = plan.schema.fields.head.dataType
   override def children: Seq[Expression] = Nil
   override def nullable: Boolean = true
   override def toString: String = plan.simpleString(SQLConf.get.maxToStringFields)
-  override def withNewPlan(query: SubqueryExec): ScalarSubquery = copy(plan = query)
+  override def withNewPlan(query: BaseSubqueryExec): ScalarSubquery = copy(plan = query)

   override def semanticEquals(other: Expression): Boolean = other match {
     case s: ScalarSubquery => plan.sameResult(s.plan)
```
```diff
@@ -129,13 +134,14 @@ case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
       return plan
     }
     // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.
-    val subqueries = mutable.HashMap[StructType, ArrayBuffer[SubqueryExec]]()
+    val subqueries = mutable.HashMap[StructType, ArrayBuffer[BaseSubqueryExec]]()
     plan transformAllExpressions {
       case sub: ExecSubqueryExpression =>
-        val sameSchema = subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[SubqueryExec]())
+        val sameSchema =
+          subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[BaseSubqueryExec]())
         val sameResult = sameSchema.find(_.sameResult(sub.plan))
         if (sameResult.isDefined) {
-          sub.withNewPlan(sameResult.get)
+          sub.withNewPlan(ReusedSubqueryExec(sameResult.get))
         } else {
           sameSchema += sub.plan
           sub
```

> **Comment** (on the `sameSchema` change): unnecessary change?
>
> **Reply:** change it to `BaseSubqueryExec`
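The comment in the rule explains the data structure choice: bucketing subqueries by schema means the expensive `sameResult` check only runs against candidates that could possibly match, instead of comparing every pair. A rough, self-contained sketch of that pattern, with hypothetical `Schema` and `Subquery` stand-ins for the real plan types:

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for StructType and BaseSubqueryExec:
case class Schema(fields: Seq[String])
case class Subquery(schema: Schema, body: String) {
  def sameResult(other: Subquery): Boolean = body == other.body
}

object DedupDemo extends App {
  val subqueries = mutable.HashMap[Schema, ArrayBuffer[Subquery]]()

  // Return the first registered subquery with the same result, or register
  // this one. Only same-schema candidates are compared, avoiding O(N*N).
  def findOrRegister(sub: Subquery): Subquery = {
    val sameSchema = subqueries.getOrElseUpdate(sub.schema, ArrayBuffer[Subquery]())
    sameSchema.find(_.sameResult(sub)) match {
      case Some(existing) => existing
      case None => sameSchema += sub; sub
    }
  }

  val s1 = Subquery(Schema(Seq("avg(key)")), "SELECT avg(key) FROM testData")
  val s2 = Subquery(Schema(Seq("avg(key)")), "SELECT avg(key) FROM testData")
  println(findOrRegister(s1) eq s1) // true: first occurrence is registered
  println(findOrRegister(s2) eq s1) // true: duplicate resolves to the first plan
}
```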
Changes to `SQLQuerySuite`:

```diff
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
-import org.apache.spark.sql.execution.{aggregate, ScalarSubquery, SubqueryExec}
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
```
```diff
@@ -113,33 +112,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }

-  test("Reuse Subquery") {
-    Seq(true, false).foreach { reuse =>
-      withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
-        val df = sql(
-          """
-            |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData)
-            |FROM testData
-            |LIMIT 1
-          """.stripMargin)
-
-        import scala.collection.mutable.ArrayBuffer
-        val subqueries = ArrayBuffer[SubqueryExec]()
-        df.queryExecution.executedPlan.transformAllExpressions {
-          case s @ ScalarSubquery(plan: SubqueryExec, _) =>
-            subqueries += plan
-            s
-        }
-
-        if (reuse) {
-          assert(subqueries.distinct.size == 1, "Subquery reusing not working correctly")
-        } else {
-          assert(subqueries.distinct.size == 2, "There should be 2 subqueries when not reusing")
-        }
-      }
-    }
-  }
-
   test("SPARK-6743: no columns from cache") {
     Seq(
       (83, 0, 38),
```

> **Comment** (on the removed test): moved to SubquerySuite.scala
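For context, a hedged sketch of how the relocated test might assert reuse after this PR: instead of counting distinct `SubqueryExec` plans, it can look for the `ReusedSubqueryExec` wrapper that `ReuseSubquery` now inserts. This assumes the same `sql`/`withSQLConf` helpers and `testData` fixture as the removed test; the actual test in SubquerySuite.scala may differ.

```scala
import scala.collection.mutable.ArrayBuffer

test("Reuse Subquery marks the duplicate with ReusedSubqueryExec") {
  withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> "true") {
    val df = sql(
      """
        |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData)
        |FROM testData
        |LIMIT 1
      """.stripMargin)

    // After ReuseSubquery runs, the second occurrence of the subquery should
    // show up wrapped in ReusedSubqueryExec rather than as a fresh SubqueryExec.
    val reused = ArrayBuffer[ReusedSubqueryExec]()
    df.queryExecution.executedPlan.transformAllExpressions {
      case s @ ScalarSubquery(plan: ReusedSubqueryExec, _) =>
        reused += plan
        s
    }
    assert(reused.nonEmpty, "expected a ReusedSubqueryExec for the duplicate subquery")
  }
}
```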
```diff
@@ -288,7 +260,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     val df = sql(sqlText)
     // First, check if we have GeneratedAggregate.
     val hasGeneratedAgg = df.queryExecution.sparkPlan
-      .collect { case _: aggregate.HashAggregateExec => true }
+      .collect { case _: HashAggregateExec => true }
       .nonEmpty
     if (!hasGeneratedAgg) {
       fail(
```
> **Comment:** We need `name` in this base class for subquery exec?
>
> **Reply:** both `ReusedSubqueryExec` and `SubqueryExec` have the `name`