Skip to content

Commit ad0fde1

Browse files
liancheng authored and marmbrus committed
[SPARK-4037][SQL] Removes the SessionState instance created in HiveThriftServer2
`HiveThriftServer2` creates a global singleton `SessionState` instance and overrides `HiveContext` to inject that `SessionState` object. This interferes with `SessionState` initialization and causes problems. This PR replaces the global `SessionState` with `HiveContext.sessionState` to avoid the initialization conflict. In addition, `HiveContext` now reuses an existing, already-started `SessionState` if there is one (this is required by `SparkSQLCLIDriver`, which uses a specialized `CliSessionState`).

Author: Cheng Lian <[email protected]>

Closes #2887 from liancheng/spark-4037 and squashes the following commits:

- 8446675 [Cheng Lian] Removes redundant Driver initialization
- a28fef5 [Cheng Lian] Avoid starting HiveContext.sessionState multiple times
- 49b1c5b [Cheng Lian] Reuses existing started SessionState if any
- 3cd6fab [Cheng Lian] Fixes SPARK-4037
1 parent f55218a commit ad0fde1

File tree

4 files changed

+44
-45
lines changed

4 files changed

+44
-45
lines changed

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,8 @@
1717

1818
package org.apache.spark.sql.hive.thriftserver
1919

20-
import scala.collection.JavaConversions._
21-
2220
import org.apache.commons.logging.LogFactory
2321
import org.apache.hadoop.hive.conf.HiveConf
24-
import org.apache.hadoop.hive.ql.session.SessionState
2522
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
2623
import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
2724

@@ -51,24 +48,12 @@ object HiveThriftServer2 extends Logging {
5148

5249
def main(args: Array[String]) {
5350
val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
54-
5551
if (!optionsProcessor.process(args)) {
5652
System.exit(-1)
5753
}
5854

59-
val ss = new SessionState(new HiveConf(classOf[SessionState]))
60-
61-
// Set all properties specified via command line.
62-
val hiveConf: HiveConf = ss.getConf
63-
hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) =>
64-
logDebug(s"HiveConf var: $k=$v")
65-
}
66-
67-
SessionState.start(ss)
68-
6955
logInfo("Starting SparkContext")
7056
SparkSQLEnv.init()
71-
SessionState.start(ss)
7257

7358
Runtime.getRuntime.addShutdownHook(
7459
new Thread() {
@@ -80,7 +65,7 @@ object HiveThriftServer2 extends Logging {
8065

8166
try {
8267
val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
83-
server.init(hiveConf)
68+
server.init(SparkSQLEnv.hiveContext.hiveconf)
8469
server.start()
8570
logInfo("HiveThriftServer2 started")
8671
} catch {

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@
1717

1818
package org.apache.spark.sql.hive.thriftserver
1919

20-
import org.apache.hadoop.hive.ql.session.SessionState
21-
22-
import org.apache.spark.scheduler.{SplitInfo, StatsReportListener}
23-
import org.apache.spark.Logging
20+
import org.apache.spark.scheduler.StatsReportListener
2421
import org.apache.spark.sql.hive.HiveContext
25-
import org.apache.spark.{SparkConf, SparkContext}
22+
import org.apache.spark.{Logging, SparkConf, SparkContext}
23+
import scala.collection.JavaConversions._
2624

2725
/** A singleton object for the master program. The slaves should not access this. */
2826
private[hive] object SparkSQLEnv extends Logging {
@@ -37,14 +35,12 @@ private[hive] object SparkSQLEnv extends Logging {
3735
.setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))
3836

3937
sparkContext.addSparkListener(new StatsReportListener())
38+
hiveContext = new HiveContext(sparkContext)
4039

41-
hiveContext = new HiveContext(sparkContext) {
42-
@transient override lazy val sessionState = {
43-
val state = SessionState.get()
44-
setConf(state.getConf.getAllProperties)
45-
state
40+
if (log.isDebugEnabled) {
41+
hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
42+
logDebug(s"HiveConf var: $k=$v")
4643
}
47-
@transient override lazy val hiveconf = sessionState.getConf
4844
}
4945
}
5046
}

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
150150
val dataFilePath =
151151
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
152152

153-
val queries = Seq(
154-
"CREATE TABLE test(key INT, val STRING)",
155-
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test",
156-
"CACHE TABLE test")
153+
val queries =
154+
s"""SET spark.sql.shuffle.partitions=3;
155+
|CREATE TABLE test(key INT, val STRING);
156+
|LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
157+
|CACHE TABLE test;
158+
""".stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
157159

158160
queries.foreach(statement.execute)
159161

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
224224
}
225225

226226
/**
227-
* SQLConf and HiveConf contracts: when the hive session is first initialized, params in
228-
* HiveConf will get picked up by the SQLConf. Additionally, any properties set by
229-
* set() or a SET command inside sql() will be set in the SQLConf *as well as*
230-
* in the HiveConf.
227+
* SQLConf and HiveConf contracts:
228+
*
229+
* 1. reuse existing started SessionState if any
230+
* 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
231+
* SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
232+
* set in the SQLConf *as well as* in the HiveConf.
231233
*/
232-
@transient lazy val hiveconf = new HiveConf(classOf[SessionState])
233-
@transient protected[hive] lazy val sessionState = {
234-
val ss = new SessionState(hiveconf)
235-
setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
236-
SessionState.start(ss)
237-
ss.err = new PrintStream(outputBuffer, true, "UTF-8")
238-
ss.out = new PrintStream(outputBuffer, true, "UTF-8")
239-
240-
ss
241-
}
234+
@transient protected[hive] lazy val (hiveconf, sessionState) =
235+
Option(SessionState.get())
236+
.orElse {
237+
val newState = new SessionState(new HiveConf(classOf[SessionState]))
238+
// Only starts newly created `SessionState` instance. Any existing `SessionState` instance
239+
// returned by `SessionState.get()` must be the most recently started one.
240+
SessionState.start(newState)
241+
Some(newState)
242+
}
243+
.map { state =>
244+
setConf(state.getConf.getAllProperties)
245+
if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
246+
if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
247+
(state.getConf, state)
248+
}
249+
.get
242250

243251
override def setConf(key: String, value: String): Unit = {
244252
super.setConf(key, value)
@@ -288,6 +296,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
288296
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
289297
val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)
290298

299+
// Makes sure the session represented by the `sessionState` field is activated. This implies
300+
// Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks
301+
// session isolation under multi-user scenarios (i.e. HiveThriftServer2).
302+
// TODO Fix session isolation
303+
if (SessionState.get() != sessionState) {
304+
SessionState.start(sessionState)
305+
}
306+
291307
proc match {
292308
case driver: Driver =>
293309
val results = HiveShim.createDriverResultsArray

0 commit comments

Comments
 (0)