Commit ab07f7e

committed
WIP
1 parent 4d8bf02 commit ab07f7e

10 files changed: +73 -23 lines changed

project/MimaExcludes.scala

Lines changed: 2 additions & 0 deletions
@@ -89,6 +89,8 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.mllib.linalg.Vector.numActives")
           ) ++ Seq(
+            // Execution should never be included as it's always internal.
+            MimaBuild.excludeSparkPackage("sql.execution"),
             // This `protected[sql]` method was removed in 1.3.1
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.sql.SQLContext.checkAnalysis"),

project/SparkBuild.scala

Lines changed: 5 additions & 1 deletion
@@ -193,6 +193,7 @@ object SparkBuild extends PomBuild {
    * Usage: `build/sbt sparkShell`
    */
   val sparkShell = taskKey[Unit]("start a spark-shell.")
+  val sparkSql = taskKey[Unit]("starts the spark sql CLI.")
 
   enable(Seq(
     connectInput in run := true,
@@ -203,6 +204,9 @@ object SparkBuild extends PomBuild {
 
     sparkShell := {
       (runMain in Compile).toTask(" org.apache.spark.repl.Main -usejavacp").value
+    },
+    sparkSql := {
+      (runMain in Compile).toTask(" org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver").value
     }
   ))(assembly)
 
@@ -497,7 +501,7 @@ object TestSettings {
     // Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
     // launched by the tests have access to the correct test-time classpath.
     envVars in Test ++= Map(
-      "SPARK_DIST_CLASSPATH" ->
+      "SPARK_DIST_CLASSPATH" ->
        (fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
       "JAVA_HOME" -> sys.env.get("JAVA_HOME").getOrElse(sys.props("java.home"))),
     javaOptions in Test += "-Dspark.test.home=" + sparkHome,
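By analogy with the sparkShell task documented in the first hunk (`build/sbt sparkShell`), the new sparkSql task would be launched the same way; a hypothetical session, assuming an assembly with Hive support has already been built:

  $ build/sbt sparkSql
  spark-sql> SHOW TABLES;

(The SPARK_DIST_CLASSPATH hunk appears to be a whitespace-only change.)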

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 3 additions & 1 deletion
@@ -215,7 +215,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
   }
 
   sparkContext.getConf.getAll.foreach {
-    case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
+    case (key, value) if key.startsWith("spark.sql") =>
+      println(s"$key=$value")
+      setConf(key, value)
     case _ =>
   }
 
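In isolation, the effect is that any spark.sql.* entry on the SparkConf is echoed (the println is WIP debug output) and copied into the SQLContext's own configuration at construction time. A minimal sketch (SQLContext.getConf is assumed from the public 1.x API, not shown in this diff):

  val conf = new SparkConf()
    .setMaster("local")
    .setAppName("conf-propagation")
    .set("spark.sql.shuffle.partitions", "4")  // starts with "spark.sql", so it is forwarded
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)          // prints: spark.sql.shuffle.partitions=4
  assert(sqlContext.getConf("spark.sql.shuffle.partitions") == "4")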
sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext}
  * A logical command that is executed for its side-effects. `RunnableCommand`s are
  * wrapped in `ExecutedCommand` during execution.
  */
-trait RunnableCommand extends LogicalPlan with logical.Command {
+private[sql] trait RunnableCommand extends LogicalPlan with logical.Command {
   self: Product =>
 
   override def output: Seq[Attribute] = Seq.empty
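Narrowing the trait to private[sql] means only code inside the org.apache.spark.sql package tree can define new commands; third-party implementations stop compiling. A hedged sketch of the pattern being fenced off (the command is hypothetical, and the run signature is assumed from the 1.x API rather than shown in this diff):

  // Legal only inside org.apache.spark.sql.* once the trait is private[sql].
  case class NoopCommand() extends RunnableCommand {
    override def run(sqlContext: SQLContext): Seq[Row] = Seq.empty
  }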

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala

Lines changed: 16 additions & 6 deletions
@@ -74,7 +74,16 @@ private[hive] object SparkSQLCLIDriver {
       System.exit(1)
     }
 
-    val sessionState = new CliSessionState(new HiveConf(classOf[SessionState]))
+    val localMetastore = {
+      val temp = Utils.createTempDir()
+      temp.delete()
+      temp
+    }
+    val cliConf = new HiveConf(classOf[SessionState])
+    // Override the location of the metastore since this is only used for local execution.
+    cliConf.set(
+      "javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$localMetastore;create=true")
+    val sessionState = new CliSessionState(cliConf)
 
     sessionState.in = System.in
     try {
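The freshly created temp directory is deleted right away, presumably so Derby's embedded driver can create the database directory itself once create=true kicks in. The same pattern in self-contained form (plain java.io, no Spark helpers):

  // Reserve a unique path, then let Derby create the database there fresh.
  val tmp = java.io.File.createTempFile("cli-metastore", "")
  tmp.delete()
  val url = s"jdbc:derby:;databaseName=${tmp.getAbsolutePath};create=true"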
@@ -92,9 +101,9 @@ private[hive] object SparkSQLCLIDriver {
     // Set all properties specified via command line.
     val conf: HiveConf = sessionState.getConf
     sessionState.cmdProperties.entrySet().foreach { item: java.util.Map.Entry[Object, Object] =>
-      conf.set(item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
-      sessionState.getOverriddenConfigurations.put(
-        item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
+      //conf.set(item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
+      //sessionState.getOverriddenConfigurations.put(
+      //  item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
     }
 
     SessionState.start(sessionState)
@@ -138,8 +147,9 @@ private[hive] object SparkSQLCLIDriver {
       case e: UnsupportedEncodingException => System.exit(3)
     }
 
-    // use the specified database if specified
-    cli.processSelectDatabase(sessionState);
+    if (sessionState.database != null) {
+      SparkSQLEnv.hiveContext.runSqlHive(s"USE ${sessionState.database}")
+    }
 
     // Execute -i init files (always in silent mode)
     cli.processInitFiles(sessionState)
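The last hunk reroutes database selection: instead of Hive's CliDriver.processSelectDatabase, the USE statement now runs through Spark's HiveContext, so the --database flag (inherited from the Hive CLI's option parsing) takes effect in Spark SQL's own session. Hypothetically:

  $ spark-sql --database sales
  -- equivalent to executing: USE sales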

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala

Lines changed: 8 additions & 1 deletion
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.io.PrintStream
+
 import scala.collection.JavaConversions._
 
 import org.apache.spark.scheduler.StatsReportListener
@@ -39,7 +41,6 @@ private[hive] object SparkSQLEnv extends Logging {
 
       sparkConf
         .setAppName(s"SparkSQL::${Utils.localHostName()}")
-        .set("spark.sql.hive.version", HiveShim.version)
         .set(
           "spark.serializer",
           maybeSerializer.getOrElse("org.apache.spark.serializer.KryoSerializer"))
@@ -51,6 +52,12 @@ private[hive] object SparkSQLEnv extends Logging {
       sparkContext.addSparkListener(new StatsReportListener())
       hiveContext = new HiveContext(sparkContext)
 
+      hiveContext.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
+      hiveContext.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
+      hiveContext.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
+
+      hiveContext.setConf("spark.sql.hive.version", HiveShim.version)
+
       if (log.isDebugEnabled) {
         hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
           logDebug(s"HiveConf var: $k=$v")
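Two changes of note: spark.sql.hive.version is now set through HiveContext.setConf after the context exists (which, per the HiveContext hunk below, also propagates it into the Hive client sessions), and the metastore client's streams are pointed at the process's real stdout/stderr instead of ClientWrapper's internal capture buffer. Since the new setters accept any PrintStream, the output can just as easily be captured, e.g. in a test (a hedged sketch against the interface added in this commit):

  // Capture the metadata client's output instead of forwarding it.
  val captured = new java.io.ByteArrayOutputStream()
  hiveContext.metadataHive.setOut(new java.io.PrintStream(captured, true, "UTF-8"))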

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

Lines changed: 1 addition & 1 deletion
@@ -539,7 +539,7 @@ abstract class HiveThriftServer2Test extends FunSuite with BeforeAndAfterAll wit
     diagnosisBuffer.clear()
 
     // Retries up to 3 times with different port numbers if the server fails to start
-    (1 to 3).foldLeft(Try(startThriftServer(listeningPort, 0))) { case (started, attempt) =>
+    Seq.empty.foldLeft(Try(startThriftServer(listeningPort, 0))) { case (started, attempt) =>
       started.orElse {
         listeningPort += 1
         stopThriftServer()
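The effect of this WIP change: folding over an empty sequence never invokes the fold body, so the whole expression collapses to its initial value; the server start is attempted exactly once and the retry logic (and the comment above it) becomes dead code. In miniature:

  // foldLeft over an empty sequence returns the zero element untouched.
  assert(Seq.empty[Int].foldLeft(42)((acc, _) => acc + 1) == 42)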

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 2 additions & 1 deletion
@@ -254,7 +254,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   override def setConf(key: String, value: String): Unit = {
     super.setConf(key, value)
     hiveconf.set(key, value)
-    runSqlHive(s"SET $key=$value")
+    executionHive.runSqlHive(s"SET $key=$value")
+    metadataHive.runSqlHive(s"SET $key=$value")
   }
 
   /* A catalyst metadata catalog that points to the Hive Metastore. */
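A single setConf call now fans out to four places: the Spark-side SQLConf (via super), the local HiveConf, and a SET statement in each of the two Hive client sessions (executionHive and metadataHive, which this branch keeps separate), so both stay in agreement. For illustration, with a hypothetical key:

  // SQLConf, hiveconf, executionHive, and metadataHive all see the new value.
  hiveContext.setConf("hive.exec.dynamic.partition", "true")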

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala

Lines changed: 5 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.client
 
+import java.io.PrintStream
 import java.util.{Map => JMap}
 
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
@@ -88,6 +89,10 @@ trait ClientInterface {
    */
   def runSqlHive(sql: String): Seq[String]
 
+  def setOut(stream: PrintStream): Unit
+  def setInfo(stream: PrintStream): Unit
+  def setError(stream: PrintStream): Unit
+
   /** Returns the names of all tables in the given database. */
   def listTables(dbName: String): Seq[String]
 
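These setters let callers redirect the output, info, and error streams of the client's underlying Hive session; ClientWrapper below supplies the implementation, and SparkSQLEnv above shows the intended call site. For example (hedged; the sink and the client value are hypothetical):

  // Silence the Hive session entirely.
  val devNull = new PrintStream(new java.io.OutputStream { def write(b: Int): Unit = () })
  client.setOut(devNull)
  client.setInfo(devNull)
  client.setError(devNull)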
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala

Lines changed: 30 additions & 11 deletions
@@ -62,12 +62,6 @@ class ClientWrapper(
     with Logging
     with ReflectionMagic {
 
-  private val conf = new HiveConf(classOf[SessionState])
-  config.foreach { case (k, v) =>
-    logDebug(s"Hive Config: $k=$v")
-    conf.set(k, v)
-  }
-
   // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
   private val outputBuffer = new java.io.OutputStream {
     var pos: Int = 0
@@ -100,17 +94,30 @@ class ClientWrapper(
     val original = Thread.currentThread().getContextClassLoader
     Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
     val ret = try {
-      val newState = new SessionState(conf)
-      SessionState.start(newState)
-      newState.out = new PrintStream(outputBuffer, true, "UTF-8")
-      newState.err = new PrintStream(outputBuffer, true, "UTF-8")
-      newState
+      val oldState = SessionState.get()
+      if (oldState == null) {
+        val initialConf = new HiveConf(classOf[SessionState])
+        config.foreach { case (k, v) =>
+          logDebug(s"Hive Config: $k=$v")
+          initialConf.set(k, v)
+        }
+        val newState = new SessionState(initialConf)
+        SessionState.start(newState)
+        newState.out = new PrintStream(outputBuffer, true, "UTF-8")
+        newState.err = new PrintStream(outputBuffer, true, "UTF-8")
+        newState
+      } else {
+        oldState
+      }
     } finally {
       Thread.currentThread().setContextClassLoader(original)
     }
     ret
   }
 
+  /** Returns the configuration for the current session. */
+  def conf = SessionState.get().getConf
+
   private val client = Hive.get(conf)
 
   /**
@@ -134,6 +141,18 @@ class ClientWrapper(
     ret
   }
 
+  def setOut(stream: PrintStream): Unit = withHiveState {
+    state.out = stream
+  }
+
+  def setInfo(stream: PrintStream): Unit = withHiveState {
+    state.info = stream
+  }
+
+  def setError(stream: PrintStream): Unit = withHiveState {
+    state.err = stream
+  }
+
   override def currentDatabase: String = withHiveState {
     state.getCurrentDatabase
   }
}
