Skip to content

Commit bf0e7dc

Browse files
committed
Added SortedOperation pattern to match *some* definitely sorted operations and avoid some sorting cost in HiveComparisonTest.
1 parent 6d1c642 commit bf0e7dc

File tree

3 files changed

+20
-12
lines changed

3 files changed

+20
-12
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,16 @@ object Unions {
168168
case other => other :: Nil
169169
}
170170
}
171+
172+
/**
 * A pattern that matches (some) sorted operations and returns the corresponding sort orders.
 * Every plan matched by this pattern is guaranteed to be sorted, but the converse does not
 * hold: some sorted plans are not recognized here.
 */
object SortedOperation {
  // TODO (lian) detect more sorted operations
  def unapply(plan: LogicalPlan): Option[Seq[SortOrder]] =
    // Wrap the plan so collect can apply the partial match in a single expression.
    Some(plan).collect {
      case FilteredOperation(_, Sort(sortOrder, _)) => sortOrder
    }
}

sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ case class Aggregate(
116116
*/
117117
@transient
118118
private[this] lazy val resultMap =
119-
(computedAggregates.map { agg => agg.unbound -> agg.resultAttribute} ++ namedGroups).toMap
119+
(computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap
120120

121121
/**
122122
* Substituted version of aggregateExpressions expressions which are used to compute final

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ package org.apache.spark.sql.hive.execution
1919

2020
import java.io._
2121

22+
import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
23+
2224
import org.apache.spark.sql.Logging
25+
import org.apache.spark.sql.catalyst.planning.SortedOperation
2326
import org.apache.spark.sql.catalyst.plans.logical.{ExplainCommand, NativeCommand}
2427
import org.apache.spark.sql.catalyst.util._
25-
import org.apache.spark.sql.execution.Sort
26-
import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
2728
import org.apache.spark.sql.hive.test.TestHive
2829

2930
/**
@@ -131,14 +132,8 @@ abstract class HiveComparisonTest
131132
val orderedAnswer = hiveQuery.logical match {
132133
// Clean out non-deterministic time schema info.
133134
case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
134-
case _: ExplainCommand => answer
135-
case _ =>
136-
// TODO: Really we only care about the final total ordering here...
137-
val isOrdered = hiveQuery.executedPlan.collect {
138-
case s @ Sort(_, global, _) if global => s
139-
}.nonEmpty
140-
// If the query results aren't sorted, then sort them to ensure deterministic answers.
141-
if (!isOrdered) answer.sorted else answer
135+
case _: ExplainCommand | SortedOperation(_) => answer
136+
case _ => answer.sorted
142137
}
143138
orderedAnswer.map(cleanPaths)
144139
}
@@ -161,7 +156,7 @@ abstract class HiveComparisonTest
161156
"minFileSize"
162157
)
163158
protected def nonDeterministicLine(line: String) =
164-
nonDeterministicLineIndicators.map(line contains _).reduceLeft(_||_)
159+
nonDeterministicLineIndicators.exists(line contains _)
165160

166161
/**
167162
* Removes non-deterministic paths from `str` so cached answers will compare correctly.

0 commit comments

Comments
 (0)