Commit 34285fa

windpiger authored and carsonwang committed
fix AE job desc (apache#59)
1 parent 40256bf · commit 34285fa

2 files changed: 21 additions & 5 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

Lines changed: 17 additions & 0 deletions
@@ -99,4 +99,21 @@ object SQLExecution {
       sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
     }
   }
+
+  def withExecutionIdAndJobDesc[T](
+      sc: SparkContext,
+      executionId: String,
+      jobDesc: String)(body: => T): T = {
+    val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    val oldJobDesc = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+
+    try {
+      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
+      sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, jobDesc)
+      body
+    } finally {
+      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
+      sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, oldJobDesc)
+    }
+  }
 }
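
The new helper follows the same save/set/restore pattern as the existing withExecutionId, but handles the job description local property as well. As a reference point, here is a minimal, self-contained sketch of that pattern using only public SparkContext API; the object name JobDescExample, the helper withJobDesc, the demo job, and the literal key "spark.job.description" are illustrative assumptions, not part of this commit.

import org.apache.spark.{SparkConf, SparkContext}

object JobDescExample {
  // Illustrative helper: install a job description for the duration of `body`,
  // then restore whatever description the caller had (possibly null).
  def withJobDesc[T](sc: SparkContext, jobDesc: String)(body: => T): T = {
    val oldJobDesc = sc.getLocalProperty("spark.job.description")
    try {
      sc.setJobDescription(jobDesc)
      body
    } finally {
      sc.setJobDescription(oldJobDesc)
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("job-desc-demo").setMaster("local[2]"))
    val n = withJobDesc(sc, "counting demo rows") {
      // Jobs submitted inside the block show up in the UI as "counting demo rows".
      sc.parallelize(1 to 1000).count()
    }
    println(s"count = $n")
    sc.stop()
  }
}

Restoring the old value in a finally block keeps the description correct even if body throws, which is the same reason the commit restores both properties in withExecutionIdAndJobDesc.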

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala

Lines changed: 4 additions & 5 deletions
@@ -19,9 +19,7 @@ package org.apache.spark.sql.execution.adaptive
 
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
-
-import org.apache.spark.MapOutputStatistics
-import org.apache.spark.broadcast
+import org.apache.spark.{MapOutputStatistics, SparkContext, broadcast}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -54,14 +52,15 @@ abstract class QueryStage extends UnaryExecNode {
    */
  def executeChildStages(): Unit = {
    val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    val jobDesc = sqlContext.sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
 
     // Handle broadcast stages
     val broadcastQueryStages: Seq[BroadcastQueryStage] = child.collect {
       case bqs: BroadcastQueryStageInput => bqs.childStage
     }
     val broadcastFutures = broadcastQueryStages.map { queryStage =>
       Future {
-        SQLExecution.withExecutionId(sqlContext.sparkContext, executionId) {
+        SQLExecution.withExecutionIdAndJobDesc(sqlContext.sparkContext, executionId, jobDesc) {
           queryStage.prepareBroadcast()
         }
       }(QueryStage.executionContext)
@@ -73,7 +72,7 @@
     }
     val shuffleStageFutures = shuffleQueryStages.map { queryStage =>
       Future {
-        SQLExecution.withExecutionId(sqlContext.sparkContext, executionId) {
+        SQLExecution.withExecutionIdAndJobDesc(sqlContext.sparkContext, executionId, jobDesc) {
           queryStage.execute()
         }
       }(QueryStage.executionContext)
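
SparkContext local properties, including the job description, are kept per thread, and the child stages here are launched from Futures running on the shared QueryStage.executionContext pool; the commit therefore captures the caller's description in executeChildStages and re-installs it inside each Future through withExecutionIdAndJobDesc. The standalone sketch below shows the same capture-and-reinstall idea under explicit assumptions: a local SparkContext, an illustrative two-thread pool, and the literal key "spark.job.description" standing in for SparkContext.SPARK_JOB_DESCRIPTION; none of the names are taken from the commit.

import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.{SparkConf, SparkContext}

object ChildStageDescDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("child-stage-demo").setMaster("local[2]"))
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    // Caller thread: set a description and capture it, as executeChildStages does.
    sc.setJobDescription("parent query")
    val jobDesc = sc.getLocalProperty("spark.job.description")

    // Pool thread: re-install the captured description before running the child job,
    // and restore the previous value afterwards -- the same pattern as the new helper.
    val f = Future {
      val old = sc.getLocalProperty("spark.job.description")
      try {
        sc.setJobDescription(jobDesc)
        sc.parallelize(1 to 100).count() // labeled "parent query" in the UI
      } finally {
        sc.setJobDescription(old)
      }
    }

    println(s"child stage count = ${Await.result(f, Duration.Inf)}")
    pool.shutdown()
    sc.stop()
  }
}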

0 commit comments
