[SPARK-2706][SQL] Enable Spark to support Hive 0.13 #2241

Status: Closed. Wants to merge 55 commits.

Commits (55)
7d5fce2
test
zhzhan Aug 8, 2014
42585ec
test
zhzhan Aug 8, 2014
70ffd93
revert
zhzhan Aug 8, 2014
fe0f379
Merge branch 'master' of https://github.com/zhzhan/spark
zhzhan Aug 8, 2014
70964fe
revert
zhzhan Aug 8, 2014
dbedff3
Merge remote-tracking branch 'upstream/master'
zhzhan Aug 29, 2014
ba14f28
test
zhzhan Aug 8, 2014
f6a8a40
revert
zhzhan Aug 8, 2014
cb53a2c
Merge branch 'master' of https://github.com/apache/spark
zhzhan Aug 30, 2014
789ea21
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 2, 2014
f896b2a
Merge branch 'master' into spark-2706
zhzhan Sep 2, 2014
921e914
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 2, 2014
87ebf3b
Merge branch 'master' into spark-2706
zhzhan Sep 2, 2014
94b4fdc
Spark-2706: hive-0.13.1 support on spark
zhzhan Sep 2, 2014
e4c1982
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 5, 2014
05d3683
solve conflicts
zhzhan Sep 6, 2014
5f5619f
restructure the directory and different hive version support
zhzhan Sep 10, 2014
af9feb9
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 10, 2014
128b60b
ignore 0.12.0 test cases for the time being
zhzhan Sep 11, 2014
1ccd7cc
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 16, 2014
f4af934
rebase
zhzhan Sep 16, 2014
9412d24
address review comments
zhzhan Sep 17, 2014
2b0d513
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 17, 2014
57ea52e
Merge branch 'master' into spark-2706
zhzhan Sep 17, 2014
3ee3b2b
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 22, 2014
d48bd18
address review comments
zhzhan Sep 22, 2014
68deb11
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 22, 2014
d7c3e1e
Merge branch 'master' into spark-2706
zhzhan Sep 22, 2014
7e0cc36
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 25, 2014
dc7bdb3
solve conflicts
zhzhan Sep 25, 2014
d10bf00
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 30, 2014
3dd50e8
solve conflicts and remove unnecessary implicts
zhzhan Oct 1, 2014
adf4924
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 1, 2014
d9b981d
rebase and fix error due to rollback
zhzhan Oct 1, 2014
3ced0d7
rebase
zhzhan Oct 1, 2014
cedcc6f
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 1, 2014
d3aa3f2
Merge branch 'master' into spark-2706
zhzhan Oct 1, 2014
6bc9204
rebase and remove temparory repo
zhzhan Oct 2, 2014
10c3565
address review comments
zhzhan Oct 5, 2014
301eb4a
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 5, 2014
f7912a9
rebase and solve review feedback
zhzhan Oct 5, 2014
20f6cf7
solve compatability issue
zhzhan Oct 7, 2014
cb22863
correct the typo
zhzhan Oct 9, 2014
a72c0d4
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 9, 2014
2b50502
rebase
zhzhan Oct 9, 2014
b0478c0
Changes to simplify the build of SPARK-2706
pwendell Oct 13, 2014
4cb1b93
Merge pull request #1 from pwendell/pr-2241
zhzhan Oct 13, 2014
4a2e36d
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 13, 2014
ab028d1
rebase
zhzhan Oct 13, 2014
8fad1cf
change the pom file and make hive-0.13.1 as the default
zhzhan Oct 13, 2014
497b0f4
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 13, 2014
0d4d2ed
rebase
zhzhan Oct 13, 2014
cbb4691
change run-test for new options
zhzhan Oct 14, 2014
410b668
solve review comments
zhzhan Oct 16, 2014
3ece905
minor fix
zhzhan Oct 21, 2014
Files changed
6 changes: 6 additions & 0 deletions assembly/pom.xml
@@ -197,6 +197,12 @@
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
<id>hive-0.12.0</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
4 changes: 2 additions & 2 deletions dev/run-tests
@@ -140,7 +140,7 @@ CURRENT_BLOCK=$BLOCK_BUILD

{
# We always build with Hive because the PySpark Spark SQL tests need it.
BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive"
BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0"

echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS"

@@ -167,7 +167,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS
# If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled.
# This must be a single argument, as it is.
if [ -n "$_RUN_SQL_TESTS" ]; then
SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive"
SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0"
fi

if [ -n "$_SQL_TESTS_ONLY" ]; then
26 changes: 17 additions & 9 deletions docs/building-spark.md
@@ -97,12 +97,20 @@ mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
mvn -Pyarn-alpha -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=0.23.7 -DskipTests clean package
{% endhighlight %}

<!--- TODO: Update this when Hive 0.13 JDBC is added -->

# Building With Hive and JDBC Support
To enable Hive integration for Spark SQL along with its JDBC server and CLI,
add the `-Phive` profile to your existing build options.
add the `-Phive` profile to your existing build options. By default Spark
will build with Hive 0.13.1 bindings. You can also build for Hive 0.12.0 using
the `-Phive-0.12.0` profile. NOTE: currently the JDBC server is only
supported for Hive 0.12.0.
{% highlight bash %}
# Apache Hadoop 2.4.X with Hive support
# Apache Hadoop 2.4.X with Hive 13 support
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package

# Apache Hadoop 2.4.X with Hive 12 support
mvn -Pyarn -Phive-0.12.0 -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package
{% endhighlight %}

# Spark Tests in Maven
@@ -111,8 +119,8 @@ Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.o

Some of the tests require Spark to be packaged first, so always run `mvn package` with `-DskipTests` the first time. The following is an example of a correct (build, test) sequence:

mvn -Pyarn -Phadoop-2.3 -DskipTests -Phive clean package
mvn -Pyarn -Phadoop-2.3 -Phive test
mvn -Pyarn -Phadoop-2.3 -DskipTests -Phive -Phive-0.12.0 clean package
mvn -Pyarn -Phadoop-2.3 -Phive -Phive-0.12.0 test

The ScalaTest plugin also supports running only a specific test suite as follows:

@@ -175,21 +183,21 @@ can be set to control the SBT build. For example:

Some of the tests require Spark to be packaged first, so always run `sbt/sbt assembly` the first time. The following is an example of a correct (build, test) sequence:

sbt/sbt -Pyarn -Phadoop-2.3 -Phive assembly
sbt/sbt -Pyarn -Phadoop-2.3 -Phive test
sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-0.12.0 assembly
sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-0.12.0 test

To run only a specific test suite as follows:

sbt/sbt -Pyarn -Phadoop-2.3 -Phive "test-only org.apache.spark.repl.ReplSuite"
sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-0.12.0 "test-only org.apache.spark.repl.ReplSuite"

To run test suites of a specific sub project as follows:

sbt/sbt -Pyarn -Phadoop-2.3 -Phive core/test
sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-0.12.0 core/test

# Speeding up Compilation with Zinc

[Zinc](https://github.com/typesafehub/zinc) is a long-running server version of SBT's incremental
compiler. When run locally as a background process, it speeds up builds of Scala-based projects
like Spark. Developers who regularly recompile Spark with Maven will be the most interested in
Zinc. The project site gives instructions for building and running `zinc`; OS X users can
install it using `brew install zinc`.
install it using `brew install zinc`.
29 changes: 24 additions & 5 deletions pom.xml
@@ -127,7 +127,11 @@
<hbase.version>0.94.6</hbase.version>
<flume.version>1.4.0</flume.version>
<zookeeper.version>3.4.5</zookeeper.version>
<hive.version>0.12.0-protobuf-2.5</hive.version>
<!-- Version used in Maven Hive dependency -->
<hive.version>0.13.1</hive.version>
<!-- Version used for internal directory structure -->
<hive.version.short>0.13.1</hive.version.short>
<derby.version>10.10.1.1</derby.version>
<parquet.version>1.4.3</parquet.version>
<jblas.version>1.2.3</jblas.version>
<jetty.version>8.1.14.v20131031</jetty.version>
@@ -441,7 +445,7 @@
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.4.2.0</version>
<version>${derby.version}</version>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
@@ -1272,16 +1276,31 @@
</dependency>
</dependencies>
</profile>

<profile>
<id>hive</id>
<id>hive-0.12.0</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
<modules>
<module>sql/hive-thriftserver</module>
</modules>
<properties>
<hive.version>0.12.0-protobuf-2.5</hive.version>
<hive.version.short>0.12.0</hive.version.short>
<derby.version>10.4.2.0</derby.version>
</properties>
</profile>
<profile>
<id>hive-0.13.1</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<hive.version>0.13.1</hive.version>
<hive.version.short>0.13.1</hive.version.short>
<derby.version>10.10.1.1</derby.version>
</properties>
</profile>

</profiles>
</project>
37 changes: 31 additions & 6 deletions sql/hive/pom.xml
@@ -36,11 +36,6 @@
</properties>

<dependencies>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hive-bundle</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -116,7 +111,6 @@
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
<profile>
<id>hive</id>
@@ -144,6 +138,19 @@
</plugins>
</build>
</profile>
<profile>
<id>hive-0.12.0</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hive-bundle</artifactId>
<version>1.5.0</version>
</dependency>
</dependencies>
</profile>
</profiles>

<build>
@@ -154,6 +161,24 @@
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-default-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>v${hive.version.short}/src/main/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>

<!-- Deploy datanucleus jars to the spark/lib_managed/jars directory -->
<plugin>
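The build-helper-maven-plugin entry above is what wires a version-specific source tree into the build: depending on which profile is active, `v0.12.0/src/main/scala` or `v0.13.1/src/main/scala` is added as an extra source root, and that is where the `HiveShim` object imported throughout the Scala changes below lives. The shim source itself is not part of this excerpt, so the following is only a sketch of what the 0.12.0 side might look like; the method names come from the call sites in this PR, while the file path and bodies are inferred from the Hive 0.12 API and may differ from the actual patch.

```scala
// Hypothetical sketch of sql/hive/v0.12.0/src/main/scala/.../Shim.scala,
// inferred from the HiveShim call sites in this PR; not the patch's actual code.
package org.apache.spark.sql.hive

import java.util.{ArrayList => JArrayList}

import scala.collection.JavaConversions._

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.stats.StatsSetupConst

private[hive] object HiveShim {
  val version = "0.12.0"

  // Plain "decimal" is the only metastore decimal type string in 0.12.
  val metastoreDecimal = "decimal"

  def getStatsSetupConstTotalSize: String = StatsSetupConst.TOTAL_SIZE

  // 0.12's CommandProcessorFactory takes a single command token.
  def getCommandProcessor(cmd: Array[String], conf: HiveConf): CommandProcessor =
    CommandProcessorFactory.get(cmd(0), conf)

  // Driver.getResults fills an ArrayList[String] in 0.12 ...
  def createDriverResultsArray = new JArrayList[String]

  // ... so the results can be handed back as-is.
  def processResults(results: JArrayList[String]): Seq[String] = results

  // 0.12 still exposes a public HiveDecimal constructor.
  def createDecimal(bd: java.math.BigDecimal): HiveDecimal = new HiveDecimal(bd)

  // Renamed to getAllPartitionsOf in Hive 0.13.
  def getAllPartitionsOf(client: Hive, tbl: Table) =
    client.getAllPartitionsForPruner(tbl)

  // Partition.getPartitionPath was removed in 0.13.
  def getDataLocationPath(p: Partition): Path = p.getPartitionPath
}
```

Because the 0.13.1 variant (sketched after the HiveContext.scala diff below) keeps the same method names and shapes, the shared sources under `src/main/scala` compile unchanged against either Hive binding.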
23 changes: 10 additions & 13 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.stats.StatsSetupConst
import org.apache.hadoop.hive.serde2.io.TimestampWritable
import org.apache.hadoop.hive.serde2.io.DateWritable

@@ -47,6 +46,7 @@ import org.apache.spark.sql.execution.ExtractPythonUdfs
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.{Command => PhysicalCommand}
import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
import org.apache.spark.sql.hive.HiveShim

/**
* DEPRECATED: Use HiveContext instead.
@@ -171,13 +171,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

val tableParameters = relation.hiveQlTable.getParameters
val oldTotalSize =
Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE)).map(_.toLong).getOrElse(0L)
Option(tableParameters.get(HiveShim.getStatsSetupConstTotalSize))
.map(_.toLong)
.getOrElse(0L)
val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable)
// Update the Hive metastore if the total size of the table is different than the size
// recorded in the Hive metastore.
// This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
tableParameters.put(StatsSetupConst.TOTAL_SIZE, newTotalSize.toString)
tableParameters.put(HiveShim.getStatsSetupConstTotalSize, newTotalSize.toString)
val hiveTTable = relation.hiveQlTable.getTTable
hiveTTable.setParameters(tableParameters)
val tableFullName =
@@ -282,29 +284,24 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
*/
protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = {
try {
// Session state must be initialized before the CommandProcessor is created.
SessionState.start(sessionState)

val cmd_trimmed: String = cmd.trim()
val tokens: Array[String] = cmd_trimmed.split("\\s+")
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hiveconf)
val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)

proc match {
case driver: Driver =>
driver.init()

val results = new JArrayList[String]
val results = HiveShim.createDriverResultsArray
val response: CommandProcessorResponse = driver.run(cmd)
// Throw an exception if there is an error in query processing.
if (response.getResponseCode != 0) {
driver.destroy()
driver.close()
throw new QueryExecutionException(response.getErrorMessage)
}
driver.setMaxRows(maxRows)
driver.getResults(results)
driver.destroy()
results
driver.close()
Review comment (Contributor):
A quick question since I ran into something similar when trying to run things on 0.13: can the driver be re-used after you call close()? I don't see the driver being removed from the CommandProcessorFactory cache, so another call to runHive will reuse the closed driver. (I ran into this when running the HiveFromSpark example compiled against Hive 0.13, so it should be easy to check.)

Reply (Contributor Author):
My understanding is that the driver is not removed from mapDriver of CommandProcessorFactory, and will be reused. CommandProcessorFactory.clean(hiveConf) will destroy and remove the driver from mapDriver.
HiveShim.processResults(results)
case _ =>
sessionState.out.println(tokens(0) + " " + cmd_1)
Seq(proc.run(cmd_1).getResponseCode.toString)
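In `runHive`, every version-sensitive call now goes through `HiveShim`: `getCommandProcessor` hides the changed `CommandProcessorFactory.get` signature, `createDriverResultsArray` and `processResults` hide the changed element type of `Driver.getResults`, and `driver.destroy()` is replaced by `driver.close()`, which is what the review exchange above is probing (the Driver stays cached in CommandProcessorFactory, so a later `runHive` call gets the same instance back). As with the 0.12.0 sketch earlier, the 0.13.1 shim is not shown in this excerpt; the sketch below is only a guess at how these particular helpers might map onto the Hive 0.13.1 API, not the patch's actual code.

```scala
// Hypothetical sketch of the 0.13.1 HiveShim helpers used by runHive;
// signatures inferred from the call sites above, bodies from the Hive 0.13.1 API.
package org.apache.spark.sql.hive

import java.util.{ArrayList => JArrayList}

import scala.collection.JavaConversions._

import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}

private[hive] object HiveShim {
  val version = "0.13.1"

  // StatsSetupConst moved from ql.stats to hive-common in 0.13.
  def getStatsSetupConstTotalSize: String = StatsSetupConst.TOTAL_SIZE

  // 0.13's CommandProcessorFactory.get takes the full token array.
  def getCommandProcessor(cmd: Array[String], conf: HiveConf): CommandProcessor =
    CommandProcessorFactory.get(cmd, conf)

  // In 0.13, Driver.getResults fills a list of Objects rather than Strings ...
  def createDriverResultsArray = new JArrayList[Object]

  // ... so each row is converted back to a String here (a simplification).
  def processResults(results: JArrayList[Object]): Seq[String] =
    results.map(_.toString)

  // The public HiveDecimal constructor is gone; 0.13 uses a factory method.
  def createDecimal(bd: java.math.BigDecimal): HiveDecimal = HiveDecimal.create(bd)
}
```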
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.{io => hadoopIo}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.hive.HiveShim

/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -149,7 +150,7 @@ private[hive] trait HiveInspectors {
case l: Long => l: java.lang.Long
case l: Short => l: java.lang.Short
case l: Byte => l: java.lang.Byte
case b: BigDecimal => new HiveDecimal(b.underlying())
case b: BigDecimal => HiveShim.createDecimal(b.underlying())
case b: Array[Byte] => b
case d: java.sql.Date => d
case t: java.sql.Timestamp => t
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -22,7 +22,6 @@ import scala.util.parsing.combinator.RegexParsers
import org.apache.hadoop.hive.metastore.api.{FieldSchema, SerDeInfo, StorageDescriptor, Partition => TPartition, Table => TTable}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.stats.StatsSetupConst
import org.apache.hadoop.hive.serde2.Deserializer

import org.apache.spark.Logging
@@ -34,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.util.Utils

/* Implicit conversions */
@@ -56,7 +56,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val table = client.getTable(databaseName, tblName)
val partitions: Seq[Partition] =
if (table.isPartitioned) {
client.getAllPartitionsForPruner(table).toSeq
HiveShim.getAllPartitionsOf(client, table).toSeq
} else {
Nil
}
@@ -185,7 +185,7 @@ object HiveMetastoreTypes extends RegexParsers {
"bigint" ^^^ LongType |
"binary" ^^^ BinaryType |
"boolean" ^^^ BooleanType |
"decimal" ^^^ DecimalType |
HiveShim.metastoreDecimal ^^^ DecimalType |
"date" ^^^ DateType |
"timestamp" ^^^ TimestampType |
"varchar\\((\\d+)\\)".r ^^^ StringType
@@ -272,13 +272,13 @@ private[hive] case class MetastoreRelation
// of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
// `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
BigInt(
Option(hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE))
Option(hiveQlTable.getParameters.get(HiveShim.getStatsSetupConstTotalSize))
.map(_.toLong)
.getOrElse(sqlContext.defaultSizeInBytes))
}
)

val tableDesc = new TableDesc(
val tableDesc = HiveShim.getTableDesc(
Class.forName(
hiveQlTable.getSerializationLib,
true,
16 changes: 14 additions & 2 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.hive

import java.sql.Date

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.lib.Node
import org.apache.hadoop.hive.ql.parse._
import org.apache.hadoop.hive.ql.plan.PlanUtils
@@ -216,7 +217,18 @@ private[hive] object HiveQl {
/**
* Returns the AST for the given SQL string.
*/
def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))
def getAst(sql: String): ASTNode = {
/*
 * A Context has to be passed in for Hive 0.13.1;
 * otherwise a NullPointerException is thrown
 * when retrieving properties from HiveConf.
 */
val hContext = new Context(new HiveConf())
val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
hContext.clear()
node
}


/** Returns a LogicalPlan for a given HiveQL string. */
def parseSql(sql: String): LogicalPlan = hqlParser(sql)
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -34,6 +34,7 @@ import org.apache.spark.SerializableWritable
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.hive.HiveShim

/**
* A trait for subclasses that handle table scans.
@@ -142,7 +143,7 @@ class HadoopTableReader(
filterOpt: Option[PathFilter]): RDD[Row] = {
val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>
val partDesc = Utilities.getPartitionDesc(partition)
val partPath = partition.getPartitionPath
val partPath = HiveShim.getDataLocationPath(partition)
val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
val ifc = partDesc.getInputFileFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]