Skip to content

Commit 681243a

Browse files
committed
optimize limit using coalesce
1 parent 5e7a6dc commit 681243a

File tree

1 file changed

+5
-10
lines changed

1 file changed

+5
-10
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 5 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -148,20 +148,15 @@ case class Limit(limit: Int, child: SparkPlan)
148148
}
149149

150150
override def execute() = {
151-
val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
152-
child.execute().mapPartitions { iter =>
153-
iter.take(limit).map(row => (false, row.copy()))
151+
if (sortBasedShuffleOn) {
152+
child.execute().map(_.copy).coalesce(1).mapPartitions { iter =>
153+
iter.take(limit)
154154
}
155155
} else {
156-
child.execute().mapPartitions { iter =>
157-
val mutablePair = new MutablePair[Boolean, Row]()
158-
iter.take(limit).map(row => mutablePair.update(false, row))
156+
child.execute().coalesce(1).mapPartitions { iter =>
157+
iter.take(limit)
159158
}
160159
}
161-
val part = new HashPartitioner(1)
162-
val shuffled = new ShuffledRDD[Boolean, Row, Row](rdd, part)
163-
shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
164-
shuffled.mapPartitions(_.take(limit).map(_._2))
165160
}
166161
}
167162

0 commit comments

Comments (0)