
Commit 231af3a

Limit/TakeOrdered:

1. Renamed StopAfter to Limit to be more consistent with naming in other relational databases.
2. Renamed TopK to TakeOrdered to be more consistent with the Spark RDD API.
3. Avoided breaking lineage in Limit.
4. Added a bunch of overrides to execution/basicOperators.scala.
1 parent 8237df8 commit 231af3a

7 files changed: +44 −31 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 1 addition & 1 deletion

@@ -178,7 +178,7 @@ class SqlParser extends StandardTokenParsers {
     val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
     val withHaving = h.map(h => Filter(h, withDistinct)).getOrElse(withDistinct)
     val withOrder = o.map(o => Sort(o, withHaving)).getOrElse(withHaving)
-    val withLimit = l.map { l => StopAfter(l, withOrder) }.getOrElse(withOrder)
+    val withLimit = l.map { l => Limit(l, withOrder) }.getOrElse(withOrder)
     withLimit
   }
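
The only change here is the name of the node that the optional LIMIT clause wraps around the plan built so far. As a standalone illustration of that wrapping pattern (plain Scala with string placeholders, not the actual Catalyst node classes), each optional clause, when present, nests a new node around the previous result:

    // Toy sketch of the Option-based wrapping used above; node names are only
    // string labels here, not Catalyst classes.
    def wrap(base: String, order: Option[String], limit: Option[Int]): String = {
      val withOrder = order.map(o => s"Sort($o, $base)").getOrElse(base)
      val withLimit = limit.map(l => s"Limit($l, $withOrder)").getOrElse(withOrder)
      withLimit
    }

    // wrap("Project(name, people)", Some("age ASC"), Some(10))
    //   == "Limit(10, Sort(age ASC, Project(name, people)))"
    // wrap("Project(name, people)", None, None) == "Project(name, people)"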

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ case class Aggregate(
132132
def references = child.references
133133
}
134134

135-
case class StopAfter(limit: Expression, child: LogicalPlan) extends UnaryNode {
135+
case class Limit(limit: Expression, child: LogicalPlan) extends UnaryNode {
136136
def output = child.output
137137
def references = limit.references
138138
}

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 1 deletion

@@ -115,7 +115,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   val sparkContext = self.sparkContext
 
   val strategies: Seq[Strategy] =
-    TopK ::
+    TakeOrdered ::
     PartialAggregation ::
     SparkEquiInnerJoin ::
     ParquetOperations ::

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 5 additions & 5 deletions

@@ -158,10 +158,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case other => other
     }
 
-  object TopK extends Strategy {
+  object TakeOrdered extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.StopAfter(IntegerLiteral(limit), logical.Sort(order, child)) =>
-        execution.TopK(limit, order, planLater(child))(sparkContext) :: Nil
+      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, child)) =>
+        execution.TakeOrdered(limit, order, planLater(child))(sparkContext) :: Nil
       case _ => Nil
     }
   }

@@ -213,8 +213,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         sparkContext.parallelize(data.map(r =>
           new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row))
         execution.ExistingRdd(output, dataAsRdd) :: Nil
-      case logical.StopAfter(IntegerLiteral(limit), child) =>
-        execution.StopAfter(limit, planLater(child))(sparkContext) :: Nil
+      case logical.Limit(IntegerLiteral(limit), child) =>
+        execution.Limit(limit, planLater(child))(sparkContext) :: Nil
       case Unions(unionChildren) =>
        execution.Union(unionChildren.map(planLater))(sparkContext) :: Nil
      case logical.Generate(generator, join, outer, _, child) =>
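
Taken together, these two hunks mean that a Limit sitting directly on top of a Sort with an integer-literal count is planned as a single TakeOrdered physical operator, while any other Limit falls back to the generic physical Limit. TakeOrdered is listed before the generic strategies in SQLContext and HiveContext (see those diffs), so it gets the first chance to claim the combined pattern. A toy sketch of that dispatch (standalone types, not the actual Catalyst or execution classes):

    // Toy logical operators; the real planner matches on Catalyst plan nodes.
    sealed trait LogicalOp
    case class Scan(table: String) extends LogicalOp
    case class SortOp(keys: Seq[String], child: LogicalOp) extends LogicalOp
    case class LimitOp(n: Int, child: LogicalOp) extends LogicalOp

    // In the real planner these are two separate strategies tried in order;
    // a single match expression captures the same precedence here.
    def choosePhysical(plan: LogicalOp): String = plan match {
      case LimitOp(n, SortOp(keys, _)) => s"TakeOrdered($n, ${keys.mkString(", ")})"
      case LimitOp(n, _)               => s"Limit($n)"
      case _                           => "other"
    }

    // choosePhysical(LimitOp(10, SortOp(Seq("age"), Scan("people"))))  // "TakeOrdered(10, age)"
    // choosePhysical(LimitOp(10, Scan("people")))                      // "Limit(10)"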

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 33 additions & 20 deletions

@@ -29,56 +29,69 @@ import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, Unspec
 import org.apache.spark.sql.catalyst.ScalaReflection
 
 case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
-  def output = projectList.map(_.toAttribute)
+  override def output = projectList.map(_.toAttribute)
 
-  def execute() = child.execute().mapPartitions { iter =>
+  override def execute() = child.execute().mapPartitions { iter =>
     @transient val reusableProjection = new MutableProjection(projectList)
     iter.map(reusableProjection)
   }
 }
 
 case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
-  def output = child.output
+  override def output = child.output
 
-  def execute() = child.execute().mapPartitions { iter =>
+  override def execute() = child.execute().mapPartitions { iter =>
     iter.filter(condition.apply(_).asInstanceOf[Boolean])
   }
 }
 
 case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: SparkPlan)
   extends UnaryNode {
 
-  def output = child.output
+  override def output = child.output
 
   // TODO: How to pick seed?
-  def execute() = child.execute().sample(withReplacement, fraction, seed)
+  override def execute() = child.execute().sample(withReplacement, fraction, seed)
 }
 
 case class Union(children: Seq[SparkPlan])(@transient sc: SparkContext) extends SparkPlan {
   // TODO: attributes output by union should be distinct for nullability purposes
-  def output = children.head.output
-  def execute() = sc.union(children.map(_.execute()))
+  override def output = children.head.output
+  override def execute() = sc.union(children.map(_.execute()))
 
   override def otherCopyArgs = sc :: Nil
 }
 
-case class StopAfter(limit: Int, child: SparkPlan)(@transient sc: SparkContext) extends UnaryNode {
+/**
+ * Take the first limit elements.
+ */
+case class Limit(limit: Int, child: SparkPlan)(@transient sc: SparkContext) extends UnaryNode {
   override def otherCopyArgs = sc :: Nil
+  // Note that the implementation is different depending on
+  // whether this is a terminal operator or not.
 
-  def output = child.output
+  override def output = child.output
 
   override def executeCollect() = child.execute().map(_.copy()).take(limit)
 
-  // TODO: Terminal split should be implemented differently from non-terminal split.
-  // TODO: Pick num splits based on |limit|.
-  def execute() = sc.makeRDD(executeCollect(), 1)
+  override def execute() = {
+    child.execute()
+      .mapPartitions(_.take(limit))
+      .coalesce(1, shuffle = true)
+      .mapPartitions(_.take(limit))
+  }
 }
 
-case class TopK(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
-               (@transient sc: SparkContext) extends UnaryNode {
+/**
+ * Take the first limit elements as defined by the sortOrder. This is logically equivalent to
+ * having a [[Limit]] operator after a [[Sort]] operator. This could have been named TopK, but
+ * Spark's top operator does the opposite in ordering so we name it TakeOrdered to avoid confusion.
+ */
+case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
+                      (@transient sc: SparkContext) extends UnaryNode {
   override def otherCopyArgs = sc :: Nil
 
-  def output = child.output
+  override def output = child.output
 
   @transient
   lazy val ordering = new RowOrdering(sortOrder)
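
This rewrite of Limit.execute is what "avoided breaking lineage" in the commit message refers to: instead of collecting the rows to the driver and re-parallelizing them with sc.makeRDD (the old body, still used by TakeOrdered below), the result stays a transformation of the child RDD. A minimal sketch of the same pattern on a plain RDD, assuming an existing SparkContext named sc (illustrative only, not part of this commit):

    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag

    // Take at most `limit` elements without collecting to the driver: cap each
    // partition first, gather the (small) survivors into one partition, then cap again.
    def limitRdd[T: ClassTag](rdd: RDD[T], limit: Int): RDD[T] =
      rdd
        .mapPartitions(_.take(limit))   // at most `limit` elements survive per partition
        .coalesce(1, shuffle = true)    // shuffle the survivors into a single partition
        .mapPartitions(_.take(limit))   // final cap on the combined iterator

    // val first10: RDD[String] = limitRdd(sc.textFile("..."), 10)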

@@ -87,7 +100,7 @@ case class TopK(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan
 
   // TODO: Terminal split should be implemented differently from non-terminal split.
   // TODO: Pick num splits based on |limit|.
-  def execute() = sc.makeRDD(executeCollect(), 1)
+  override def execute() = sc.makeRDD(executeCollect(), 1)
 }
 
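The naming rationale in the new scaladoc above can be checked against plain RDDs: RDD.top returns the largest elements under an ordering, while RDD.takeOrdered returns the smallest, which is the behavior this operator generalizes to row orderings. A spark-shell style illustration, assuming a live SparkContext named sc (not part of this commit):

    val nums = sc.parallelize(Seq(5, 1, 4, 2, 3))
    nums.top(2)                                // Array(5, 4): largest elements first
    nums.takeOrdered(2)                        // Array(1, 2): smallest elements first
    nums.takeOrdered(2)(Ordering.Int.reverse)  // Array(5, 4): a custom ordering flips it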

@@ -102,15 +115,15 @@ case class Sort(
   @transient
   lazy val ordering = new RowOrdering(sortOrder)
 
-  def execute() = attachTree(this, "sort") {
+  override def execute() = attachTree(this, "sort") {
     // TODO: Optimize sorting operation?
     child.execute()
       .mapPartitions(
         iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator,
         preservesPartitioning = true)
   }
 
-  def output = child.output
+  override def output = child.output
 }
 
 object ExistingRdd {

@@ -131,6 +144,6 @@ object ExistingRdd {
 }
 
 case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
-  def execute() = rdd
+  override def execute() = rdd
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 1 addition & 1 deletion

@@ -188,7 +188,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   val hiveContext = self
 
   override val strategies: Seq[Strategy] = Seq(
-    TopK,
+    TakeOrdered,
     ParquetOperations,
     HiveTableScans,
     DataSinks,

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 2 additions & 2 deletions

@@ -530,7 +530,7 @@ object HiveQl {
 
     val withLimit =
       limitClause.map(l => nodeToExpr(l.getChildren.head))
-        .map(StopAfter(_, withSort))
+        .map(Limit(_, withSort))
         .getOrElse(withSort)
 
     // TOK_INSERT_INTO means to add files to the table.

@@ -603,7 +603,7 @@ object HiveQl {
     case Token("TOK_TABLESPLITSAMPLE",
            Token("TOK_ROWCOUNT", Nil) ::
            Token(count, Nil) :: Nil) =>
-      StopAfter(Literal(count.toInt), relation)
+      Limit(Literal(count.toInt), relation)
     case Token("TOK_TABLESPLITSAMPLE",
            Token("TOK_PERCENT", Nil) ::
            Token(fraction, Nil) :: Nil) =>
