Skip to content

[SPARK-4636] [SQL] Cluster By & Distribute By should follows the Hive behavior #3496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Distinct(partial = true, planLater(child))) :: Nil

case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
// This sort only sorts partitions (No sorting will be performed within the partition).
// Its requiredDistribution will be an OrderedDistribution.
execution.SortPartitions(sortExprs, planLater(child)) :: Nil
case logical.Sort(sortExprs, global, child) if sqlContext.externalSortEnabled =>
execution.ExternalSort(sortExprs, global, planLater(child)):: Nil
case logical.Sort(sortExprs, global, child) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
* Performs a sort on-heap.
* @param global when true performs a global sort of all partitions by shuffling the data first
* if necessary.
* @param sortOrder The Sort Order will be applied within the partition, and also will be applied
* on partitions if `global` is set to true.
*/
@DeveloperApi
case class Sort(
Expand All @@ -213,6 +215,30 @@ case class Sort(
override def output = child.output
}

/**
* :: DeveloperApi ::
* Performs a partitions sorting only
* Compare to global sort, we will not do the sort within the partition.
* Compare to non global sort, we don't have overlap keys among partitions.
* @param sortOrder Sort Order will be applied on partitions, so we will not get the overlap keys
* among partitions.
*/
@DeveloperApi
case class SortPartitions(
sortOrder: Seq[SortOrder],
child: SparkPlan)
extends UnaryNode {
override def requiredChildDistribution = OrderedDistribution(sortOrder) :: Nil

override def execute() = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
iterator.map(_.copy())
}, preservesPartitioning = true)
}

override def output = child.output
}

/**
* :: DeveloperApi ::
* Performs a sort, spilling to disk as needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"input12",
"input12_hadoop20",
"input14",
"input14_limit",
"input15",
"input19",
"input1_limit",
Expand All @@ -490,6 +491,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"input28",
"input2_limit",
"input3",
"input3_limit",
"input4",
"input40",
"input41",
Expand Down Expand Up @@ -625,6 +627,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"mapreduce8",
"merge1",
"merge2",
"merge3",
"mergejoins",
"multiMapJoin1",
"multiMapJoin2",
Expand Down
12 changes: 9 additions & 3 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -684,13 +684,19 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case (None, Some(perPartitionOrdering), None, None) =>
Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withHaving)
case (None, None, Some(partitionExprs), None) =>
Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving)
SortPartitions(
partitionExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)),
withHaving)
case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false,
Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving))
SortPartitions(
partitionExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)),
withHaving))
case (None, None, None, Some(clusterExprs)) =>
Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false,
Repartition(clusterExprs.getChildren.map(nodeToExpr), withHaving))
SortPartitions(
clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)),
withHaving))
case (None, None, None, None) => withHaving
case _ => sys.error("Unsupported set of ordering / distribution clauses.")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
0 val_0
0 val_0
0 val_0
10 val_10
11 val_11
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
0 val_0
0 val_0
0 val_0
0 val_1
0 val_1
1 val_2
10 val_10
10 val_11
100 val_100
100 val_100
100 val_101
100 val_101
101 val_102
102 val_103
103 val_103
103 val_103
104 val_104
104 val_104
104 val_105
104 val_105
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0
Loading