
Commit 6f71d0b

Makes toHiveString static
1 parent 26fa955 commit 6f71d0b
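
This commit lifts toHiveString (and its helper toHiveStructString) out of the HiveContext#QueryExecution inner class into a new HiveContext companion object, so the Hive thrift-server shims can format binary and complex values without reaching through a QueryExecution instance. A minimal sketch of the call-site change, using the from/ordinal/dataTypes names that appear in the shim diffs below:

    // Before: callers had to cast the query execution to the Hive-specific subtype.
    val hiveString = result
      .queryExecution
      .asInstanceOf[HiveContext#QueryExecution]
      .toHiveString((from.get(ordinal), dataTypes(ordinal)))

    // After: toHiveString is a static method on the HiveContext companion object.
    val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))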

4 files changed (+72 -84 lines)


sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala

Lines changed: 8 additions & 7 deletions
@@ -200,12 +200,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
   test("Test JDBC query execution") {
     withJdbcStatement() { statement =>
-      val queries =
-        s"""SET spark.sql.shuffle.partitions=3;
-           |CREATE TABLE test(key INT, val STRING);
-           |LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test;
-           |CACHE TABLE test;
-           """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
+      val queries = Seq(
+        "SET spark.sql.shuffle.partitions=3",
+        "DROP TABLE IF EXISTS test",
+        "CREATE TABLE test(key INT, val STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
+        "CACHE TABLE test")
 
       queries.foreach(statement.execute)
 
@@ -276,8 +276,9 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
       queries.foreach(statement.execute)
 
+      val resultSet = statement.executeQuery("SELECT key FROM test_4292")
+
       Seq(238, 86, 311, 27, 165).foreach { key =>
-        val resultSet = statement.executeQuery("SELECT key FROM test_4292")
         resultSet.next()
        assert(resultSet.getInt(1) === key)
      }
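
The second hunk executes the query once and iterates the single ResultSet, instead of re-running the query inside the loop and always reading its first row. A sketch of the resulting assertion loop, assuming the keys come back in this order:

    val resultSet = statement.executeQuery("SELECT key FROM test_4292")

    Seq(238, 86, 311, 27, 165).foreach { key =>
      resultSet.next()
      assert(resultSet.getInt(1) === key)
    }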

sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala

Lines changed: 1 addition & 4 deletions
@@ -137,10 +137,7 @@ private[hive] class SparkExecuteStatementOperation(
         to.addColumnValue(
           ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
       case BinaryType | _: ArrayType | _: StructType | _: MapType =>
-        val hiveString = result
-          .queryExecution
-          .asInstanceOf[HiveContext#QueryExecution]
-          .toHiveString((from.get(ordinal), dataTypes(ordinal)))
+        val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
         to.addColumnValue(ColumnValue.stringValue(hiveString))
     }
   }

sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala

Lines changed: 4 additions & 5 deletions
@@ -112,6 +112,8 @@ private[hive] class SparkExecuteStatementOperation(
 
   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
     dataTypes(ordinal) match {
+      case StringType =>
+        to += from.getString(ordinal)
       case IntegerType =>
         to += from.getInt(ordinal)
       case BooleanType =>
 
@@ -132,11 +134,8 @@ private[hive] class SparkExecuteStatementOperation(
         to += from.getAs[Date](ordinal)
       case TimestampType =>
         to += from.getAs[Timestamp](ordinal)
-      case StringType | BinaryType | _: ArrayType | _: StructType | _: MapType =>
-        val hiveString = result
-          .queryExecution
-          .asInstanceOf[HiveContext#QueryExecution]
-          .toHiveString(from.get(ordinal) -> dataTypes(ordinal))
+      case BinaryType | _: ArrayType | _: StructType | _: MapType =>
+        val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
         to += hiveString
     }
   }
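
Note that in the 0.13 shim, StringType now gets its own arm that appends the raw string, while only binary and complex types still go through the (now static) formatter. A condensed sketch of the resulting match, with names taken from the diff above:

    dataTypes(ordinal) match {
      case StringType =>
        // Strings are appended as-is instead of being routed through toHiveString.
        to += from.getString(ordinal)
      case BinaryType | _: ArrayType | _: StructType | _: MapType =>
        // Binary and complex values are rendered with the static formatter.
        to += HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
      // ... the remaining primitive-type cases are unchanged ...
    }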

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 59 additions & 68 deletions
@@ -19,36 +19,27 @@ package org.apache.spark.sql.hive
 
 import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
 import java.sql.{Date, Timestamp}
-import java.util.{ArrayList => JArrayList}
-
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.spark.sql.catalyst.types.DecimalType
-import org.apache.spark.sql.catalyst.types.decimal.Decimal
 
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
-import scala.reflect.runtime.universe.{TypeTag, typeTag}
+import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.serde2.io.TimestampWritable
-import org.apache.hadoop.hive.serde2.io.DateWritable
+import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 
 import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators}
-import org.apache.spark.sql.catalyst.analysis.{OverrideCatalog, OverrideFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.ExtractPythonUdfs
-import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.execution.{Command => PhysicalCommand}
+import org.apache.spark.sql.catalyst.types.DecimalType
+import org.apache.spark.sql.catalyst.types.decimal.Decimal
+import org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand}
 import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
 import org.apache.spark.sql.sources.DataSourceStrategy
 
@@ -136,7 +127,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     val relation = EliminateAnalysisOperators(catalog.lookupRelation(None, tableName))
 
     relation match {
-      case relation: MetastoreRelation => {
+      case relation: MetastoreRelation =>
         // This method is mainly based on
         // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
         // in Hive 0.13 (except that we do not use fs.getContentSummary).

@@ -147,7 +138,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
        // countFileSize to count the table size.
        def calculateTableSize(fs: FileSystem, path: Path): Long = {
          val fileStatus = fs.getFileStatus(path)
-          val size = if (fileStatus.isDir) {
+          val size = if (fileStatus.isDirectory) {
            fs.listStatus(path).map(status => calculateTableSize(fs, status.getPath)).sum
          } else {
            fileStatus.getLen

@@ -157,7 +148,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
        }
 
        def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
-          val path = table.getPath()
+          val path = table.getPath
          var size: Long = 0L
          try {
            val fs = path.getFileSystem(conf)

@@ -187,15 +178,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
          val hiveTTable = relation.hiveQlTable.getTTable
          hiveTTable.setParameters(tableParameters)
          val tableFullName =
-            relation.hiveQlTable.getDbName() + "." + relation.hiveQlTable.getTableName()
+            relation.hiveQlTable.getDbName + "." + relation.hiveQlTable.getTableName
 
          catalog.client.alterTable(tableFullName, new Table(hiveTTable))
        }
-      }
      case otherRelation =>
        throw new NotImplementedError(
          s"Analyze has only implemented for Hive tables, " +
-            s"but ${tableName} is a ${otherRelation.nodeName}")
+            s"but $tableName is a ${otherRelation.nodeName}")
    }
  }
 
@@ -374,50 +364,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   /** Extends QueryExecution with hive specific features. */
   protected[sql] abstract class QueryExecution extends super.QueryExecution {
 
-    protected val primitiveTypes =
-      Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
-        ShortType, DateType, TimestampType, BinaryType)
-
-    protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
-      case (struct: Row, StructType(fields)) =>
-        struct.zip(fields).map {
-          case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
-        }.mkString("{", ",", "}")
-      case (seq: Seq[_], ArrayType(typ, _)) =>
-        seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
-      case (map: Map[_,_], MapType(kType, vType, _)) =>
-        map.map {
-          case (key, value) =>
-            toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
-        }.toSeq.sorted.mkString("{", ",", "}")
-      case (null, _) => "NULL"
-      case (d: Date, DateType) => new DateWritable(d).toString
-      case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
-      case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
-      case (decimal: Decimal, DecimalType()) =>  // Hive strips trailing zeros so use its toString
-        HiveShim.createDecimal(decimal.toBigDecimal.underlying()).toString
-      case (other, tpe) if primitiveTypes contains tpe => other.toString
-    }
-
-    /** Hive outputs fields of structs slightly differently than top level attributes. */
-    protected def toHiveStructString(a: (Any, DataType)): String = a match {
-      case (struct: Row, StructType(fields)) =>
-        struct.zip(fields).map {
-          case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
-        }.mkString("{", ",", "}")
-      case (seq: Seq[_], ArrayType(typ, _)) =>
-        seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
-      case (map: Map[_, _], MapType(kType, vType, _)) =>
-        map.map {
-          case (key, value) =>
-            toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
-        }.toSeq.sorted.mkString("{", ",", "}")
-      case (null, _) => "null"
-      case (s: String, StringType) => "\"" + s + "\""
-      case (decimal, DecimalType()) => decimal.toString
-      case (other, tpe) if primitiveTypes contains tpe => other.toString
-    }
-
     /**
      * Returns the result as a hive compatible sequence of strings. For native commands, the
      * execution is simply passed back to Hive.

@@ -435,8 +381,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       // We need the types so we can output struct field names
       val types = analyzed.output.map(_.dataType)
       // Reformat to match hive tab delimited output.
-      val asString = result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t")).toSeq
-      asString
+      result.map(_.zip(types).map(HiveContext.toHiveString)).map(_.mkString("\t")).toSeq
     }
 
     override def simpleString: String =

@@ -447,3 +392,49 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     }
   }
 }
+
+object HiveContext {
+  protected val primitiveTypes =
+    Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
+      ShortType, DateType, TimestampType, BinaryType)
+
+  protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
+    case (struct: Row, StructType(fields)) =>
+      struct.zip(fields).map {
+        case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
+      }.mkString("{", ",", "}")
+    case (seq: Seq[_], ArrayType(typ, _)) =>
+      seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
+    case (map: Map[_,_], MapType(kType, vType, _)) =>
+      map.map {
+        case (key, value) =>
+          toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
+      }.toSeq.sorted.mkString("{", ",", "}")
+    case (null, _) => "NULL"
+    case (d: Date, DateType) => new DateWritable(d).toString
+    case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
+    case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
+    case (decimal: Decimal, DecimalType()) =>  // Hive strips trailing zeros so use its toString
+      HiveShim.createDecimal(decimal.toBigDecimal.underlying()).toString
+    case (other, tpe) if primitiveTypes contains tpe => other.toString
+  }
+
+  /** Hive outputs fields of structs slightly differently than top level attributes. */
+  protected def toHiveStructString(a: (Any, DataType)): String = a match {
+    case (struct: Row, StructType(fields)) =>
+      struct.zip(fields).map {
+        case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
+      }.mkString("{", ",", "}")
+    case (seq: Seq[_], ArrayType(typ, _)) =>
+      seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
+    case (map: Map[_, _], MapType(kType, vType, _)) =>
+      map.map {
+        case (key, value) =>
+          toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
+      }.toSeq.sorted.mkString("{", ",", "}")
+    case (null, _) => "null"
+    case (s: String, StringType) => "\"" + s + "\""
+    case (decimal, DecimalType()) => decimal.toString
+    case (other, tpe) if primitiveTypes contains tpe => other.toString
+  }
+}
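
For reference, a hypothetical sketch of what the now-static formatter produces for a few (value, DataType) pairs; the sample values below are made up for illustration, and because the method is protected[sql] such a call has to live inside the org.apache.spark.sql package:

    import org.apache.spark.sql.catalyst.types._
    import org.apache.spark.sql.hive.HiveContext

    // Complex values are rendered in Hive's textual style.
    HiveContext.toHiveString((Seq(1, 2, 3), ArrayType(IntegerType)))              // [1,2,3]
    HiveContext.toHiveString((Map("a" -> 1), MapType(StringType, IntegerType)))   // {"a":1}
    HiveContext.toHiveString((null, StringType))                                   // NULL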
