Skip to content

Commit 6d6a1e6

Browse files
committed
Centralize logic for picking sort operator implementations
1 parent 9869ec2 commit 6d6a1e6

File tree

2 files changed

+21
-17
lines changed

2 files changed

+21
-17
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -302,14 +302,8 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
302302
}
303303

304304
val withSort = if (needSort) {
305-
if (sqlContext.conf.unsafeEnabled
306-
&& UnsafeRowConverter.supportsSchema(withShuffle.schema)) {
307-
UnsafeExternalSort(rowOrdering, global = false, withShuffle)
308-
} else if (sqlContext.conf.externalSortEnabled) {
309-
ExternalSort(rowOrdering, global = false, withShuffle)
310-
} else {
311-
Sort(rowOrdering, global = false, withShuffle)
312-
}
305+
sqlContext.planner.BasicOperators.getSortOperator(
306+
rowOrdering, global = false, withShuffle)
313307
} else {
314308
withShuffle
315309
}
@@ -337,11 +331,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
337331
case (UnspecifiedDistribution, Seq(), child) =>
338332
child
339333
case (UnspecifiedDistribution, rowOrdering, child) =>
340-
if (sqlContext.conf.externalSortEnabled) {
341-
ExternalSort(rowOrdering, global = false, child)
342-
} else {
343-
Sort(rowOrdering, global = false, child)
344-
}
334+
sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
345335

346336
case (dist, ordering, _) =>
347337
sys.error(s"Don't know how to ensure $dist with ordering $ordering")

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
291291
object BasicOperators extends Strategy {
292292
def numPartitions: Int = self.numPartitions
293293

294+
/**
295+
* Picks an appropriate sort operator.
296+
*
297+
* @param global when true performs a global sort of all partitions by shuffling the data first
298+
* if necessary.
299+
*/
300+
def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
301+
if (sqlContext.conf.unsafeEnabled && UnsafeRowConverter.supportsSchema(child.schema)) {
302+
execution.UnsafeExternalSort(sortExprs, global, child)
303+
} else if (sqlContext.conf.externalSortEnabled) {
304+
execution.ExternalSort(sortExprs, global, child)
305+
} else {
306+
execution.Sort(sortExprs, global, child)
307+
}
308+
}
309+
294310
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
295311
case r: RunnableCommand => ExecutedCommand(r) :: Nil
296312

@@ -302,11 +318,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
302318
case logical.SortPartitions(sortExprs, child) =>
303319
// This sort only sorts tuples within a partition. Its requiredDistribution will be
304320
// an UnspecifiedDistribution.
305-
execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
306-
case logical.Sort(sortExprs, global, child) if sqlContext.conf.externalSortEnabled =>
307-
execution.ExternalSort(sortExprs, global, planLater(child)):: Nil
321+
getSortOperator(sortExprs, global = false, planLater(child)) :: Nil
308322
case logical.Sort(sortExprs, global, child) =>
309-
execution.Sort(sortExprs, global, planLater(child)):: Nil
323+
getSortOperator(sortExprs, global, planLater(child)):: Nil
310324
case logical.Project(projectList, child) =>
311325
execution.Project(projectList, planLater(child)) :: Nil
312326
case logical.Filter(condition, child) =>

0 commit comments

Comments
 (0)