Skip to content

Commit 011c2d3

Browse files
JkSelfcarsonwang
authored andcommitted
Avoid the prepareExecuteStage#QueryStage method is executed multi-times when call executeCollect, executeToIterator and executeTake action multi-times (apache#70)
* Avoid the prepareExecuteStage#QueryStage method is executed multi-times when call executeCollect, executeToIterator and executeTake action multi-times * only add the check in prepareExecuteStage method to avoid duplicate check in other methods * small fix
1 parent 1ab87f9 commit 011c2d3

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,18 @@ abstract class QueryStage extends UnaryExecNode {
8585
Future.sequence(shuffleStageFutures)(implicitly, QueryStage.executionContext), Duration.Inf)
8686
}
8787

88+
private var prepared = false
89+
8890
/**
8991
* Before executing the plan in this query stage, we execute all child stages, optimize the plan
9092
* in this stage and determine the reducer number based on the child stages' statistics. Finally
9193
* we do a codegen for this query stage and update the UI with the new plan.
9294
*/
93-
def prepareExecuteStage(): Unit = {
95+
def prepareExecuteStage(): Unit = synchronized {
96+
// Ensure the prepareExecuteStage method only be executed once.
97+
if (prepared) {
98+
return
99+
}
94100
// 1. Execute childStages
95101
executeChildStages()
96102

@@ -152,6 +158,7 @@ abstract class QueryStage extends UnaryExecNode {
152158
queryExecution.toString,
153159
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)))
154160
}
161+
prepared = true
155162
}
156163

157164
// Caches the created ShuffleRowRDD so we can reuse that.

0 commit comments

Comments
 (0)