Skip to content

Commit acb55ae

Browse files
lianchengmarmbrus
authored andcommitted
[SPARK-4308][SQL] Sets SQL operation state to ERROR when exception is thrown
In `HiveThriftServer2`, when an exception is thrown during a SQL execution, the SQL operation state should be set to `ERROR`, but now it remains `RUNNING`. This affects the result of the `GetOperationStatus` Thrift API. Author: Cheng Lian <[email protected]> Closes #3175 from liancheng/fix-op-state and squashes the following commits: 6d4c1fe [Cheng Lian] Sets SQL operation state to ERROR when exception is thrown
1 parent 534b231 commit acb55ae

File tree

3 files changed

+21
-29
lines changed

3 files changed

+21
-29
lines changed

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive.thriftserver
1919

2020
import scala.collection.JavaConversions._
2121

22-
import java.util.{ArrayList => JArrayList}
23-
2422
import org.apache.commons.lang.exception.ExceptionUtils
2523
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
2624
import org.apache.hadoop.hive.ql.Driver

sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ import scala.collection.mutable.{ArrayBuffer, Map => SMap}
2525
import scala.math._
2626

2727
import org.apache.hadoop.hive.common.`type`.HiveDecimal
28-
import org.apache.hadoop.hive.conf.HiveConf
2928
import org.apache.hadoop.hive.metastore.api.FieldSchema
30-
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
3129
import org.apache.hadoop.hive.shims.ShimLoader
3230
import org.apache.hadoop.security.UserGroupInformation
3331
import org.apache.hive.service.cli._
@@ -37,9 +35,9 @@ import org.apache.hive.service.cli.session.HiveSession
3735
import org.apache.spark.Logging
3836
import org.apache.spark.sql.catalyst.plans.logical.SetCommand
3937
import org.apache.spark.sql.catalyst.types._
40-
import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
41-
import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
4238
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
39+
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
40+
import org.apache.spark.sql.{SQLConf, SchemaRDD, Row => SparkRow}
4341

4442
/**
4543
* A compatibility layer for interacting with Hive version 0.12.0.
@@ -71,8 +69,9 @@ private[hive] class SparkExecuteStatementOperation(
7169
statement: String,
7270
confOverlay: JMap[String, String])(
7371
hiveContext: HiveContext,
74-
sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
75-
parentSession, statement, confOverlay) with Logging {
72+
sessionToActivePool: SMap[HiveSession, String])
73+
extends ExecuteStatementOperation(parentSession, statement, confOverlay) with Logging {
74+
7675
private var result: SchemaRDD = _
7776
private var iter: Iterator[SparkRow] = _
7877
private var dataTypes: Array[DataType] = _
@@ -216,6 +215,7 @@ private[hive] class SparkExecuteStatementOperation(
216215
// Actually do need to catch Throwable as some failures don't inherit from Exception and
217216
// HiveServer will silently swallow them.
218217
case e: Throwable =>
218+
setState(OperationState.ERROR)
219219
logError("Error executing query:",e)
220220
throw new HiveSQLException(e.toString)
221221
}

sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@ import scala.collection.mutable.{ArrayBuffer, Map => SMap}
2727
import scala.math._
2828

2929
import org.apache.hadoop.hive.conf.HiveConf
30+
import org.apache.hadoop.hive.metastore.api.FieldSchema
3031
import org.apache.hadoop.hive.ql.metadata.Hive
31-
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
3232
import org.apache.hadoop.hive.ql.session.SessionState
33-
import org.apache.hadoop.hive.metastore.api.FieldSchema
3433
import org.apache.hadoop.hive.shims.ShimLoader
3534
import org.apache.hadoop.security.UserGroupInformation
3635
import org.apache.hive.service.cli._
@@ -39,9 +38,9 @@ import org.apache.hive.service.cli.session.HiveSession
3938

4039
import org.apache.spark.Logging
4140
import org.apache.spark.sql.catalyst.types._
42-
import org.apache.spark.sql.{Row => SparkRow, SchemaRDD}
43-
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
4441
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
42+
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
43+
import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
4544

4645
/**
4746
* A compatibility layer for interacting with Hive version 0.12.0.
@@ -100,6 +99,7 @@ private[hive] class SparkExecuteStatementOperation(
10099
// Actually do need to catch Throwable as some failures don't inherit from Exception and
101100
// HiveServer will silently swallow them.
102101
case e: Throwable =>
102+
setState(OperationState.ERROR)
103103
logError("Error executing query:",e)
104104
throw new HiveSQLException(e.toString)
105105
}
@@ -194,14 +194,12 @@ private[hive] class SparkExecuteStatementOperation(
194194
try {
195195
sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
196196
}
197-
catch {
198-
case e: IllegalArgumentException => {
199-
throw new HiveSQLException("Error applying statement specific settings", e)
200-
}
197+
catch { case e: IllegalArgumentException =>
198+
throw new HiveSQLException("Error applying statement specific settings", e)
201199
}
202200
}
203201
}
204-
return sqlOperationConf
202+
sqlOperationConf
205203
}
206204

207205
def run(): Unit = {
@@ -219,7 +217,7 @@ private[hive] class SparkExecuteStatementOperation(
219217
val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)
220218

221219
val backgroundOperation: Runnable = new Runnable {
222-
def run {
220+
def run() {
223221
val doAsAction: PrivilegedExceptionAction[AnyRef] =
224222
new PrivilegedExceptionAction[AnyRef] {
225223
def run: AnyRef = {
@@ -228,23 +226,19 @@ private[hive] class SparkExecuteStatementOperation(
228226
try {
229227
runInternal(statement)
230228
}
231-
catch {
232-
case e: HiveSQLException => {
233-
setOperationException(e)
234-
logError("Error running hive query: ", e)
235-
}
229+
catch { case e: HiveSQLException =>
230+
setOperationException(e)
231+
logError("Error running hive query: ", e)
236232
}
237-
return null
233+
null
238234
}
239235
}
240236
try {
241237
ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
242238
}
243-
catch {
244-
case e: Exception => {
245-
setOperationException(new HiveSQLException(e))
246-
logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
247-
}
239+
catch { case e: Exception =>
240+
setOperationException(new HiveSQLException(e))
241+
logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
248242
}
249243
setState(OperationState.FINISHED)
250244
}

0 commit comments

Comments
 (0)