Skip to content

Commit 7880909

Browse files
cloud-fan authored and gatorsmile committed
[SPARK-21743][SQL][FOLLOW-UP] top-most limit should not cause memory leak
## What changes were proposed in this pull request? This is a follow-up of apache#18955 , to fix a bug that we break whole stage codegen for `Limit`. ## How was this patch tested? existing tests. Author: Wenchen Fan <[email protected]> Closes apache#18993 from cloud-fan/bug.
1 parent 23ea898 commit 7880909

File tree

3 files changed

+20
-30
lines changed

3 files changed

+20
-30
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1171,7 +1171,7 @@ object DecimalAggregates extends Rule[LogicalPlan] {
11711171
* Converts local operations (i.e. ones that don't require data exchange) on LocalRelation to
11721172
* another LocalRelation.
11731173
*
1174-
* This is relatively simple as it currently handles only a single case: Project.
1174+
* This is relatively simple as it currently handles only 2 single case: Project and Limit.
11751175
*/
11761176
object ConvertToLocalRelation extends Rule[LogicalPlan] {
11771177
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -1180,6 +1180,9 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
11801180
val projection = new InterpretedProjection(projectList, output)
11811181
projection.initialize(0)
11821182
LocalRelation(projectList.map(_.toAttribute), data.map(projection))
1183+
1184+
case Limit(IntegerLiteral(limit), LocalRelation(output, data)) =>
1185+
LocalRelation(output, data.take(limit))
11831186
}
11841187

11851188
private def hasUnevaluableExpr(expr: Expression): Boolean = {

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -63,29 +63,24 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
6363
*/
6464
object SpecialLimits extends Strategy {
6565
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
66-
case logical.ReturnAnswer(rootPlan) => rootPlan match {
67-
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
68-
execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
69-
case logical.Limit(
70-
IntegerLiteral(limit),
71-
logical.Project(projectList, logical.Sort(order, true, child))) =>
72-
execution.TakeOrderedAndProjectExec(
73-
limit, order, projectList, planLater(child)) :: Nil
74-
case logical.Limit(IntegerLiteral(limit), child) =>
75-
// Normally wrapping child with `LocalLimitExec` here is a no-op, because
76-
// `CollectLimitExec.executeCollect` will call `LocalLimitExec.executeTake`, which
77-
// calls `child.executeTake`. If child supports whole stage codegen, adding this
78-
// `LocalLimitExec` can stop the processing of whole stage codegen and trigger the
79-
// resource releasing work, after we consume `limit` rows.
80-
execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
66+
case ReturnAnswer(rootPlan) => rootPlan match {
67+
case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
68+
TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
69+
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
70+
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
71+
case Limit(IntegerLiteral(limit), child) =>
72+
// With whole stage codegen, Spark releases resources only when all the output data of the
73+
// query plan are consumed. It's possible that `CollectLimitExec` only consumes a little
74+
// data from child plan and finishes the query without releasing resources. Here we wrap
75+
// the child plan with `LocalLimitExec`, to stop the processing of whole stage codegen and
76+
// trigger the resource releasing work, after we consume `limit` rows.
77+
CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
8178
case other => planLater(other) :: Nil
8279
}
83-
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
84-
execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
85-
case logical.Limit(
86-
IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
87-
execution.TakeOrderedAndProjectExec(
88-
limit, order, projectList, planLater(child)) :: Nil
80+
case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
81+
TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
82+
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
83+
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
8984
case _ => Nil
9085
}
9186
}

sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,6 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
5454
val limit: Int
5555
override def output: Seq[Attribute] = child.output
5656

57-
// Do not enable whole stage codegen for a single limit.
58-
override def supportCodegen: Boolean = child match {
59-
case plan: CodegenSupport => plan.supportCodegen
60-
case _ => false
61-
}
62-
63-
override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit))
64-
6557
protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
6658
iter.take(limit)
6759
}

0 commit comments

Comments (0)