package org.apache.spark.sql.hive.thriftserver

- import java.security.PrivilegedExceptionAction
import java.sql.{Date, Timestamp}
- import java.util.concurrent.Future
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, Map => SMap}
import scala.math._

- import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.FieldSchema
- import org.apache.hadoop.hive.ql.metadata.Hive
- import org.apache.hadoop.hive.ql.session.SessionState
- import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.Logging
+ import org.apache.spark.sql.catalyst.plans.logical.SetCommand
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
- import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
+ import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}

/**
 * A compatibility layer for interacting with Hive version 0.13.1.
 */
private[thriftserver] object HiveThriftServerShim {
  val version = "0.13.1"

- def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService: SparkSQLCLIService) = {
+ def setServerUserName(
+     sparkServiceUGI: UserGroupInformation,
+     sparkCliService: SparkSQLCLIService) = {
    setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
  }
}
@@ -72,39 +69,14 @@ private[hive] class SparkExecuteStatementOperation(
    confOverlay: JMap[String, String],
    runInBackground: Boolean = true)(
    hiveContext: HiveContext,
-   sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
-   parentSession, statement, confOverlay, runInBackground) with Logging {
+   sessionToActivePool: SMap[HiveSession, String])
+   // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
+   extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging {

  private var result: SchemaRDD = _
  private var iter: Iterator[SparkRow] = _
  private var dataTypes: Array[DataType] = _

- private def runInternal(cmd: String) = {
-   try {
-     result = hiveContext.sql(cmd)
-     logDebug(result.queryExecution.toString())
-     val groupId = round(random * 1000000).toString
-     hiveContext.sparkContext.setJobGroup(groupId, statement)
-     iter = {
-       val useIncrementalCollect =
-         hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
-       if (useIncrementalCollect) {
-         result.toLocalIterator
-       } else {
-         result.collect().iterator
-       }
-     }
-     dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
-   } catch {
-     // Actually do need to catch Throwable as some failures don't inherit from Exception and
-     // HiveServer will silently swallow them.
-     case e: Throwable =>
-       setState(OperationState.ERROR)
-       logError("Error executing query:", e)
-       throw new HiveSQLException(e.toString)
-   }
- }
-
  def close(): Unit = {
    // RDDs will be cleaned automatically upon garbage collection.
    logDebug("CLOSING")
@@ -182,76 +154,43 @@ private[hive] class SparkExecuteStatementOperation(
    }
  }

- private def getConfigForOperation: HiveConf = {
-   var sqlOperationConf: HiveConf = getParentSession.getHiveConf
-   if (!getConfOverlay.isEmpty || shouldRunAsync) {
-     sqlOperationConf = new HiveConf(sqlOperationConf)
-     import scala.collection.JavaConversions._
-     for (confEntry <- getConfOverlay.entrySet) {
-       try {
-         sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
-       } catch { case e: IllegalArgumentException =>
-         throw new HiveSQLException("Error applying statement specific settings", e)
-       }
-     }
-   }
-   sqlOperationConf
- }
-
  def run(): Unit = {
    logInfo(s"Running query '$statement'")
-   val opConfig: HiveConf = getConfigForOperation
    setState(OperationState.RUNNING)
-   setHasResultSet(true)
-
-   if (!shouldRunAsync) {
-     runInternal(statement)
-     setState(OperationState.FINISHED)
-   } else {
-     val parentSessionState = SessionState.get
-     val sessionHive: Hive = Hive.get
-     val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)
-
-     val backgroundOperation: Runnable = new Runnable {
-       def run() {
-         val doAsAction: PrivilegedExceptionAction[AnyRef] =
-           new PrivilegedExceptionAction[AnyRef] {
-             def run: AnyRef = {
-               Hive.set(sessionHive)
-               SessionState.setCurrentSessionState(parentSessionState)
-               try {
-                 runInternal(statement)
-               } catch { case e: HiveSQLException =>
-                 setOperationException(e)
-                 logError("Error running hive query: ", e)
-               }
-               null
-             }
-           }
-         try {
-           ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
-         } catch { case e: Exception =>
-           setOperationException(new HiveSQLException(e))
-           logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
-         }
-         setState(OperationState.FINISHED)
-       }
+   try {
+     result = hiveContext.sql(statement)
+     logDebug(result.queryExecution.toString())
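+     // If the statement is a SET for SQLConf.THRIFTSERVER_POOL, record the requested
+     // Fair Scheduler pool for this session so that later statements run in it.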
+     result.queryExecution.logical match {
+       case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
+         sessionToActivePool(parentSession) = value
+         logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
+       case _ =>
      }

-     try {
-       val backgroundHandle: Future[_] = getParentSession.getSessionManager.
-         submitBackgroundOperation(backgroundOperation)
-       setBackgroundHandle(backgroundHandle)
-     } catch {
-       // Actually do need to catch Throwable as some failures don't inherit from Exception and
-       // HiveServer will silently swallow them.
-       case e: Throwable =>
-         logError("Error executing query:", e)
-         throw new HiveSQLException(e.toString)
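+     // Group all Spark jobs spawned by this statement under a single job group ID and,
+     // if a pool was chosen for this session, run them in that Fair Scheduler pool.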
+     val groupId = round(random * 1000000).toString
+     hiveContext.sparkContext.setJobGroup(groupId, statement)
+     sessionToActivePool.get(parentSession).foreach { pool =>
+       hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+     }
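+     // With spark.sql.thriftServer.incrementalCollect enabled, fetch the result one
+     // partition at a time instead of materializing the whole result set on the driver.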
+     iter = {
+       val useIncrementalCollect =
+         hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
+       if (useIncrementalCollect) {
+         result.toLocalIterator
+       } else {
+         result.collect().iterator
+       }
      }
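+     // Record the result schema for row conversion, and only report a result set once
+     // the query has actually produced one.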
+     dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
+     setHasResultSet(true)
+   } catch {
+     // Actually do need to catch Throwable as some failures don't inherit from Exception and
+     // HiveServer will silently swallow them.
+     case e: Throwable =>
+       setState(OperationState.ERROR)
+       logError("Error executing query:", e)
+       throw new HiveSQLException(e.toString)
    }
+   setState(OperationState.FINISHED)
  }
}