@@ -22,20 +22,27 @@ import org.apache.hadoop.hive.conf.HiveConf
22
22
import org .apache .hadoop .hive .conf .HiveConf .ConfVars
23
23
import org .apache .hive .service .cli .thrift .{ThriftBinaryCLIService , ThriftHttpCLIService }
24
24
import org .apache .hive .service .server .{HiveServer2 , ServerOptionsProcessor }
25
+ import org .apache .spark .sql .SQLConf
25
26
26
- import org .apache .spark .Logging
27
+ import org .apache .spark .{ SparkContext , SparkConf , Logging }
27
28
import org .apache .spark .annotation .DeveloperApi
28
29
import org .apache .spark .sql .hive .HiveContext
29
30
import org .apache .spark .sql .hive .thriftserver .ReflectionUtils ._
30
- import org .apache .spark .scheduler .{SparkListenerApplicationEnd , SparkListener }
31
+ import org .apache .spark .scheduler .{SparkListenerJobStart , SparkListenerApplicationEnd , SparkListener }
32
+ import org .apache .spark .sql .hive .thriftserver .ui .ThriftServerTab
31
33
import org .apache .spark .util .Utils
32
34
35
+ import scala .collection .mutable
36
+ import scala .collection .mutable .ArrayBuffer
37
+
33
38
/**
34
39
* The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
35
40
* `HiveThriftServer2` thrift server.
36
41
*/
37
42
object HiveThriftServer2 extends Logging {
38
43
var LOG = LogFactory .getLog(classOf [HiveServer2 ])
44
+ var uiTab : Option [ThriftServerTab ] = _
45
+ var listener : HiveThriftServer2Listener = _
39
46
40
47
/**
41
48
* :: DeveloperApi ::
@@ -46,7 +53,13 @@ object HiveThriftServer2 extends Logging {
46
53
val server = new HiveThriftServer2 (sqlContext)
47
54
server.init(sqlContext.hiveconf)
48
55
server.start()
49
- sqlContext.sparkContext.addSparkListener(new HiveThriftServer2Listener (server))
56
+ listener = new HiveThriftServer2Listener (server, sqlContext.conf)
57
+ sqlContext.sparkContext.addSparkListener(listener)
58
+ uiTab = if (sqlContext.sparkContext.getConf.getBoolean(" spark.ui.enabled" , true )) {
59
+ Some (new ThriftServerTab (sqlContext.sparkContext))
60
+ } else {
61
+ None
62
+ }
50
63
}
51
64
52
65
def main (args : Array [String ]) {
@@ -58,30 +71,164 @@ object HiveThriftServer2 extends Logging {
58
71
logInfo(" Starting SparkContext" )
59
72
SparkSQLEnv .init()
60
73
61
- Utils .addShutdownHook { () => SparkSQLEnv .stop() }
74
+ Utils .addShutdownHook { () =>
75
+ SparkSQLEnv .stop()
76
+ uiTab.foreach(_.detach())
77
+ }
62
78
63
79
try {
64
80
val server = new HiveThriftServer2 (SparkSQLEnv .hiveContext)
65
81
server.init(SparkSQLEnv .hiveContext.hiveconf)
66
82
server.start()
67
83
logInfo(" HiveThriftServer2 started" )
68
- SparkSQLEnv .sparkContext.addSparkListener(new HiveThriftServer2Listener (server))
84
+ listener = new HiveThriftServer2Listener (server, SparkSQLEnv .hiveContext.conf)
85
+ SparkSQLEnv .sparkContext.addSparkListener(listener)
86
+ uiTab = if (SparkSQLEnv .sparkContext.getConf.getBoolean(" spark.ui.enabled" , true )) {
87
+ Some (new ThriftServerTab (SparkSQLEnv .sparkContext))
88
+ } else {
89
+ None
90
+ }
69
91
} catch {
70
92
case e : Exception =>
71
93
logError(" Error starting HiveThriftServer2" , e)
72
94
System .exit(- 1 )
73
95
}
74
96
}
75
97
98
+ private [thriftserver] class SessionInfo (
99
+ val sessionId : String ,
100
+ val startTimestamp : Long ,
101
+ val ip : String ,
102
+ val userName : String ) {
103
+ var finishTimestamp : Long = 0L
104
+ var totalExecution : Int = 0
105
+ def totalTime : Long = {
106
+ if (finishTimestamp == 0L ) {
107
+ System .currentTimeMillis - startTimestamp
108
+ } else {
109
+ finishTimestamp - startTimestamp
110
+ }
111
+ }
112
+ }
113
+
114
+ private [thriftserver] object ExecutionState extends Enumeration {
115
+ val STARTED, COMPILED, FAILED, FINISHED = Value
116
+ type ExecutionState = Value
117
+ }
118
+
119
+ private [thriftserver] class ExecutionInfo (
120
+ val statement : String ,
121
+ val sessionId : String ,
122
+ val startTimestamp : Long ,
123
+ val userName : String ) {
124
+ var finishTimestamp : Long = 0L
125
+ var executePlan : String = " "
126
+ var detail : String = " "
127
+ var state : ExecutionState .Value = ExecutionState .STARTED
128
+ val jobId : ArrayBuffer [String ] = ArrayBuffer [String ]()
129
+ var groupId : String = " "
130
+ def totalTime : Long = {
131
+ if (finishTimestamp == 0L ) {
132
+ System .currentTimeMillis - startTimestamp
133
+ } else {
134
+ finishTimestamp - startTimestamp
135
+ }
136
+ }
137
+ }
138
+
139
+
76
140
  /**
   * An inner SparkListener called in sc.stop to clean up the HiveThriftServer2.
   * It also records session/statement events so the Thrift server UI tab can
   * render them.
   */
  private[thriftserver] class HiveThriftServer2Listener(
      val server: HiveServer2,
      val conf: SQLConf) extends SparkListener {

    // Stop the Thrift server when the Spark application shuts down.
    override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
      server.stop()
    }

