-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-32216][SQL] Remove redundant ProjectExec #29031
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -2935,6 +2944,8 @@ class SQLConf extends Serializable with Logging { | |||
|
|||
def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED) | |||
|
|||
def removeRedundantProjectsEnabled: Boolean = getConf(REMOVE_REDUNDANT_PROJECTS_ENABLED) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Since this conf is used only once, we can remove this variable.
sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
Show resolved
Hide resolved
.internal() | ||
.doc("Whether to remove redundant project exec node based on children's output and " + | ||
"ordering requirement.") | ||
.version("3.0.0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> 3.1.0
ok to test |
cc @cloud-fan |
Test build #125436 has finished for PR 29031 at commit
|
retest this please |
val keepOrdering = a.aggregateExpressions | ||
.exists(ae => ae.mode.equals(Final) || ae.mode.equals(PartialMerge)) | ||
a.mapChildren(removeProject(_, keepOrdering)) | ||
case w: WindowExec => w.mapChildren(removeProject(_, false)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WindowExec.output
is implemented as child.output ++ windowExpression.map(_.toAttribute)
. I think we require the ordering for window children.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of setting require ordering to be true, I am wondering should WindowExec
inherit this ordering requirement from its parent? For example in this case
Project[a, avg, key]
WindowExec[avg] [key] [a]
WindowExec
actually does't require column to be ordered. Is there any scenario where WindowExec
must require child output column to be ordered? I am having trouble coming up with a test case for it.
spark.range(100).selectExpr("id % 10 as key", "id * 2 as a", | ||
"id * 3 as b", "cast(id as string) as c", "array(id, id + 1, id + 3) as d") | ||
.write.partitionBy("key").parquet(path) | ||
spark.read.parquet(path).createOrReplaceTempView("testView") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we put the view creation in beforeAll
? Then we only need to do it once for the entire test suite.
sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
Show resolved
Hide resolved
Test build #125453 has finished for PR 29031 at commit
|
case d: DataSourceV2ScanExecBase if !d.supportsColumnar => false | ||
case _ => | ||
if (requireOrdering) { | ||
project.output.map(_.exprId.id) == child.output.map(_.exprId.id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan I am wondering if the qualifier
in Attribute should be considered here as well (besides exprId
). Would an attribute qualifier in a ProjectExec
be different from its child?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. AttributeReferece.sameRef
doesn't consider qualifier as well.
Test build #125546 has finished for PR 29031 at commit
|
} | ||
} | ||
|
||
private def assertProjectExec(query: String, enabled: Integer, disabled: Integer): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Integer
-> Int
?
|
||
class RemoveRedundantProjectsSuite extends QueryTest with SharedSparkSession with SQLTestUtils { | ||
|
||
private def assertProjectExecCount(df: DataFrame, expected: Integer): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
} | ||
|
||
test("subquery") { | ||
testData |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's more clear to create a new view here for testing.
Test build #127019 has finished for PR 29031 at commit
|
retest this please |
1 similar comment
retest this please |
add to whitelist |
Test build #127077 has finished for PR 29031 at commit
|
seems this breaks the DPP test, @allisonwang-db please take a look. |
Test build #127150 has finished for PR 29031 at commit
|
abca971
to
1632028
Compare
Test build #127155 has finished for PR 29031 at commit
|
retest this please |
Test build #127159 has finished for PR 29031 at commit
|
Test build #127163 has finished for PR 29031 at commit
|
Test build #127212 has finished for PR 29031 at commit
|
} else { | ||
project.output.map(_.exprId.id).sorted == child.output.map(_.exprId.id).sorted | ||
project.output.map(_.exprId.id).sorted == child.output.map(_.exprId.id).sorted && | ||
checkNullability(project.output, child.output) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it should be
val orderedProjectOutput = project.output.sortBy(_.exprId.id)
val orderedChildOutput = child.output.sortBy(_.exprId.id)
orderedProjectOutput.map(_.expr.id) == orderedChildOutput.map(_.exprId.id) &&
checkNullability(orderedProjectOutput, orderedChildOutput)
can you rebase/merge with the master branch to fix conflicts? |
8495025
to
394126d
Compare
Test build #127289 has finished for PR 29031 at commit
|
Test build #127290 has finished for PR 29031 at commit
|
thanks, merging to master! |
What changes were proposed in this pull request?
This PR added a physical rule to remove redundant project nodes. A
ProjectExec
is redundant whenFor example:
After Filter:
The
Project a#14L, b#15L, c#16, key#17
is redundant because its output is exactly the same as filter's output.Before Aggregate:
The
Project key#17, a#14L, b#15L
is redundant because hash aggregate doesn't require child plan's output to be in a specific order.Why are the changes needed?
It removes unnecessary query nodes and makes query plan cleaner.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests