
Commit cca1dda

top-most limit should not cause memory leak
1 parent 12411b5 commit cca1dda

4 files changed, 19 insertions(+), 1 deletion(-)


sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 6 additions & 1 deletion

@@ -72,7 +72,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           execution.TakeOrderedAndProjectExec(
             limit, order, projectList, planLater(child)) :: Nil
         case logical.Limit(IntegerLiteral(limit), child) =>
-          execution.CollectLimitExec(limit, planLater(child)) :: Nil
+          // Normally wrapping the child with `LocalLimitExec` here is a no-op, because
+          // `CollectLimitExec.executeCollect` will call `LocalLimitExec.executeTake`, which
+          // calls `child.executeTake`. But if the child supports whole stage codegen, adding this
+          // `LocalLimitExec` can break the input consuming loop inside whole stage codegen and
+          // trigger the resource releasing work, after we consume `limit` rows.
+          execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
        case other => planLater(other) :: Nil
      }
    case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
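The comment above compresses the whole fix into a few lines, so here is a minimal, self-contained analogy in plain Scala. None of these names are Spark APIs: `RowProducer`, `consumeWithLimit`, and friends are stand-ins. The point it illustrates is that resources are released by the consuming loop itself, so a limit that participates in the loop can end it cleanly and trigger cleanup, while a caller that simply stops pulling rows leaves the producer open.

// Minimal analogy, not Spark's codegen API: resources are released by the
// consuming loop, so only a limit that participates in the loop triggers cleanup.
object ConsumeLoopSketch {
  final class RowProducer {               // stand-in for a codegen stage's input
    var open = true
    def nextRow(): Long = 1L
    def close(): Unit = { open = false }  // the "resource releasing work"
  }

  // Without a limit inside the loop: the caller takes n rows and walks away,
  // never reaching close() -- the shape of the leak this commit fixes.
  def takeWithoutLimit(p: RowProducer, n: Int): Seq[Long] =
    (1 to n).map(_ => p.nextRow())        // p.open stays true

  // With the limit inside the loop (what the inserted LocalLimitExec
  // contributes): the stop condition fires after `limit` rows, so cleanup runs.
  def consumeWithLimit(p: RowProducer, limit: Int): Seq[Long] = {
    val out = Seq.newBuilder[Long]
    var count = 0
    while (count < limit) { out += p.nextRow(); count += 1 }
    p.close()                             // reached because the loop ended cleanly
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val leaky = new RowProducer
    takeWithoutLimit(leaky, 1)
    assert(leaky.open)                    // producer never released

    val clean = new RowProducer
    consumeWithLimit(clean, 1)
    assert(!clean.open)                   // limit broke the loop; cleanup ran
  }
}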

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 4 additions & 0 deletions

@@ -474,6 +474,10 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
   }

   private def supportCodegen(plan: SparkPlan): Boolean = plan match {
+    // Do not enable whole stage codegen for a single limit.
+    case limit: BaseLimitExec if !limit.child.isInstanceOf[CodegenSupport] ||
+      !limit.child.asInstanceOf[CodegenSupport].supportCodegen =>
+      false
     case plan: CodegenSupport if plan.supportCodegen =>
       val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
       // the generated code will be huge if there are too many columns
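Why the extra case matters: now that the planner wraps every top-most limit in a `LocalLimitExec`, a limit whose child cannot join a codegen stage would otherwise be compiled into a tiny stage of its own, with nothing to fuse with. A trimmed-down model of the guard follows; the `Plan`, `Codegen`, `Limit`, and `Scan` names are stand-ins for illustration, not Spark classes.

// Stand-in model of the supportCodegen guard; none of these types are Spark's.
object SingleLimitGuardSketch {
  sealed trait Plan
  trait Codegen { def supportCodegen: Boolean = true }
  case class Limit(limit: Int, child: Plan) extends Plan with Codegen
  case class Scan() extends Plan                          // no codegen support

  def supportCodegenFor(plan: Plan): Boolean = plan match {
    // Do not compile a lone limit: its child cannot join the stage anyway.
    case l: Limit if !l.child.isInstanceOf[Codegen] ||
        !l.child.asInstanceOf[Codegen].supportCodegen =>
      false
    case p: Codegen => p.supportCodegen
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    assert(!supportCodegenFor(Limit(1, Scan())))            // lone limit: stays interpreted
    assert(supportCodegenFor(Limit(1, Limit(10, Scan()))))  // codegen-capable child: compiled
  }
}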

sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala

Lines changed: 4 additions & 0 deletions

@@ -54,6 +54,10 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   val limit: Int
   override def output: Seq[Attribute] = child.output

+  override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit))
+
+  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
+
   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     iter.take(limit)
   }
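These two overrides are what makes the `LocalLimitExec` wrapper a no-op on the non-codegen path: a limit node asked for n rows never needs more than min(n, limit) rows from its child, and collecting the node's full output needs exactly `limit` rows, so both calls can delegate straight to the child. A hedged sketch of that delegation with stand-in types (not Spark's operator hierarchy):

// Stand-in model of the executeTake/executeCollect delegation semantics.
object LimitDelegationSketch {
  trait Node {
    def executeTake(n: Int): Array[Long]
    def executeCollect(): Array[Long]
  }
  final class Source(rows: Array[Long]) extends Node {
    def executeTake(n: Int): Array[Long] = rows.take(n)
    def executeCollect(): Array[Long] = rows
  }
  final class LimitNode(limit: Int, child: Node) extends Node {
    // Asking the child directly skips this node's own execution path entirely.
    def executeTake(n: Int): Array[Long] = child.executeTake(math.min(n, limit))
    def executeCollect(): Array[Long] = child.executeTake(limit)
  }

  def main(args: Array[String]): Unit = {
    val plan = new LimitNode(3, new Source(Array(1L, 2L, 3L, 4L, 5L)))
    assert(plan.executeTake(10).length == 3)                 // capped by the limit
    assert(plan.executeCollect().sameElements(Array(1L, 2L, 3L)))
  }
}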

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 5 additions & 0 deletions

@@ -2658,4 +2658,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i FROM v)"), Row(1))
     }
   }
+
+  test("SPARK-21743: top-most limit should not cause memory leak") {
+    // In unit tests, Spark will fail the query if a memory leak is detected.
+    spark.range(100).groupBy("id").count().limit(1).collect()
+  }
 }
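A note on why this one-liner is a sufficient regression test: Spark's test harness runs with leak detection on (the `spark.unsafe.exceptionOnMemoryLeak` setting, assuming the standard test configuration), so any task that completes with unreleased execution memory fails the query outright. The same shape can be tried interactively; a usage sketch, assuming a local spark-shell built with this patch:

// Top-most LIMIT over a hash aggregate: the aggregate holds execution memory,
// and before this fix the early-stopping collect left it unreleased.
spark.range(100).groupBy("id").count().limit(1).collect()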