    // Insertion-ordered so trimming below evicts the oldest entries first.
    // NOTE(review): these maps are mutated from listener callbacks and the
    // onSession*/onStatement* hooks without synchronization (only the trim
    // helpers are synchronized) — confirm all callers run on the same thread.
    val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
    val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
    // Upper bounds on how many entries are retained for the UI (default 200).
    val retainedStatements =
      conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT, "200").toInt
    val retainedSessions =
      conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT, "200").toInt
    // Number of statements currently executing.
    var totalRunning = 0

    // Attribute each started Spark job to the statement whose job group it
    // belongs to, by matching the job's SPARK_JOB_GROUP_ID property.
    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
      for {
        props <- Option(jobStart.properties)
        groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
        (_, info) <- executionList if info.groupId == groupId
      } {
        info.jobId += jobStart.jobId.toString
        info.groupId = groupId
      }
    }

    // Record a newly opened session and evict old ones if over the limit.
    def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
      val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
      sessionList.put(sessionId, info)
      trimSessionIfNecessary()
    }

    // Mark the session closed.
    // NOTE(review): throws NoSuchElementException if the session entry was
    // already evicted by trimSessionIfNecessary — confirm this is acceptable.
    def onSessionClosed(sessionId: String): Unit = {
      sessionList(sessionId).finishTimestamp = System.currentTimeMillis
    }

    // Record the start of a statement execution within a session.
    def onStatementStart(
        id: String,
        sessionId: String,
        statement: String,
        groupId: String,
        userName: String = "UNKNOWN"): Unit = {
      val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
      info.state = ExecutionState.STARTED
      executionList.put(id, info)
      trimExecutionIfNecessary()
      // NOTE(review): both lookups below throw if the entry was just trimmed.
      sessionList(sessionId).totalExecution += 1
      executionList(id).groupId = groupId
      totalRunning += 1
    }

    // The statement has been compiled into a physical plan.
    def onStatementParsed(id: String, executionPlan: String): Unit = {
      executionList(id).executePlan = executionPlan
      executionList(id).state = ExecutionState.COMPILED
    }

    // The statement failed; keep the error message for display in the UI.
    def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
      executionList(id).finishTimestamp = System.currentTimeMillis
      executionList(id).detail = errorMessage
      executionList(id).state = ExecutionState.FAILED
      totalRunning -= 1
    }

    // The statement completed successfully.
    def onStatementFinish(id: String): Unit = {
      executionList(id).finishTimestamp = System.currentTimeMillis
      executionList(id).state = ExecutionState.FINISHED
      totalRunning -= 1
    }

    // Evict the oldest ~10% of statements (at least one) when over the limit.
    private def trimExecutionIfNecessary() = synchronized {
      if (executionList.size > retainedStatements) {
        val toRemove = math.max(retainedStatements / 10, 1)
        executionList.take(toRemove).foreach { s =>
          executionList.remove(s._1)
        }
      }
    }

    // Evict the oldest ~10% of sessions (at least one) when over the limit.
    private def trimSessionIfNecessary() = synchronized {
      if (sessionList.size > retainedSessions) {
        val toRemove = math.max(retainedSessions / 10, 1)
        sessionList.take(toRemove).foreach { s =>
          sessionList.remove(s._1)
        }
      }

    }
  }
85
232
}
86
233
87
234
private [hive] class HiveThriftServer2 (hiveContext : HiveContext )
0 commit comments