Skip to content

Commit 171001f

Browse files
committed
change default outputOrdering
1 parent 47455c9 commit 171001f

File tree

5 files changed

+23
-9
lines changed

5 files changed

+23
-9
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ case class Aggregate(
6060

6161
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
6262

63+
override def outputOrdering: Seq[SortOrder] = Nil
64+
6365
/**
6466
* An aggregate that needs to be computed for each row in a group.
6567
*

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,8 @@ private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPl
161161
def addExchangeIfNecessary(partitioning: Partitioning, child: SparkPlan): SparkPlan =
162162
if (child.outputPartitioning != partitioning) Exchange(partitioning, child) else child
163163

164-
// Check if the partitioning we want to ensure is the same as the child's output
165-
// partitioning. If so, we do not need to add the Exchange operator.
164+
// Check if the ordering we want to ensure is the same as the child's output
165+
// ordering. If so, we do not need to add the Sort operator.
166166
def addSortIfNecessary(ordering: Seq[SortOrder], child: SparkPlan): SparkPlan =
167167
if (child.outputOrdering != ordering) Sort(ordering, global = false, child) else child
168168

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {
183183
private[sql] trait UnaryNode extends SparkPlan with trees.UnaryNode[SparkPlan] {
184184
self: Product =>
185185
override def outputPartitioning: Partitioning = child.outputPartitioning
186+
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
186187
}
187188

188189
private[sql] trait BinaryNode extends SparkPlan with trees.BinaryNode[SparkPlan] {

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child:
7070
override def execute(): RDD[Row] = {
7171
child.execute().map(_.copy()).sample(withReplacement, fraction, seed)
7272
}
73+
74+
override def outputOrdering: Seq[SortOrder] = Nil
7375
}
7476

7577
/**
@@ -146,6 +148,8 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
146148
// TODO: Terminal split should be implemented differently from non-terminal split.
147149
// TODO: Pick num splits based on |limit|.
148150
override def execute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
151+
152+
override def outputOrdering: Seq[SortOrder] = sortOrder
149153
}
150154

151155
/**
@@ -171,6 +175,8 @@ case class Sort(
171175
}
172176

173177
override def output: Seq[Attribute] = child.output
178+
179+
override def outputOrdering: Seq[SortOrder] = sortOrder
174180
}
175181

176182
/**
@@ -201,6 +207,8 @@ case class ExternalSort(
201207
}
202208

203209
override def output: Seq[Attribute] = child.output
210+
211+
override def outputOrdering: Seq[SortOrder] = sortOrder
204212
}
205213

206214
/**

sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,16 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
9595
("SELECT * FROM testData full JOIN testData2 ON (key * a != key + a)",
9696
classOf[BroadcastNestedLoopJoin])
9797
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
98-
conf.setConf("spark.sql.autoSortMergeJoin", "true")
99-
Seq(
100-
("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]),
101-
("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]),
102-
("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin])
103-
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
104-
conf.setConf("spark.sql.autoSortMergeJoin", AUTO_SORTMERGEJOIN.toString)
98+
try {
99+
conf.setConf("spark.sql.autoSortMergeJoin", "true")
100+
Seq(
101+
("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]),
102+
("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]),
103+
("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin])
104+
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
105+
} finally {
106+
conf.setConf("spark.sql.autoSortMergeJoin", AUTO_SORTMERGEJOIN.toString)
107+
}
105108
}
106109

107110
test("broadcasted hash join operator selection") {

0 commit comments

Comments (0)