
Commit 49b1c5b

Reuses existing started SessionState if any
1 parent 3cd6fab commit 49b1c5b

File tree

1 file changed (+29 -14 lines)


sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 29 additions & 14 deletions
@@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   }
 
   /**
-   * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
-   * HiveConf will get picked up by the SQLConf. Additionally, any properties set by
-   * set() or a SET command inside sql() will be set in the SQLConf *as well as*
-   * in the HiveConf.
+   * SQLConf and HiveConf contracts:
+   *
+   * 1. reuse existing started SessionState if any
+   * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
+   *    SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
+   *    set in the SQLConf *as well as* in the HiveConf.
    */
-  @transient lazy val hiveconf = new HiveConf(classOf[SessionState])
-  @transient protected[hive] lazy val sessionState = {
-    val ss = new SessionState(hiveconf)
-    setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
-    SessionState.start(ss)
-    ss.err = new PrintStream(outputBuffer, true, "UTF-8")
-    ss.out = new PrintStream(outputBuffer, true, "UTF-8")
-
-    ss
-  }
+  @transient protected[hive] lazy val (hiveconf, sessionState) =
+    Option(SessionState.get())
+      .orElse {
+        val newState = new SessionState(new HiveConf(classOf[SessionState]))
+        // Only starts newly created `SessionState` instance. Any existing `SessionState` instance
+        // returned by `SessionState.get()` must be the most recently started one.
+        SessionState.start(newState)
+        Some(newState)
+      }
+      .map { state =>
+        setConf(state.getConf.getAllProperties)
+        if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
+        if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
+        (state.getConf, state)
+      }
+      .get
 
   override def setConf(key: String, value: String): Unit = {
     super.setConf(key, value)
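
For readers skimming the hunk above, here is a minimal, self-contained restatement of the new initialization logic, assuming only Hive's client classes (`HiveConf`, `SessionState`) on the classpath. `SessionStateReuseSketch` and its `outputBuffer` field are illustrative stand-ins, and the `setConf(...)` call that syncs properties into SQLConf is omitted because it belongs to `SQLContext`; this is a sketch of the pattern, not the patch itself:

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

object SessionStateReuseSketch {
  // Stand-in for HiveContext's `outputBuffer`; any OutputStream would do here.
  private val outputBuffer = new ByteArrayOutputStream()

  // Reuse the SessionState already started for this thread if there is one; otherwise
  // create and start a fresh one. Both values are derived from the same state, so the
  // HiveConf and the SessionState can never disagree.
  lazy val (hiveconf, sessionState) =
    Option(SessionState.get())
      .orElse {
        val newState = new SessionState(new HiveConf(classOf[SessionState]))
        SessionState.start(newState) // only newly created states are started here
        Some(newState)
      }
      .map { state =>
        // Redirect Hive's console streams only if the (possibly reused) state hasn't set them.
        if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
        if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
        (state.getConf, state)
      }
      .get
}
```

The practical effect is that when an embedding tool (for example the Hive CLI or a test harness) has already started a session before the context is constructed, the context now adopts that session and its configuration instead of building a second, divergent HiveConf.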
@@ -288,8 +296,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
     val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)
 
+    // Makes sure the session represented by the `sessionState` field is activated. This implies
+    // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks
+    // session isolation under multi-user scenarios (i.e. HiveThriftServer2).
+    // TODO Fix session isolation
+    SessionState.start(sessionState)
+
     proc match {
       case driver: Driver =>
+        driver.init()
         val results = HiveShim.createDriverResultsArray
         val response: CommandProcessorResponse = driver.run(cmd)
         // Throw an exception if there is an error in query processing.
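
The second hunk relies on `SessionState` tracking a "current" session returned by `SessionState.get()`: re-starting `sessionState` right before dispatching a command makes HiveContext's single shared session the active one, which is also why the comment flags broken session isolation. A rough sketch of that behaviour, assuming the Hive client libraries and writable default scratch/log directories; the object and value names are made up:

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

object SessionActivationSketch extends App {
  val first  = new SessionState(new HiveConf(classOf[SessionState]))
  val second = new SessionState(new HiveConf(classOf[SessionState]))

  SessionState.start(first)
  assert(SessionState.get() eq first) // `first` is now the current session

  // Starting another state replaces the current session, so code that relies on
  // SessionState.get() no longer sees `first`.
  SessionState.start(second)
  assert(SessionState.get() eq second)

  // runSqlHive therefore re-starts `sessionState` before each command, guaranteeing the
  // command processor runs against HiveContext's one shared session.
}
```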
