
Commit cd1d411

marmbrus authored and yhuai committed
[SPARK-6908] [SQL] Use isolated Hive client
This PR switches Spark SQL's Hive support to use the isolated hive client interface introduced by apache#5851, instead of directly interacting with the client. By using this isolated client we can now allow users to dynamically configure the version of Hive that they are connecting to by setting `spark.sql.hive.metastore.version` without the need to recompile. This also greatly reduces the surface area for our interaction with the hive libraries, hopefully making it easier to support other versions in the future.

Jars for the desired hive version can be configured using `spark.sql.hive.metastore.jars`, which accepts the following options:
 - a colon-separated list of jar files or directories for hive and hadoop.
 - `builtin` - attempt to discover the jars that were used to load Spark SQL and use those. This option is only valid when using the execution version of Hive.
 - `maven` - download the correct version of hive on demand from maven.

By default, `builtin` is used for Hive 13.

This PR also removes the test step for building against Hive 12, as this will no longer be required to talk to Hive 12 metastores. However, the full removal of the Shim is deferred until a later PR.

Remaining TODOs:
 - Remove the Hive Shims and inline code for Hive 13.
 - Several HiveCompatibility tests are not yet passing:
   - `nullformatCTAS` - As detailed below, we are now handling CTAS parsing ourselves instead of hacking into the Hive semantic analyzer. However, we currently only handle the common cases and not things like CTAS where the null format is specified.
   - `combine1` now leaks state about compression somehow, breaking all subsequent tests. As such we currently add it to the blacklist.
   - `part_inherit_tbl_props` and `part_inherit_tbl_props_with_star` do not work anymore. We are correctly propagating the information.
   - "load_dyn_part14.*" - These tests pass when run on their own, but fail when run with all other tests. It seems our `RESET` mechanism may not be as robust as it used to be?

Other required changes:
 - `CreateTableAsSelect` no longer carries parts of the HiveQL AST with it through the query execution pipeline. Instead, we parse CTAS during the HiveQL conversion and construct a `HiveTable`. The full parsing here is not yet complete as detailed above in the remaining TODOs. Since the operator is Hive specific, it is moved to the hive package.
 - `Command` is simplified to be a trait that simply acts as a marker for a LogicalPlan that should be eagerly evaluated.

Author: Michael Armbrust <[email protected]>

Closes apache#5876 from marmbrus/useIsolatedClient and squashes the following commits:

258d000 [Michael Armbrust] really really correct path handling
e56fd4a [Michael Armbrust] getAbsolutePath
5a259f5 [Michael Armbrust] fix typos
81bb366 [Michael Armbrust] comments from vanzin
5f3945e [Michael Armbrust] Merge remote-tracking branch 'origin/master' into useIsolatedClient
4b5cd41 [Michael Armbrust] yin's comments
f5de7de [Michael Armbrust] cleanup
11e9c72 [Michael Armbrust] better coverage in versions suite
7e8f010 [Michael Armbrust] better error messages and jar handling
e7b3941 [Michael Armbrust] more permisive checking for function registration
da91ba7 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into useIsolatedClient
5fe5894 [Michael Armbrust] fix serialization suite
81711c4 [Michael Armbrust] Initial support for running without maven
1d8ae44 [Michael Armbrust] fix final tests?
1c50813 [Michael Armbrust] more comments
a3bee70 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into useIsolatedClient
a6f5df1 [Michael Armbrust] style
ab07f7e [Michael Armbrust] WIP
4d8bf02 [Michael Armbrust] Remove hive 12 compilation
8843a25 [Michael Armbrust] [SPARK-6908] [SQL] Use isolated Hive client
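As a rough illustration of the new configuration surface described above, here is a minimal sketch of pointing a `HiveContext` at an older metastore. The application name, version string, and the choice of `maven` are illustrative values, and the sketch assumes the `spark.sql.hive.metastore.*` keys are picked up from the `SparkConf` like other `spark.sql.*` settings:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Sketch: connect to a Hive 0.12.0 metastore without recompiling Spark.
val conf = new SparkConf()
  .setAppName("isolated-metastore-sketch")             // illustrative name
  .set("spark.sql.hive.metastore.version", "0.12.0")   // metastore Hive version
  // One of: a colon-separated jar/directory list, "builtin", or "maven".
  .set("spark.sql.hive.metastore.jars", "maven")

val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)

// Metastore calls now go through the isolated client for the configured version.
hiveContext.sql("SHOW TABLES").collect().foreach(println)
```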
1 parent 22ab70e commit cd1d411

File tree

33 files changed: +782 -671 lines changed

dev/run-tests

Lines changed: 0 additions & 23 deletions
@@ -142,29 +142,6 @@ CURRENT_BLOCK=$BLOCK_BUILD
 
 {
   HIVE_BUILD_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver"
-  HIVE_12_BUILD_ARGS="$HIVE_BUILD_ARGS -Phive-0.12.0"
-
-  # First build with Hive 0.12.0 to ensure patches do not break the Hive 0.12.0 build
-  echo "[info] Compile with Hive 0.12.0"
-  [ -d "lib_managed" ] && rm -rf lib_managed
-  echo "[info] Building Spark with these arguments: $HIVE_12_BUILD_ARGS"
-
-  if [ "${AMPLAB_JENKINS_BUILD_TOOL}" == "maven" ]; then
-    build/mvn $HIVE_12_BUILD_ARGS clean package -DskipTests
-  else
-    # NOTE: echo "q" is needed because sbt on encountering a build file with failure
-    # (either resolution or compilation) prompts the user for input either q, r, etc
-    # to quit or retry. This echo is there to make it not block.
-    # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a
-    # single argument!
-    # QUESTION: Why doesn't 'yes "q"' work?
-    # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work?
-    echo -e "q\n" \
-      | build/sbt $HIVE_12_BUILD_ARGS clean hive/compile hive-thriftserver/compile \
-      | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
-  fi
-
-  # Then build with default Hive version (0.13.1) because tests are based on this version
   echo "[info] Compile with Hive 0.13.1"
   [ -d "lib_managed" ] && rm -rf lib_managed
   echo "[info] Building Spark with these arguments: $HIVE_BUILD_ARGS"

project/MimaExcludes.scala

Lines changed: 2 additions & 0 deletions
@@ -89,6 +89,8 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.mllib.linalg.Vector.numActives")
           ) ++ Seq(
+            // Execution should never be included as its always internal.
+            MimaBuild.excludeSparkPackage("sql.execution"),
             // This `protected[sql]` method was removed in 1.3.1
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.sql.SQLContext.checkAnalysis"),

project/SparkBuild.scala

Lines changed: 8 additions & 1 deletion
@@ -193,6 +193,7 @@ object SparkBuild extends PomBuild {
    * Usage: `build/sbt sparkShell`
    */
   val sparkShell = taskKey[Unit]("start a spark-shell.")
+  val sparkSql = taskKey[Unit]("starts the spark sql CLI.")
 
   enable(Seq(
     connectInput in run := true,
@@ -203,6 +204,12 @@ object SparkBuild extends PomBuild {
 
     sparkShell := {
       (runMain in Compile).toTask(" org.apache.spark.repl.Main -usejavacp").value
+    },
+
+    javaOptions in Compile += "-Dspark.master=local",
+
+    sparkSql := {
+      (runMain in Compile).toTask(" org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver").value
     }
   ))(assembly)
 
@@ -497,7 +504,7 @@ object TestSettings {
     // Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
     // launched by the tests have access to the correct test-time classpath.
     envVars in Test ++= Map(
-      "SPARK_DIST_CLASSPATH" ->
+      "SPARK_DIST_CLASSPATH" ->
         (fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
       "JAVA_HOME" -> sys.env.get("JAVA_HOME").getOrElse(sys.props("java.home"))),
     javaOptions in Test += "-Dspark.test.home=" + sparkHome,
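Presumably, mirroring the documented `build/sbt sparkShell` usage above, the new task is invoked as `build/sbt sparkSql`; it launches `SparkSQLCLIDriver` through `runMain`, with the added `-Dspark.master=local` java option so the CLI defaults to a local master.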

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 3 additions & 13 deletions
@@ -149,16 +149,6 @@ case class InsertIntoTable(
   }
 }
 
-case class CreateTableAsSelect[T](
-    databaseName: Option[String],
-    tableName: String,
-    child: LogicalPlan,
-    allowExisting: Boolean,
-    desc: Option[T] = None) extends UnaryNode {
-  override def output: Seq[Attribute] = Seq.empty[Attribute]
-  override lazy val resolved: Boolean = databaseName != None && childrenResolved
-}
-
 /**
  * A container for holding named common table expressions (CTEs) and a query plan.
  * This operator will be removed during analysis and the relations will be substituted into child.
@@ -184,10 +174,10 @@ case class WriteToFile(
 }
 
 /**
- * @param order The ordering expressions
- * @param global True means global sorting apply for entire data set,
+ * @param order The ordering expressions
+ * @param global True means global sorting apply for entire data set,
  *               False means sorting only apply within the partition.
- * @param child Child logical plan
+ * @param child Child logical plan
  */
 case class Sort(
     order: Seq[SortOrder],

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala

Lines changed: 3 additions & 5 deletions
@@ -21,9 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 
 /**
  * A logical node that represents a non-query command to be executed by the system. For example,
- * commands can be used by parsers to represent DDL operations.
+ * commands can be used by parsers to represent DDL operations. Commands, unlike queries, are
+ * eagerly executed.
  */
-abstract class Command extends LeafNode {
-  self: Product =>
-  def output: Seq[Attribute] = Seq.empty
-}
+trait Command

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SqlParserSuite.scala

Lines changed: 5 additions & 1 deletion
@@ -17,11 +17,15 @@
 
 package org.apache.spark.sql.catalyst
 
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Command
 import org.scalatest.FunSuite
 
-private[sql] case class TestCommand(cmd: String) extends Command
+private[sql] case class TestCommand(cmd: String) extends LogicalPlan with Command {
+  override def output: Seq[Attribute] = Seq.empty
+  override def children: Seq[LogicalPlan] = Seq.empty
+}
 
 private[sql] class SuperLongKeywordTestParser extends AbstractSparkSQLParser {
   protected val EXECUTE = Keyword("THISISASUPERLONGKEYWORDTEST")

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 0 additions & 1 deletion
@@ -143,7 +143,6 @@ class DataFrame private[sql](
     // happen right away to let these side effects take place eagerly.
     case _: Command |
          _: InsertIntoTable |
-         _: CreateTableAsSelect[_] |
          _: CreateTableUsingAsSelect |
          _: WriteToFile =>
       LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
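This match is what gives `Command` its eager semantics: when the analyzed plan is a `Command` (or one of the other listed write operators), the query is forced while the `DataFrame` is being constructed, so the side effect happens immediately. A usage-level sketch, assuming the `hiveContext` from the configuration sketch above and an illustrative table name:

```scala
// The DDL parses to a Command, so it runs right here, at DataFrame construction
// time; no action such as collect() or count() is needed to trigger it.
val created = hiveContext.sql("CREATE TABLE IF NOT EXISTS tmp_kv (key INT, value STRING)")
```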

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 8 additions & 3 deletions
@@ -70,7 +70,7 @@ import org.apache.spark.{Partition, SparkContext}
 * spark-sql> SELECT * FROM src LIMIT 1;
 *
 *-- Exception will be thrown and switch to dialect
- *-- "sql" (for SQLContext) or
+ *-- "sql" (for SQLContext) or
 *-- "hiveql" (for HiveContext)
 * }}}
 */
@@ -107,7 +107,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   /**
    * @return Spark SQL configuration
    */
-  protected[sql] def conf = tlSession.get().conf
+  protected[sql] def conf = currentSession().conf
 
   /**
    * Set Spark SQL configuration properties.
@@ -1197,13 +1197,17 @@ class SQLContext(@transient val sparkContext: SparkContext)
         |${stringOrError(executedPlan)}
      """.stripMargin.trim
 
-    override def toString: String =
+    override def toString: String = {
+      def output =
+        analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
+
       // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})
       // however, the `toRdd` will cause the real execution, which is not what we want.
       // We need to think about how to avoid the side effect.
       s"""== Parsed Logical Plan ==
          |${stringOrError(logical)}
          |== Analyzed Logical Plan ==
+         |${stringOrError(output)}
         |${stringOrError(analyzed)}
         |== Optimized Logical Plan ==
         |${stringOrError(optimizedPlan)}
@@ -1212,6 +1216,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
       |== RDD ==
     """.stripMargin.trim
+    }
 }
 
 /**
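The practical effect of the `toString` change is that a query's plan dump now includes the analyzed output schema: a line such as `key: int, value: string` (column names illustrative) appears under the `== Analyzed Logical Plan ==` header, before the analyzed plan itself.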

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 3 additions & 1 deletion
@@ -32,9 +32,11 @@ import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext}
  * A logical command that is executed for its side-effects. `RunnableCommand`s are
  * wrapped in `ExecutedCommand` during execution.
  */
-trait RunnableCommand extends logical.Command {
+private[sql] trait RunnableCommand extends LogicalPlan with logical.Command {
   self: Product =>
 
+  override def output: Seq[Attribute] = Seq.empty
+  override def children: Seq[LogicalPlan] = Seq.empty
   def run(sqlContext: SQLContext): Seq[Row]
 }
 
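For orientation, a minimal sketch of a command written against the revised trait follows; the class name and behavior are hypothetical and not part of this patch, but the shape matches the real implementors below: only `run` must be provided, since `output` and `children` now default to empty.

```scala
package org.apache.spark.sql.execution

import org.apache.spark.sql.{Row, SQLContext}

// Hypothetical command used only to illustrate the RunnableCommand contract.
private[sql] case class UncacheTableByNameCommand(tableName: String) extends RunnableCommand {

  // `output` and `children` are inherited as Seq.empty from RunnableCommand,
  // so the case class only supplies the side-effecting run method.
  override def run(sqlContext: SQLContext): Seq[Row] = {
    // The side effect fires eagerly when the command's DataFrame is built.
    sqlContext.uncacheTable(tableName)
    Seq.empty[Row]
  }
}
```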
sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,10 @@ private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRel
269269
*/
270270
private[sql] case class DescribeCommand(
271271
table: LogicalPlan,
272-
isExtended: Boolean) extends Command {
273-
override val output = Seq(
272+
isExtended: Boolean) extends LogicalPlan with Command {
273+
274+
override def children: Seq[LogicalPlan] = Seq.empty
275+
override val output: Seq[Attribute] = Seq(
274276
// Column names are based on Hive.
275277
AttributeReference("col_name", StringType, nullable = false,
276278
new MetadataBuilder().putString("comment", "name of the column").build())(),
@@ -292,7 +294,11 @@ private[sql] case class CreateTableUsing(
292294
temporary: Boolean,
293295
options: Map[String, String],
294296
allowExisting: Boolean,
295-
managedIfNoPath: Boolean) extends Command
297+
managedIfNoPath: Boolean) extends LogicalPlan with Command {
298+
299+
override def output: Seq[Attribute] = Seq.empty
300+
override def children: Seq[LogicalPlan] = Seq.empty
301+
}
296302

297303
/**
298304
* A node used to support CTAS statements and saveAsTable for the data source API.
@@ -318,7 +324,7 @@ private[sql] case class CreateTempTableUsing(
318324
provider: String,
319325
options: Map[String, String]) extends RunnableCommand {
320326

321-
def run(sqlContext: SQLContext): Seq[Row] = {
327+
override def run(sqlContext: SQLContext): Seq[Row] = {
322328
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
323329
sqlContext.registerDataFrameAsTable(
324330
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
@@ -333,7 +339,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
333339
options: Map[String, String],
334340
query: LogicalPlan) extends RunnableCommand {
335341

336-
def run(sqlContext: SQLContext): Seq[Row] = {
342+
override def run(sqlContext: SQLContext): Seq[Row] = {
337343
val df = DataFrame(sqlContext, query)
338344
val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df)
339345
sqlContext.registerDataFrameAsTable(
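These `run` overrides back the data source DDL path. A hedged usage sketch of a statement that is planned as `CreateTempTableUsing` (the table name, data source, and path are illustrative), assuming an existing `sqlContext` or the `hiveContext` from the earlier sketch:

```scala
// run() resolves the data source and registers the resulting DataFrame
// as a session-scoped table under the given name.
sqlContext.sql(
  """CREATE TEMPORARY TABLE people
    |USING org.apache.spark.sql.json
    |OPTIONS (path '/tmp/people.json')
  """.stripMargin)
```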
