[SPARK-33399][SQL] Normalize output partitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes #30300

@prakharjain09 prakharjain09 commented Nov 9, 2020

What changes were proposed in this pull request?

This pull request aims to remove unneeded exchanges and sorts by correctly normalizing the output partitioning and sort order information with respect to aliases.

Example: consider this join of three tables:

 |SELECT t2id, t3.id as t3id
 |FROM (
 |    SELECT t1.id as t1id, t2.id as t2id
 |    FROM t1, t2
 |    WHERE t1.id = t2.id
 |) t12, t3
 |WHERE t1id = t3.id

The plan for this looks like:

  *(9) Project [t2id#1034L, id#1004L AS t3id#1035L]
  +- *(9) SortMergeJoin [t1id#1033L], [id#1004L], Inner
     :- *(6) Sort [t1id#1033L ASC NULLS FIRST], false, 0
     :  +- Exchange hashpartitioning(t1id#1033L, 5), true, [id=#1343]   <------------------------------
     :     +- *(5) Project [id#996L AS t1id#1033L, id#1000L AS t2id#1034L]
     :        +- *(5) SortMergeJoin [id#996L], [id#1000L], Inner
     :           :- *(2) Sort [id#996L ASC NULLS FIRST], false, 0
     :           :  +- Exchange hashpartitioning(id#996L, 5), true, [id=#1329]
     :           :     +- *(1) Range (0, 10, step=1, splits=2)
     :           +- *(4) Sort [id#1000L ASC NULLS FIRST], false, 0
     :              +- Exchange hashpartitioning(id#1000L, 5), true, [id=#1335]
     :                 +- *(3) Range (0, 20, step=1, splits=2)
     +- *(8) Sort [id#1004L ASC NULLS FIRST], false, 0
        +- Exchange hashpartitioning(id#1004L, 5), true, [id=#1349]
           +- *(7) Range (0, 30, step=1, splits=2)

In this plan, the marked Exchange could have been avoided because the data is already partitioned on "t1.id", which the Project merely renames to "t1id". The extra shuffle appears because the AliasAwareOutputPartitioning class handles aliases only for HashPartitioning. This change normalizes all output partitionings based on the aliasing that happens in Project.
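The normalization described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: `Attr`, `Alias`, `HashPart`, `PartCollection` and `normalize` are hypothetical stand-ins invented for illustration, not Spark's classes or the code in this PR. It shows the idea of rewriting partitioning keys through the Project's aliases, including keys nested inside a collection of partitionings.

```scala
// Toy model only: every type here is a hypothetical stand-in, not a Spark class.
object AliasNormalizationSketch {
  // An attribute identified by name and expression id, e.g. id#996.
  final case class Attr(name: String, exprId: Long)

  // One projection entry of the form `child AS alias`.
  final case class Alias(child: Attr, alias: Attr)

  sealed trait Partitioning
  final case class HashPart(keys: Seq[Attr], numPartitions: Int) extends Partitioning
  final case class PartCollection(parts: Seq[Partitioning]) extends Partitioning

  // Rewrite every key that the projection renames, recursing into collections
  // so that nested hash partitionings are normalized as well.
  def normalize(p: Partitioning, aliases: Seq[Alias]): Partitioning = {
    val renamed: Map[Attr, Attr] = aliases.map(a => a.child -> a.alias).toMap
    p match {
      case HashPart(keys, n)  => HashPart(keys.map(k => renamed.getOrElse(k, k)), n)
      case PartCollection(ps) => PartCollection(ps.map(normalize(_, aliases)))
    }
  }

  def main(args: Array[String]): Unit = {
    val t1Id = Attr("id", 996L)
    val t2Id = Attr("id", 1000L)
    val t1Alias = Attr("t1id", 1033L)
    val t2Alias = Attr("t2id", 1034L)

    // Assumption: the inner join reports the partitionings of both of its sides.
    val childPartitioning =
      PartCollection(Seq(HashPart(Seq(t1Id), 5), HashPart(Seq(t2Id), 5)))

    val projected =
      normalize(childPartitioning, Seq(Alias(t1Id, t1Alias), Alias(t2Id, t2Alias)))

    // The projected partitioning now mentions t1id#1033, which is what the outer
    // join requires, so in this model no additional exchange would be needed.
    println(projected)
  }
}
```

In this model, a planner comparing the required `hashpartitioning(t1id#1033L, 5)` against the normalized result finds a match, whereas the unnormalized partitioning still refers to `id#996L` and does not match.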

Why are the changes needed?

To remove unneeded exchanges.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New UT added.

On TPCDS 1000 scale, this change improves the performance of query 95 from 330 seconds to 170 seconds by removing the extra Exchange.

@github-actions github-actions bot added the SQL label Nov 9, 2020
@prakharjain09
Contributor Author

cc - @cloud-fan @dongjoon-hyun @imback82

@imback82
Contributor

imback82 commented Nov 9, 2020

cc @maropu as well.

@maropu
Member

maropu commented Nov 10, 2020

You need to update the plan stability checks for TPCDS:

To re-generate golden files for entire suite, run:

  SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly *PlanStability[WithStats]Suite"

@maropu
Member

maropu commented Nov 10, 2020

ok to test

SparkQA commented Nov 10, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35420/

SparkQA commented Nov 10, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35420/

SparkQA commented Nov 10, 2020

Test build #130810 has finished for PR 30300 at commit dd6b841.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@prakharjain09 prakharjain09 changed the title from "[SPARK-33399][SQL] Normalize output partitioning of Project with respect to aliases" to "[SPARK-33399][SQL] Normalize output partitioning with respect to aliases to avoid unneeded exchanges" on Nov 11, 2020
@prakharjain09
Contributor Author

@maropu @viirya Thanks for the review. I have addressed the majority of the review comments.

SparkQA commented Nov 11, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35531/

SparkQA commented Nov 11, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35531/

SparkQA commented Nov 11, 2020

Test build #130926 has finished for PR 30300 at commit adf3a66.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@prakharjain09 prakharjain09 changed the title from "[SPARK-33399][SQL] Normalize output partitioning with respect to aliases to avoid unneeded exchanges" to "[SPARK-33399][SQL] Normalize output partitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes" on Nov 12, 2020

SparkQA commented Nov 12, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35596/

SparkQA commented Nov 12, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35596/

Member

@maropu maropu left a comment

Nice improvement! LGTM

SparkQA commented Nov 12, 2020

Test build #130990 has finished for PR 30300 at commit d5a0fbe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 12, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35615/

SparkQA commented Nov 12, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35615/

SparkQA commented Nov 12, 2020

Test build #131009 has finished for PR 30300 at commit f4fd12e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait HasMaxBlockSizeInMB extends Params
  • class HasMaxBlockSizeInMB(Params):
  • case class ElementAt(
  • case class GetArrayItem(
  • case class Elt(

Contributor

@imback82 imback82 left a comment

LGTM as well.

SparkQA commented Nov 13, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35646/

SparkQA commented Nov 13, 2020

Test build #131040 has finished for PR 30300 at commit c66874a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 13, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35646/

@cloud-fan
Contributor

retest this please

SparkQA commented Nov 13, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35654/

SparkQA commented Nov 13, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35654/

SparkQA commented Nov 13, 2020

Test build #131048 has finished for PR 30300 at commit c66874a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35739/

SparkQA commented Nov 16, 2020

Test build #131136 has finished for PR 30300 at commit 16e1db2.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • >>> class VectorAccumulatorParam(AccumulatorParam):
  • fully qualified classname of key Writable class (e.g. \"org.apache.hadoop.io.Text\")
  • fully qualified classname of key Writable class (e.g. \"org.apache.hadoop.io.Text\")
  • fully qualified classname of key Writable class (e.g. \"org.apache.hadoop.io.Text\")
  • fully qualified classname of key Writable class (e.g. \"org.apache.hadoop.io.Text\")
  • class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging
  • abstract class AbstractSqlParser extends ParserInterface with Logging
  • class CatalystSqlParser extends AbstractSqlParser
  • class SparkSqlParser extends AbstractSqlParser
  • class SparkSqlAstBuilder extends AstBuilder
  • class VariableSubstitution

SparkQA commented Nov 16, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35739/

@cloud-fan
Contributor

retest this please

SparkQA commented Nov 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35745/

SparkQA commented Nov 16, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35745/

SparkQA commented Nov 16, 2020

Test build #131143 has finished for PR 30300 at commit 16e1db2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • >>> class VectorAccumulatorParam(AccumulatorParam):
  • fully qualified classname of key Writable class (e.g. \"org.apache.hadoop.io.Text\")
  • fully qualified classname of key Writable class (e.g. \"org.apache.hadoop.io.Text\")
  • fully qualified classname of key Writable class (e.g. \"org.apache.hadoop.io.Text\")
  • fully qualified classname of key Writable class (e.g. \"org.apache.hadoop.io.Text\")
  • class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging
  • abstract class AbstractSqlParser extends ParserInterface with Logging
  • class CatalystSqlParser extends AbstractSqlParser
  • class SparkSqlParser extends AbstractSqlParser
  • class SparkSqlAstBuilder extends AstBuilder
  • class VariableSubstitution

@maropu
Member

maropu commented Nov 17, 2020

Thanks! Merged to master.

@maropu maropu closed this in f5e3302 Nov 17, 2020
@prakharjain09
Contributor Author

Thanks @cloud-fan @maropu @imback82 @viirya for the code reviews and suggestions.

@maropu
Member

maropu commented Nov 18, 2020

NOTE: It seems this update makes TPCDS(sf=20) q95 much faster (176324ms->129644ms). Nice.
https://docs.google.com/spreadsheets/d/1V8xoKR9ElU-rOXMH84gb5BbLEw0XAPTJY8c8aZeIqus/edit?usp=sharing

--

val projects = planned.collect { case p: ProjectExec => p }
assert(projects.exists(_.outputPartitioning match {
  case PartitioningCollection(Seq(HashPartitioning(Seq(k1: AttributeReference), _),
    HashPartitioning(Seq(k2: AttributeReference), _))) if k1.name == "t1id" =>
Contributor

Not related to this PR: the ProjectExec outputs only t1id (after column pruning), so returning a PartitioningCollection here is a bit redundant. t1id is the only output, and the other partitionings are simply invalid.

Member

Oh, it looks interesting. Hi, @prakharjain09, are you interested in the improvement above?

Contributor Author

@maropu Sure. Basically, the idea is to stop propagating partitionings and sort orders that correspond to attributes which are not part of the output set?

Working on this as part of https://issues.apache.org/jira/browse/SPARK-33758.
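The follow-up idea discussed in this thread can be sketched with a toy model. This is an illustration under assumptions, not the SPARK-33758 implementation: `Attr`, `HashPart`, `PartCollection`, `UnknownPart` and `prune` are hypothetical names. It drops every partitioning whose keys are not all part of the node's output and collapses a collection that is left with a single member.

```scala
// Toy model only: hypothetical stand-ins, not Spark's classes.
object PartitioningPruningSketch {
  final case class Attr(name: String, exprId: Long)

  sealed trait Partitioning
  final case class HashPart(keys: Seq[Attr], numPartitions: Int) extends Partitioning
  final case class PartCollection(parts: Seq[Partitioning]) extends Partitioning
  case object UnknownPart extends Partitioning

  // Keep a hash partitioning only if all of its keys are still in the output;
  // simplify collections that end up empty or with exactly one member.
  def prune(p: Partitioning, output: Set[Attr]): Partitioning = p match {
    case h @ HashPart(keys, _) =>
      if (keys.forall(output.contains)) h else UnknownPart
    case PartCollection(ps) =>
      ps.map(prune(_, output)).filter(_ != UnknownPart) match {
        case Seq()       => UnknownPart
        case Seq(single) => single
        case many        => PartCollection(many)
      }
    case UnknownPart => UnknownPart
  }

  def main(args: Array[String]): Unit = {
    val t1id = Attr("t1id", 1033L)
    val pruned = Attr("id", 1000L) // no longer in the output after column pruning
    val reported = PartCollection(Seq(HashPart(Seq(t1id), 5), HashPart(Seq(pruned), 5)))

    // Only the partitioning on t1id survives, so the collection collapses to it.
    println(prune(reported, Set(t1id)) == HashPart(Seq(t1id), 5)) // prints "true"
  }
}
```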

wangyum pushed a commit that referenced this pull request May 26, 2023
…rtitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes (#1092)

* [SPARK-31078][SQL] Respect aliases in output ordering

Currently, in the following scenario, an unnecessary `Sort` node is introduced:
```scala
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
  val df = (0 until 20).toDF("i").as("df")
  df.repartition(8, df("i")).write.format("parquet")
    .bucketBy(8, "i").sortBy("i").saveAsTable("t")
  val t1 = spark.table("t")
  val t2 = t1.selectExpr("i as ii")
  t1.join(t2, t1("i") === t2("ii")).explain
}
```
```
== Physical Plan ==
*(3) SortMergeJoin [i#8], [ii#10], Inner
:- *(1) Project [i#8]
:  +- *(1) Filter isnotnull(i#8)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [ii#10 ASC NULLS FIRST], false, 0    <==== UNNECESSARY
   +- *(2) Project [i#8 AS ii#10]
      +- *(2) Filter isnotnull(i#8)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
```
Notice that `Sort [ii#10 ASC NULLS FIRST], false, 0` is introduced even though the underlying data is already sorted. This is because `outputOrdering` doesn't handle aliases correctly. This PR proposes to fix this issue.
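Why the alias matters for ordering can be shown with a toy model. This is a sketch under assumptions: `Attr`, `SortKey`, `normalizeOrdering` and `satisfies` are hypothetical names, and the prefix check is a simplification of how a planner might compare orderings, not Spark's implementation.

```scala
// Toy model only: hypothetical stand-ins, not Spark's classes.
object OrderingAliasSketch {
  final case class Attr(name: String, exprId: Long)
  final case class SortKey(attr: Attr, ascending: Boolean)

  // Rewrite the child's ordering through the projection's alias map (child -> alias).
  def normalizeOrdering(childOrdering: Seq[SortKey], aliases: Map[Attr, Attr]): Seq[SortKey] =
    childOrdering.map(k => k.copy(attr = aliases.getOrElse(k.attr, k.attr)))

  // Simplification: a required ordering is satisfied when it is a prefix
  // of the ordering the child reports.
  def satisfies(reported: Seq[SortKey], required: Seq[SortKey]): Boolean =
    reported.startsWith(required)

  def main(args: Array[String]): Unit = {
    val i = Attr("i", 8L)
    val ii = Attr("ii", 10L)
    val scanOrdering = Seq(SortKey(i, ascending = true))  // bucketed table sorted by i
    val required = Seq(SortKey(ii, ascending = true))     // join side needs ii sorted

    // Without alias handling the orderings do not match, so a Sort is added.
    println(satisfies(scanOrdering, required)) // prints "false"

    // After rewriting i#8 to ii#10 the requirement is met and no Sort is needed.
    println(satisfies(normalizeOrdering(scanOrdering, Map(i -> ii)), required)) // prints "true"
  }
}
```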

To better handle aliases in `outputOrdering`.

Yes, now with the fix, the `explain` prints out the following:
```
== Physical Plan ==
*(3) SortMergeJoin [i#8], [ii#10], Inner
:- *(1) Project [i#8]
:  +- *(1) Filter isnotnull(i#8)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Project [i#8 AS ii#10]
   +- *(2) Filter isnotnull(i#8)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
```

Tests added.

Closes #27842 from imback82/alias_aware_sort_order.

Authored-by: Terry Kim
Signed-off-by: Wenchen Fan

* [SPARK-33399][SQL] Normalize output partitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes


Closes #30300 from prakharjain09/SPARK-33399-outputpartitioning.

Authored-by: Prakhar Jain
Signed-off-by: Takeshi Yamamuro

* [CARMEL-6306] Fix ut

* [CARMEL-6306] Fix alias not compatible with ebay skew implementation

Co-authored-by: Terry Kim
Co-authored-by: Prakhar Jain