Commit faeca62

[SPARK-4037][SQL] Removes the SessionState instance created in HiveThriftServer2
`HiveThriftServer2` creates a global singleton `SessionState` instance and overrides `HiveContext` to inject that `SessionState` object. This messes up `SessionState` initialization and causes problems.

This PR replaces the global `SessionState` with `HiveContext.sessionState` to avoid the initialization conflict. Also, `HiveContext` now reuses an existing started `SessionState` if there is one (this is required by `SparkSQLCLIDriver`, which uses the specialized `CliSessionState`).

Author: Cheng Lian <[email protected]>

Closes apache#2887 from liancheng/spark-4037 and squashes the following commits:

8446675 [Cheng Lian] Removes redundant Driver initialization
a28fef5 [Cheng Lian] Avoid starting HiveContext.sessionState multiple times
49b1c5b [Cheng Lian] Reuses existing started SessionState if any
3cd6fab [Cheng Lian] Fixes SPARK-4037

Conflicts:
	sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
	sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
1 parent c58c1bb · commit faeca62
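The heart of the fix, reduced to a minimal standalone sketch (not the patch itself — see the `HiveContext.scala` diff below; `SessionState.get`, `SessionState.start`, and the `HiveConf` constructor are real Hive APIs, the surrounding scaffolding is illustrative):

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.session.SessionState

    // Reuse the already-started SessionState if one exists (e.g. the
    // CliSessionState started by SparkSQLCLIDriver); otherwise create and
    // start a fresh one. Either way, exactly one SessionState gets started.
    val state: SessionState = Option(SessionState.get()).getOrElse {
      val newState = new SessionState(new HiveConf(classOf[SessionState]))
      SessionState.start(newState)
      newState
    }

Starting a `SessionState` more than once is what the commit message calls "messed up" initialization, which is why only a newly created instance is ever started here.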

File tree: 4 files changed, +45 −38 lines

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala

Lines changed: 1 addition & 16 deletions
@@ -17,11 +17,8 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import scala.collection.JavaConversions._
-
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
 import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
@@ -38,24 +35,12 @@ private[hive] object HiveThriftServer2 extends Logging {
 
   def main(args: Array[String]) {
     val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
-
     if (!optionsProcessor.process(args)) {
       System.exit(-1)
     }
 
-    val ss = new SessionState(new HiveConf(classOf[SessionState]))
-
-    // Set all properties specified via command line.
-    val hiveConf: HiveConf = ss.getConf
-    hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) =>
-      logDebug(s"HiveConf var: $k=$v")
-    }
-
-    SessionState.start(ss)
-
     logInfo("Starting SparkContext")
     SparkSQLEnv.init()
-    SessionState.start(ss)
 
     Runtime.getRuntime.addShutdownHook(
       new Thread() {
@@ -67,7 +52,7 @@ private[hive] object HiveThriftServer2 extends Logging {
 
     try {
      val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
-      server.init(hiveConf)
+      server.init(SparkSQLEnv.hiveContext.hiveconf)
       server.start()
       logInfo("HiveThriftServer2 started")
     } catch {

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala

Lines changed: 8 additions & 8 deletions
@@ -17,12 +17,10 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import org.apache.hadoop.hive.ql.session.SessionState
-
-import org.apache.spark.scheduler.{SplitInfo, StatsReportListener}
-import org.apache.spark.Logging
+import org.apache.spark.scheduler.StatsReportListener
 import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import scala.collection.JavaConversions._
 
 /** A singleton object for the master program. The slaves should not access this. */
 private[hive] object SparkSQLEnv extends Logging {
@@ -37,10 +35,12 @@ private[hive] object SparkSQLEnv extends Logging {
         .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))
 
       sparkContext.addSparkListener(new StatsReportListener())
+      hiveContext = new HiveContext(sparkContext)
 
-      hiveContext = new HiveContext(sparkContext) {
-        @transient override lazy val sessionState = SessionState.get()
-        @transient override lazy val hiveconf = sessionState.getConf
+      if (log.isDebugEnabled) {
+        hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
+          logDebug(s"HiveConf var: $k=$v")
+        }
       }
     }
   }
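A side note on the debug dump that moved here from `HiveThriftServer2.main`: it is now wrapped in `log.isDebugEnabled`, so the property map is only copied, sorted, and formatted when DEBUG logging is actually enabled. The same guard as a standalone sketch (plain SLF4J; the `props` map is a hypothetical stand-in for `hiveconf.getAllProperties`):

    import org.slf4j.LoggerFactory

    object DebugDumpSketch extends App {
      private val log = LoggerFactory.getLogger(getClass)

      // Hypothetical stand-in for hiveconf.getAllProperties.
      val props = Map(
        "spark.sql.shuffle.partitions" -> "3",
        "hive.cli.print.header" -> "false")

      if (log.isDebugEnabled) {
        // The copy, sort, and string interpolation are all skipped
        // unless DEBUG is enabled for this logger.
        props.toSeq.sorted.foreach { case (k, v) => log.debug(s"HiveConf var: $k=$v") }
      }
    }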

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala

Lines changed: 6 additions & 4 deletions
@@ -134,10 +134,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
     val dataFilePath =
       Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
 
-    val queries = Seq(
-      "CREATE TABLE test(key INT, val STRING)",
-      s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test",
-      "CACHE TABLE test")
+    val queries =
+      s"""SET spark.sql.shuffle.partitions=3;
+         |CREATE TABLE test(key INT, val STRING);
+         |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
+         |CACHE TABLE test;
+       """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
 
     queries.foreach(statement.execute)
 
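The queries now come from a single semicolon-separated script rather than a `Seq` of string literals, which makes it easy to prepend settings such as the `SET` above. The split-and-trim idiom on its own (adequate for a test script; it would mis-split on a semicolon inside a quoted literal):

    // Turn a SQL script into one statement per element, dropping the
    // whitespace-only fragment left over from the trailing semicolon.
    val script =
      """SET spark.sql.shuffle.partitions=3;
        |CREATE TABLE test(key INT, val STRING);
        |CACHE TABLE test;
      """.stripMargin

    val statements = script.split(";").map(_.trim).filter(_.nonEmpty)
    statements.foreach(println)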

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 30 additions & 10 deletions
@@ -222,17 +222,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   }
 
   /**
-   * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
-   * HiveConf will get picked up by the SQLConf. Additionally, any properties set by
-   * set() or a SET command inside sql() will be set in the SQLConf *as well as*
-   * in the HiveConf.
+   * SQLConf and HiveConf contracts:
+   *
+   * 1. reuse existing started SessionState if any
+   * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
+   *    SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
+   *    set in the SQLConf *as well as* in the HiveConf.
    */
-  @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState])
-  @transient protected[hive] lazy val sessionState = {
-    val ss = new SessionState(hiveconf)
-    setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
-    ss
-  }
+  @transient protected[hive] lazy val (hiveconf, sessionState) =
+    Option(SessionState.get())
+      .orElse {
+        val newState = new SessionState(new HiveConf(classOf[SessionState]))
+        // Only starts newly created `SessionState` instance. Any existing `SessionState` instance
+        // returned by `SessionState.get()` must be the most recently started one.
+        SessionState.start(newState)
+        Some(newState)
+      }
+      .map { state =>
+        setConf(state.getConf.getAllProperties)
+        if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
+        if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
+        (state.getConf, state)
+      }
+      .get
 
   sessionState.err = new PrintStream(outputBuffer, true, "UTF-8")
   sessionState.out = new PrintStream(outputBuffer, true, "UTF-8")
@@ -290,6 +302,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
     SessionState.start(sessionState)
 
+    // Makes sure the session represented by the `sessionState` field is activated. This implies
+    // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks
+    // session isolation under multi-user scenarios (i.e. HiveThriftServer2).
+    // TODO Fix session isolation
+    if (SessionState.get() != sessionState) {
+      SessionState.start(sessionState)
+    }
+
     proc match {
       case driver: Driver =>
         driver.init()
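The new guard in `runHive` exists because Hive tracks the active `SessionState` per thread: between two calls into this `HiveContext`, other code (or another connection in HiveThriftServer2) may have started a different `SessionState`, after which `SessionState.get()` no longer returns the one this context owns. A standalone sketch of that failure mode, assuming the thread-local `SessionState` behavior of the Hive versions Spark built against at the time:

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.session.SessionState

    val mine = new SessionState(new HiveConf(classOf[SessionState]))
    SessionState.start(mine)

    // Something else activates a different session on the same thread...
    val other = new SessionState(new HiveConf(classOf[SessionState]))
    SessionState.start(other)

    // ...so this context's state is no longer the active one. The guard
    // from the diff above re-activates it before running Hive commands.
    if (SessionState.get() != mine) {
      SessionState.start(mine)
    }
    assert(SessionState.get() == mine)

As the TODO in the diff notes, pinning every operation to one `SessionState` is what sacrifices per-user session isolation in the Thrift server.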
